diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 9aebe353354705021020a5299b1ecdb26286d4be..56db8ffce5ab9fa42fd909b04500c99336152547 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -1,7 +1,6 @@ package server import ( - "errors" "net" "strconv" "time" @@ -15,6 +14,7 @@ import ( "github.com/iotaledger/hive.go/network" "github.com/iotaledger/hive.go/network/tcp" "github.com/iotaledger/hive.go/node" + "github.com/iotaledger/hive.go/protocol" flag "github.com/spf13/pflag" ) @@ -27,8 +27,6 @@ const ( // IdleTimeout defines the idle timeout. IdleTimeout = 10 * time.Second - // StateHeartbeat defines the state of the heartbeat. - StateHeartbeat = packet.HeartbeatPacketHeader ) func init() { @@ -36,8 +34,6 @@ func init() { } var ( - // ErrInvalidPacketHeader defines an invalid packet header error. - ErrInvalidPacketHeader = errors.New("invalid packet header") // Plugin is the plugin instance of the analysis server plugin. Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) server *tcp.TCPServer @@ -77,9 +73,6 @@ func run(_ *node.Plugin) { } } -// ConnectionState defines the type of a connection state as a byte -type ConnectionState = byte - // HandleConnection handles the given connection. func HandleConnection(conn *network.ManagedConnection) { err := conn.SetTimeout(IdleTimeout) @@ -87,14 +80,15 @@ func HandleConnection(conn *network.ManagedConnection) { log.Errorf(err.Error()) } - var connectionState byte - var receiveBuffer []byte - - var onDisconnect *events.Closure + // create new protocol instance + p := protocol.New(conn) onReceiveData := events.NewClosure(func(data []byte) { - processIncomingPacket(&connectionState, &receiveBuffer, conn, data) + // process incoming data in protocol.Receive() + p.Receive(data) }) + + var onDisconnect *events.Closure onDisconnect = events.NewClosure(func() { conn.Events.ReceiveData.Detach(onReceiveData) conn.Events.Close.Detach(onDisconnect) @@ -103,57 +97,26 @@ func HandleConnection(conn *network.ManagedConnection) { conn.Events.ReceiveData.Attach(onReceiveData) conn.Events.Close.Attach(onDisconnect) - maxPacketsSize := getMaxPacketSize(packet.HeartbeatPacketMaxSize) + // connect protocol events to processors + wireUp(p) - go conn.Read(make([]byte, maxPacketsSize)) + // starts the protocol and reads from its connection + go p.Start() } -func getMaxPacketSize(packetSizes ...int) int { - maxPacketSize := 0 - - for _, packetSize := range packetSizes { - if packetSize > maxPacketSize { - maxPacketSize = packetSize - } - } - - return maxPacketSize -} - -func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) { - var err error - if *connectionState, *receiveBuffer, err = parsePackageHeader(data); err != nil { - Events.Error.Trigger(err) - conn.Close() - return - } - - switch *connectionState { - case StateHeartbeat: - processHeartbeatPacket(connectionState, receiveBuffer, conn, data) - } -} - -func parsePackageHeader(data []byte) (ConnectionState, []byte, error) { - var connectionState ConnectionState - var receiveBuffer []byte - - switch data[0] { - case packet.HeartbeatPacketHeader: - receiveBuffer = make([]byte, packet.HeartbeatPacketMaxSize) - connectionState = StateHeartbeat - default: - return 0, nil, ErrInvalidPacketHeader - } - - return connectionState, receiveBuffer, nil +// wireUp connects the Received events of the protocol to the packet specific processor +func wireUp(p *protocol.Protocol) { + p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) { + processHeartbeatPacket(data, p) + })) } -func processHeartbeatPacket(_ *byte, _ *[]byte, conn *network.ManagedConnection, data []byte) { +// processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event +func processHeartbeatPacket(data []byte, p *protocol.Protocol) { heartbeatPacket, err := packet.ParseHeartbeat(data) if err != nil { Events.Error.Trigger(err) - conn.Close() + p.CloseConnection() return } Events.Heartbeat.Trigger(heartbeatPacket)