Skip to content
Snippets Groups Projects
plugin.go 3.91 KiB
Newer Older
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
	"github.com/iotaledger/goshimmer/plugins/config"
	"github.com/iotaledger/hive.go/daemon"
capossele's avatar
capossele committed
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/logger"
Wolfgang Welz's avatar
Wolfgang Welz committed
	"github.com/iotaledger/hive.go/network"
	"github.com/iotaledger/hive.go/network/tcp"
	"github.com/iotaledger/hive.go/node"
	// ErrInvalidPackageHeader defines an invalid package header error.
	ErrInvalidPackageHeader = errors.New("invalid package header")
	// ErrExpectedInitialAddNodePackage defines an expected initial add node package error.
	ErrExpectedInitialAddNodePackage = errors.New("expected initial add node package")
Wolfgang Welz's avatar
Wolfgang Welz committed
	server                           *tcp.TCPServer
capossele's avatar
capossele committed

// Configure configures the plugin.
func Configure(plugin *node.Plugin) {
Luca Moser's avatar
Luca Moser committed
	log = logger.NewLogger("Analysis-Server")
capossele's avatar
capossele committed
	server = tcp.NewServer()

	server.Events.Connect.Attach(events.NewClosure(HandleConnection))
	server.Events.Error.Attach(events.NewClosure(func(err error) {
		log.Errorf("error in server: %s", err.Error())
capossele's avatar
capossele committed
	}))
	server.Events.Start.Attach(events.NewClosure(func() {
		log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort))
capossele's avatar
capossele committed
	}))
	server.Events.Shutdown.Attach(events.NewClosure(func() {
		log.Info("Stopping Server ... done")
capossele's avatar
capossele committed
	}))
// Run runs the plugin.
func Run(plugin *node.Plugin) {
Luca Moser's avatar
Luca Moser committed
	daemon.BackgroundWorker("Analysis Server", func(shutdownSignal <-chan struct{}) {
		log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort))
		go server.Listen("0.0.0.0", config.Node.GetInt(CfgServerPort))
		<-shutdownSignal
		Shutdown()
// Shutdown shutdowns the plugin.
func Shutdown() {
	log.Info("Stopping Server ...")
capossele's avatar
capossele committed
	server.Shutdown()
	log.Info("Stopping Server ... done")
// HandleConnection handles the given connection.
func HandleConnection(conn *network.ManagedConnection) {
	err := conn.SetTimeout(IdleTimeout)
	if err != nil {
		log.Errorf(err.Error())
	}
	var connectionState byte
capossele's avatar
capossele committed
	var receiveBuffer []byte
capossele's avatar
capossele committed
	var onDisconnect *events.Closure
capossele's avatar
capossele committed
	onReceiveData := events.NewClosure(func(data []byte) {
		processIncomingPacket(&connectionState, &receiveBuffer, conn, data)
capossele's avatar
capossele committed
	})
	onDisconnect = events.NewClosure(func() {
		conn.Events.ReceiveData.Detach(onReceiveData)
		conn.Events.Close.Detach(onDisconnect)
	})
capossele's avatar
capossele committed
	conn.Events.ReceiveData.Attach(onReceiveData)
	conn.Events.Close.Attach(onDisconnect)
capossele's avatar
capossele committed
	maxPacketsSize := getMaxPacketSize(
		heartbeat.MaxMarshaledTotalSize,
capossele's avatar
capossele committed
	)
capossele's avatar
capossele committed
	go conn.Read(make([]byte, maxPacketsSize))
}

// getMaxPacketSize returns the largest of the given packet sizes.
//
// It returns 0 when no sizes are given or when none of them is positive,
// because the running maximum starts at 0.
func getMaxPacketSize(packetSizes ...int) int {
	maxPacketSize := 0
	for _, packetSize := range packetSizes {
		if packetSize > maxPacketSize {
			maxPacketSize = packetSize
		}
	}

	return maxPacketSize
}
func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) {
	var err error
	if *connectionState, *receiveBuffer, err = parsePackageHeader(data); err != nil {
		Events.Error.Trigger(err)
		conn.Close()
capossele's avatar
capossele committed
	switch *connectionState {
	case StateHeartbeat:
		processHeartbeatPacket(connectionState, receiveBuffer, conn, data)
}

func parsePackageHeader(data []byte) (ConnectionState, []byte, error) {
capossele's avatar
capossele committed
	var connectionState ConnectionState
	var receiveBuffer []byte
capossele's avatar
capossele committed
	switch data[0] {
	case heartbeat.MarshaledPacketHeader:
		receiveBuffer = make([]byte, heartbeat.MaxMarshaledTotalSize)
		connectionState = StateHeartbeat
capossele's avatar
capossele committed
	default:
		return 0, nil, ErrInvalidPackageHeader
capossele's avatar
capossele committed
	return connectionState, receiveBuffer, nil
func processHeartbeatPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) {
	heartbeatPacket, err := heartbeat.Unmarshal(data)
	if err != nil {
		Events.Error.Trigger(err)
		conn.Close()
		return
capossele's avatar
capossele committed
	}
	Events.Heartbeat.Trigger(*heartbeatPacket)
capossele's avatar
capossele committed
}