Newer
Older
package server
import (
capossele
committed
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/network/tcp"
"github.com/iotaledger/hive.go/node"
// ErrInvalidPackageHeader defines an invalid package header error.
ErrInvalidPackageHeader = errors.New("invalid package header")
// ErrExpectedInitialAddNodePackage defines an expected initial add node package error.
ErrExpectedInitialAddNodePackage = errors.New("expected initial add node package")
log *logger.Logger
)
// Configure sets up the analysis server: it creates the TCP server and attaches
// handlers to the server's Connect, Error, Start and Shutdown events.
// The plugin argument is not used in the lines visible here.
// NOTE(review): this listing appears to be missing lines (the closing of the
// Shutdown closure and of the function body are not visible) — confirm against
// the complete file.
func Configure(plugin *node.Plugin) {
// Create the TCP server and store it in server (declared outside the visible lines).
server = tcp.NewServer()
// Each Connect event is passed to HandleConnection.
server.Events.Connect.Attach(events.NewClosure(HandleConnection))
// Errors reported by the server are written to the log.
server.Events.Error.Attach(events.NewClosure(func(err error) {
log.Errorf("error in server: %s", err.Error())
}))
// Log the configured port when the server's Start event fires.
server.Events.Start.Attach(events.NewClosure(func() {
log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort))
}))
// Log when the server's Shutdown event fires.
server.Events.Shutdown.Attach(events.NewClosure(func() {
log.Info("Stopping Server ... done")
// Run registers a background worker named "Analysis Server" with the daemon,
// passing shutdown.PriorityAnalysis. The worker logs the configured port and
// starts server.Listen on 0.0.0.0 in a separate goroutine.
// The plugin argument is not used in the lines visible here.
// NOTE(review): lines appear to be missing from this listing; where the two
// "Stopping Server" log statements execute relative to the worker closure
// cannot be determined from here — presumably after the shutdown signal is
// received; confirm against the complete file.
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker("Analysis Server", func(shutdownSignal <-chan struct{}) {
log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort))
// Listen runs in its own goroutine, so this call does not block the worker.
go server.Listen("0.0.0.0", config.Node.GetInt(CfgServerPort))
}, shutdown.PriorityAnalysis)
log.Info("Stopping Server ...")
log.Info("Stopping Server ... done")
// HandleConnection handles the given connection: it attaches a ReceiveData
// handler that forwards the received bytes to processIncomingPacket, and a
// Close handler that detaches both handlers again.
// NOTE(review): connectionState, receiveBuffer and onDisconnect are not
// declared in the lines visible here — presumably declared in lines missing
// from this listing; confirm against the complete file.
func HandleConnection(conn *network.ManagedConnection) {
// Forward every received chunk, together with the connection's state and buffer.
onReceiveData := events.NewClosure(func(data []byte) {
processIncomingPacket(&connectionState, &receiveBuffer, conn, data)
})
// On close, remove both closures from the connection's events. The closure
// refers to onDisconnect itself, which is why it is assigned with "=" rather
// than ":=" here.
onDisconnect = events.NewClosure(func() {
conn.Events.ReceiveData.Detach(onReceiveData)
conn.Events.Close.Detach(onDisconnect)
})
conn.Events.ReceiveData.Attach(onReceiveData)
conn.Events.Close.Attach(onDisconnect)
}
// getMaxPacketSize scans packetSizes and keeps the largest value seen in
// maxPacketSize.
// NOTE(review): the declaration of maxPacketSize, the return statement and the
// closing brace are not visible in this listing — presumably the function
// returns maxPacketSize; confirm against the complete file.
func getMaxPacketSize(packetSizes ...int) int {
for _, packetSize := range packetSizes {
// Remember the current size whenever it exceeds the maximum so far.
if packetSize > maxPacketSize {
maxPacketSize = packetSize
}
}
// processIncomingPacket processes data received on conn. In the visible lines
// it calls parsePackageHeader on data and stores the results in
// *connectionState and *receiveBuffer, triggers Events.Error if that fails,
// and passes the data to processHeartbeatPacket in the StateHeartbeat case.
// NOTE(review): the enclosing switch/if structure is not visible in this
// listing, so the conditions under which each step runs cannot be determined
// from here — confirm against the complete file.
// NOTE(review): connectionState is declared as *byte while parsePackageHeader
// returns a ConnectionState — verify that these types are compatible.
func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) {
var err error
// Parse the header and update the caller's state and buffer through the pointers.
if *connectionState, *receiveBuffer, err = parsePackageHeader(data); err != nil {
Events.Error.Trigger(err)
case StateHeartbeat:
processHeartbeatPacket(connectionState, receiveBuffer, conn, data)
}
// parsePackageHeader inspects data and returns a connection state, a receive
// buffer and an error. In the visible lines, the
// heartbeat.MarshaledPacketHeader case allocates a buffer of
// heartbeat.MaxMarshaledTotalSize bytes, and one path returns
// ErrInvalidPackageHeader with a zero state and a nil buffer.
// NOTE(review): the switch header, the assignment to connectionState and the
// success return are not visible in this listing — presumably the switch is on
// the first byte of data; confirm against the complete file.
func parsePackageHeader(data []byte) (ConnectionState, []byte, error) {
var connectionState ConnectionState
var receiveBuffer []byte
case heartbeat.MarshaledPacketHeader:
receiveBuffer = make([]byte, heartbeat.MaxMarshaledTotalSize)
return 0, nil, ErrInvalidPackageHeader
// processHeartbeatPacket unmarshals data with heartbeat.Unmarshal. If that
// fails, it triggers Events.Error with the error, closes conn and returns;
// otherwise it triggers Events.Heartbeat with the dereferenced packet.
// connectionState and receiveBuffer are not used in the lines visible here.
// NOTE(review): the closing braces of the error branch and of the function are
// not visible in this listing — confirm against the complete file.
func processHeartbeatPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) {
heartbeatPacket, err := heartbeat.Unmarshal(data)
if err != nil {
// Report the error and drop the connection.
Events.Error.Trigger(err)
conn.Close()
return
Events.Heartbeat.Trigger(*heartbeatPacket)