diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 30e317a223e9e75949af8727de436c4086b13f17..6f396244562c328e98c528dd43fbf634f5a5928c 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -26,7 +26,7 @@ const ( CfgAnalysisServerBindAddress = "analysis.server.bindAddress" // IdleTimeout defines the idle timeout. - IdleTimeout = 10 * time.Second + IdleTimeout = 1 * time.Minute ) func init() { @@ -37,6 +37,7 @@ var ( // Plugin is the plugin instance of the analysis server plugin. Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) server *tcp.TCPServer + prot *protocol.Protocol log *logger.Logger ) @@ -64,7 +65,13 @@ func run(_ *node.Plugin) { if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { log.Infof("%s started, bind-address=%s", PluginName, bindAddr) defer log.Infof("Stopping %s ... done", PluginName) + + // connect protocol events to processors + prot = protocol.New(packet.AnalysisMsgRegistry) + wireUp(prot) + go server.Listen(addr, port) + <-shutdownSignal log.Info("Stopping Server ...") server.Shutdown() @@ -75,76 +82,72 @@ func run(_ *node.Plugin) { // HandleConnection handles the given connection. func HandleConnection(conn *network.ManagedConnection) { - err := conn.SetTimeout(IdleTimeout) + conn.Events.Error.Attach(events.NewClosure(func(err error) { + log.Warn(err) + _ = conn.Close() + })) + + err := conn.SetReadTimeout(IdleTimeout) if err != nil { - log.Errorf(err.Error()) + log.Error(err) } - // create new protocol instance - p := protocol.New(conn, packet.AnalysisMsgRegistry) - onReceiveData := events.NewClosure(func(data []byte) { // process incoming data in protocol.Receive() - p.Receive(data) - }) - - var onDisconnect *events.Closure - onDisconnect = events.NewClosure(func() { - conn.Events.ReceiveData.Detach(onReceiveData) - conn.Events.Close.Detach(onDisconnect) + _, err := prot.Read(data) + if err != nil { + log.Warnf("invalid message received: %s", err) + _ = conn.Close() + } }) conn.Events.ReceiveData.Attach(onReceiveData) - conn.Events.Close.Attach(onDisconnect) - - // connect protocol events to processors - wireUp(p) // starts the protocol and reads from its connection - go p.Start() + go func() { + buffer := make([]byte, 2048) + _, _ = conn.Read(buffer) + }() } // wireUp connects the Received events of the protocol to the packet specific processor func wireUp(p *protocol.Protocol) { p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) { - processHeartbeatPacket(data, p) + processHeartbeatPacket(data) })) p.Events.Received[packet.MessageTypeFPCHeartbeat].Attach(events.NewClosure(func(data []byte) { - processFPCHeartbeatPacket(data, p) + processFPCHeartbeatPacket(data) })) p.Events.Received[packet.MessageTypeMetricHeartbeat].Attach(events.NewClosure(func(data []byte) { - processMetricHeartbeatPacket(data, p) + processMetricHeartbeatPacket(data) })) } // processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event -func processHeartbeatPacket(data []byte, p *protocol.Protocol) { +func processHeartbeatPacket(data []byte) { heartbeatPacket, err := packet.ParseHeartbeat(data) if err != nil { Events.Error.Trigger(err) - p.CloseConnection() return } Events.Heartbeat.Trigger(heartbeatPacket) } // processHeartbeatPacket parses the serialized data into a FPC Heartbeat packet and triggers its event -func processFPCHeartbeatPacket(data []byte, p *protocol.Protocol) { +func processFPCHeartbeatPacket(data []byte) { hb, err := packet.ParseFPCHeartbeat(data) if err != nil { Events.Error.Trigger(err) - p.CloseConnection() return } Events.FPCHeartbeat.Trigger(hb) } -// processMetricHeartbeatPacket parses the serialized data into a MEtric Heartbeat packet and triggers its event -func processMetricHeartbeatPacket(data []byte, p *protocol.Protocol) { +// processMetricHeartbeatPacket parses the serialized data into a Metric Heartbeat packet and triggers its event +func processMetricHeartbeatPacket(data []byte) { hb, err := packet.ParseMetricHeartbeat(data) if err != nil { Events.Error.Trigger(err) - p.CloseConnection() return } Events.MetricHeartbeat.Trigger(hb)