Skip to content
Snippets Groups Projects
Unverified Commit 0bf4e5a2 authored by Wolfgang Welz's avatar Wolfgang Welz
Browse files

use new protocol

parent e68d3f27
No related branches found
No related tags found
No related merge requests found
...@@ -26,7 +26,7 @@ const ( ...@@ -26,7 +26,7 @@ const (
CfgAnalysisServerBindAddress = "analysis.server.bindAddress" CfgAnalysisServerBindAddress = "analysis.server.bindAddress"
// IdleTimeout defines the idle timeout. // IdleTimeout defines the idle timeout.
IdleTimeout = 10 * time.Second IdleTimeout = 1 * time.Minute
) )
func init() { func init() {
...@@ -37,6 +37,7 @@ var ( ...@@ -37,6 +37,7 @@ var (
// Plugin is the plugin instance of the analysis server plugin. // Plugin is the plugin instance of the analysis server plugin.
Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
server *tcp.TCPServer server *tcp.TCPServer
prot *protocol.Protocol
log *logger.Logger log *logger.Logger
) )
...@@ -64,7 +65,13 @@ func run(_ *node.Plugin) { ...@@ -64,7 +65,13 @@ func run(_ *node.Plugin) {
if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) {
log.Infof("%s started, bind-address=%s", PluginName, bindAddr) log.Infof("%s started, bind-address=%s", PluginName, bindAddr)
defer log.Infof("Stopping %s ... done", PluginName) defer log.Infof("Stopping %s ... done", PluginName)
// connect protocol events to processors
prot = protocol.New(packet.AnalysisMsgRegistry)
wireUp(prot)
go server.Listen(addr, port) go server.Listen(addr, port)
<-shutdownSignal <-shutdownSignal
log.Info("Stopping Server ...") log.Info("Stopping Server ...")
server.Shutdown() server.Shutdown()
...@@ -75,76 +82,72 @@ func run(_ *node.Plugin) { ...@@ -75,76 +82,72 @@ func run(_ *node.Plugin) {
// HandleConnection handles the given connection. // HandleConnection handles the given connection.
func HandleConnection(conn *network.ManagedConnection) { func HandleConnection(conn *network.ManagedConnection) {
err := conn.SetTimeout(IdleTimeout) conn.Events.Error.Attach(events.NewClosure(func(err error) {
log.Warn(err)
_ = conn.Close()
}))
err := conn.SetReadTimeout(IdleTimeout)
if err != nil { if err != nil {
log.Errorf(err.Error()) log.Error(err)
} }
// create new protocol instance
p := protocol.New(conn, packet.AnalysisMsgRegistry)
onReceiveData := events.NewClosure(func(data []byte) { onReceiveData := events.NewClosure(func(data []byte) {
// process incoming data in protocol.Receive() // process incoming data in protocol.Receive()
p.Receive(data) _, err := prot.Read(data)
}) if err != nil {
log.Warnf("invalid message received: %s", err)
var onDisconnect *events.Closure _ = conn.Close()
onDisconnect = events.NewClosure(func() { }
conn.Events.ReceiveData.Detach(onReceiveData)
conn.Events.Close.Detach(onDisconnect)
}) })
conn.Events.ReceiveData.Attach(onReceiveData) conn.Events.ReceiveData.Attach(onReceiveData)
conn.Events.Close.Attach(onDisconnect)
// connect protocol events to processors
wireUp(p)
// starts the protocol and reads from its connection // starts the protocol and reads from its connection
go p.Start() go func() {
buffer := make([]byte, 2048)
_, _ = conn.Read(buffer)
}()
} }
// wireUp connects the Received events of the protocol to the packet specific processor // wireUp connects the Received events of the protocol to the packet specific processor
func wireUp(p *protocol.Protocol) { func wireUp(p *protocol.Protocol) {
p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) { p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) {
processHeartbeatPacket(data, p) processHeartbeatPacket(data)
})) }))
p.Events.Received[packet.MessageTypeFPCHeartbeat].Attach(events.NewClosure(func(data []byte) { p.Events.Received[packet.MessageTypeFPCHeartbeat].Attach(events.NewClosure(func(data []byte) {
processFPCHeartbeatPacket(data, p) processFPCHeartbeatPacket(data)
})) }))
p.Events.Received[packet.MessageTypeMetricHeartbeat].Attach(events.NewClosure(func(data []byte) { p.Events.Received[packet.MessageTypeMetricHeartbeat].Attach(events.NewClosure(func(data []byte) {
processMetricHeartbeatPacket(data, p) processMetricHeartbeatPacket(data)
})) }))
} }
// processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event // processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event
func processHeartbeatPacket(data []byte, p *protocol.Protocol) { func processHeartbeatPacket(data []byte) {
heartbeatPacket, err := packet.ParseHeartbeat(data) heartbeatPacket, err := packet.ParseHeartbeat(data)
if err != nil { if err != nil {
Events.Error.Trigger(err) Events.Error.Trigger(err)
p.CloseConnection()
return return
} }
Events.Heartbeat.Trigger(heartbeatPacket) Events.Heartbeat.Trigger(heartbeatPacket)
} }
// processHeartbeatPacket parses the serialized data into a FPC Heartbeat packet and triggers its event // processHeartbeatPacket parses the serialized data into a FPC Heartbeat packet and triggers its event
func processFPCHeartbeatPacket(data []byte, p *protocol.Protocol) { func processFPCHeartbeatPacket(data []byte) {
hb, err := packet.ParseFPCHeartbeat(data) hb, err := packet.ParseFPCHeartbeat(data)
if err != nil { if err != nil {
Events.Error.Trigger(err) Events.Error.Trigger(err)
p.CloseConnection()
return return
} }
Events.FPCHeartbeat.Trigger(hb) Events.FPCHeartbeat.Trigger(hb)
} }
// processMetricHeartbeatPacket parses the serialized data into a MEtric Heartbeat packet and triggers its event // processMetricHeartbeatPacket parses the serialized data into a Metric Heartbeat packet and triggers its event
func processMetricHeartbeatPacket(data []byte, p *protocol.Protocol) { func processMetricHeartbeatPacket(data []byte) {
hb, err := packet.ParseMetricHeartbeat(data) hb, err := packet.ParseMetricHeartbeat(data)
if err != nil { if err != nil {
Events.Error.Trigger(err) Events.Error.Trigger(err)
p.CloseConnection()
return return
} }
Events.MetricHeartbeat.Trigger(hb) Events.MetricHeartbeat.Trigger(hb)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment