Skip to content
Snippets Groups Projects
Unverified Commit ad5110f0 authored by Levente Pap's avatar Levente Pap
Browse files

Refactor Analysis-Server for TLV protocol

parent 4c74b14e
No related branches found
No related tags found
No related merge requests found
package server package server
import ( import (
"errors"
"net" "net"
"strconv" "strconv"
"time" "time"
...@@ -15,6 +14,7 @@ import ( ...@@ -15,6 +14,7 @@ import (
"github.com/iotaledger/hive.go/network" "github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/network/tcp" "github.com/iotaledger/hive.go/network/tcp"
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/protocol"
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
) )
...@@ -27,8 +27,6 @@ const ( ...@@ -27,8 +27,6 @@ const (
// IdleTimeout defines the idle timeout. // IdleTimeout defines the idle timeout.
IdleTimeout = 10 * time.Second IdleTimeout = 10 * time.Second
// StateHeartbeat defines the state of the heartbeat.
StateHeartbeat = packet.HeartbeatPacketHeader
) )
func init() { func init() {
...@@ -36,8 +34,6 @@ func init() { ...@@ -36,8 +34,6 @@ func init() {
} }
var ( var (
// ErrInvalidPacketHeader defines an invalid packet header error.
ErrInvalidPacketHeader = errors.New("invalid packet header")
// Plugin is the plugin instance of the analysis server plugin. // Plugin is the plugin instance of the analysis server plugin.
Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
server *tcp.TCPServer server *tcp.TCPServer
...@@ -77,9 +73,6 @@ func run(_ *node.Plugin) { ...@@ -77,9 +73,6 @@ func run(_ *node.Plugin) {
} }
} }
// ConnectionState defines the type of a connection state as a byte
type ConnectionState = byte
// HandleConnection handles the given connection. // HandleConnection handles the given connection.
func HandleConnection(conn *network.ManagedConnection) { func HandleConnection(conn *network.ManagedConnection) {
err := conn.SetTimeout(IdleTimeout) err := conn.SetTimeout(IdleTimeout)
...@@ -87,14 +80,15 @@ func HandleConnection(conn *network.ManagedConnection) { ...@@ -87,14 +80,15 @@ func HandleConnection(conn *network.ManagedConnection) {
log.Errorf(err.Error()) log.Errorf(err.Error())
} }
var connectionState byte // create new protocol instance
var receiveBuffer []byte p := protocol.New(conn)
var onDisconnect *events.Closure
onReceiveData := events.NewClosure(func(data []byte) { onReceiveData := events.NewClosure(func(data []byte) {
processIncomingPacket(&connectionState, &receiveBuffer, conn, data) // process incoming data in protocol.Receive()
p.Receive(data)
}) })
var onDisconnect *events.Closure
onDisconnect = events.NewClosure(func() { onDisconnect = events.NewClosure(func() {
conn.Events.ReceiveData.Detach(onReceiveData) conn.Events.ReceiveData.Detach(onReceiveData)
conn.Events.Close.Detach(onDisconnect) conn.Events.Close.Detach(onDisconnect)
...@@ -103,57 +97,26 @@ func HandleConnection(conn *network.ManagedConnection) { ...@@ -103,57 +97,26 @@ func HandleConnection(conn *network.ManagedConnection) {
conn.Events.ReceiveData.Attach(onReceiveData) conn.Events.ReceiveData.Attach(onReceiveData)
conn.Events.Close.Attach(onDisconnect) conn.Events.Close.Attach(onDisconnect)
maxPacketsSize := getMaxPacketSize(packet.HeartbeatPacketMaxSize) // connect protocol events to processors
wireUp(p)
go conn.Read(make([]byte, maxPacketsSize))
}
func getMaxPacketSize(packetSizes ...int) int {
maxPacketSize := 0
for _, packetSize := range packetSizes {
if packetSize > maxPacketSize {
maxPacketSize = packetSize
}
}
return maxPacketSize
}
func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) { // starts the protocol and reads from its connection
var err error go p.Start()
if *connectionState, *receiveBuffer, err = parsePackageHeader(data); err != nil {
Events.Error.Trigger(err)
conn.Close()
return
} }
switch *connectionState { // wireUp connects the Received events of the protocol to the packet specific processor
case StateHeartbeat: func wireUp(p *protocol.Protocol) {
processHeartbeatPacket(connectionState, receiveBuffer, conn, data) p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) {
} processHeartbeatPacket(data, p)
} }))
func parsePackageHeader(data []byte) (ConnectionState, []byte, error) {
var connectionState ConnectionState
var receiveBuffer []byte
switch data[0] {
case packet.HeartbeatPacketHeader:
receiveBuffer = make([]byte, packet.HeartbeatPacketMaxSize)
connectionState = StateHeartbeat
default:
return 0, nil, ErrInvalidPacketHeader
}
return connectionState, receiveBuffer, nil
} }
func processHeartbeatPacket(_ *byte, _ *[]byte, conn *network.ManagedConnection, data []byte) { // processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event
func processHeartbeatPacket(data []byte, p *protocol.Protocol) {
heartbeatPacket, err := packet.ParseHeartbeat(data) heartbeatPacket, err := packet.ParseHeartbeat(data)
if err != nil { if err != nil {
Events.Error.Trigger(err) Events.Error.Trigger(err)
conn.Close() p.CloseConnection()
return return
} }
Events.Heartbeat.Trigger(heartbeatPacket) Events.Heartbeat.Trigger(heartbeatPacket)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment