Skip to content
Snippets Groups Projects
  • Luca Moser's avatar
    6d3b2a2c
    Refactors analysis plugins (#403) · 6d3b2a2c
    Luca Moser authored
    * normalize bind addresses prints
    
    * refactors analysis plugins
    
    * adds tests for parsing/serializing heartbeat packets
    
    * adds additional test case
    
    * normalize usage of word 'ids'
    
    * updates packr files
    
    * normalizes handler routes for static assets for the analysis web interface
    
    * normalize config key names
    
    * adjusts configs for new analysis config keys
    
    * fixes review comments
    Refactors analysis plugins (#403)
    Luca Moser authored
    * normalize bind addresses prints
    
    * refactors analysis plugins
    
    * adds tests for parsing/serializing heartbeat packets
    
    * adds additional test case
    
    * normalize usage of word 'ids'
    
    * updates packr files
    
    * normalizes handler routes for static assets for the analysis web interface
    
    * normalize config key names
    
    * adjusts configs for new analysis config keys
    
    * fixes review comments
plugin.go 4.40 KiB
package server

import (
	"errors"
	"net"
	"strconv"
	"time"

	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/plugins/analysis/packet"
	"github.com/iotaledger/goshimmer/plugins/config"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/logger"
	"github.com/iotaledger/hive.go/network"
	"github.com/iotaledger/hive.go/network/tcp"
	"github.com/iotaledger/hive.go/node"
	flag "github.com/spf13/pflag"
)

const (
	// PluginName is the name of the analysis server plugin. It is used as the
	// plugin name, the logger name and the background worker name.
	PluginName = "Analysis-Server"

	// CfgAnalysisServerBindAddress is the config key holding the bind address
	// ("host:port") of the analysis server.
	CfgAnalysisServerBindAddress = "analysis.server.bindAddress"

	// IdleTimeout is the timeout applied to every incoming connection in
	// HandleConnection.
	IdleTimeout = 10 * time.Second
	// StateHeartbeat is the connection state of a connection that sent a
	// heartbeat packet; its value equals the heartbeat packet header byte.
	StateHeartbeat = packet.HeartbeatPacketHeader
)

// init registers the command line flag for the analysis server bind address.
func init() {
	const (
		defaultBindAddress = "0.0.0.0:16178"
		usage              = "the bind address of the analysis server"
	)

	flag.String(CfgAnalysisServerBindAddress, defaultBindAddress, usage)
}

var (
	// ErrInvalidPacketHeader defines an invalid packet header error. It is
	// returned by parsePackageHeader when the first byte of a packet does not
	// match any known packet header.
	ErrInvalidPacketHeader = errors.New("invalid packet header")
	// Plugin is the plugin instance of the analysis server plugin. It is
	// created with the node.Disabled status.
	Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
	// server is the TCP server; it is created in configure and started in run.
	server *tcp.TCPServer
	// log is the logger of this plugin; it is initialized in configure.
	log *logger.Logger
)

// configure sets up the plugin logger and the TCP server, and wires the
// server's connect and error events to their handlers.
func configure(_ *node.Plugin) {
	log = logger.NewLogger(PluginName)

	// Errors reported by the server are only logged.
	onServerError := events.NewClosure(func(err error) {
		log.Errorf("error in server: %s", err.Error())
	})
	onConnect := events.NewClosure(HandleConnection)

	server = tcp.NewServer()
	server.Events.Connect.Attach(onConnect)
	server.Events.Error.Attach(onServerError)
}

// run reads the bind address from the node config, splits it into host and
// port, and registers a background worker that starts the TCP server and shuts
// it down once the shutdown signal is received.
//
// An unparsable bind address or port is logged at fatal level; a failure to
// register the background worker is logged via Panic.
func run(_ *node.Plugin) {
	bindAddr := config.Node.GetString(CfgAnalysisServerBindAddress)
	addr, portStr, err := net.SplitHostPort(bindAddr)
	if err != nil {
		// Fatalf (not Fatal) is required so that the format verbs are interpolated.
		log.Fatalf("invalid bind address in %s: %s", CfgAnalysisServerBindAddress, err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		log.Fatalf("invalid port in %s: %s", CfgAnalysisServerBindAddress, err)
	}

	if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) {
		log.Infof("%s started, bind-address=%s", PluginName, bindAddr)
		defer log.Infof("Stopping %s ... done", PluginName)
		// Listen runs in its own goroutine so that this worker can wait for
		// the shutdown signal and then stop the server.
		go server.Listen(addr, port)
		<-shutdownSignal
		log.Info("Stopping Server ...")
		server.Shutdown()
	}, shutdown.PriorityAnalysis); err != nil {
		log.Panic(err)
	}
}

// ConnectionState defines the type of a connection state as a byte.
// It is a type alias, so its values are interchangeable with plain byte values.
type ConnectionState = byte

// HandleConnection handles the given connection.
//
// It applies IdleTimeout to the connection, attaches a handler that processes
// every chunk of received data, attaches a close handler that detaches both
// handlers again, and finally starts reading from the connection in a new
// goroutine using a buffer sized to the largest known packet.
//
// A failure to set the timeout is only logged; the connection is still served.
func HandleConnection(conn *network.ManagedConnection) {
	if err := conn.SetTimeout(IdleTimeout); err != nil {
		// Use an explicit format string: the error text must never be
		// interpreted as a format (it may contain '%').
		log.Errorf("setting connection idle timeout: %s", err)
	}

	// Per-connection state shared with the receive handler.
	var connectionState byte
	var receiveBuffer []byte

	// Declared up front because the close handler detaches itself.
	var onDisconnect *events.Closure

	onReceiveData := events.NewClosure(func(data []byte) {
		processIncomingPacket(&connectionState, &receiveBuffer, conn, data)
	})
	onDisconnect = events.NewClosure(func() {
		conn.Events.ReceiveData.Detach(onReceiveData)
		conn.Events.Close.Detach(onDisconnect)
	})

	conn.Events.ReceiveData.Attach(onReceiveData)
	conn.Events.Close.Attach(onDisconnect)

	maxPacketsSize := getMaxPacketSize(packet.HeartbeatPacketMaxSize)

	go conn.Read(make([]byte, maxPacketsSize))
}

// getMaxPacketSize returns the largest of the given packet sizes.
// It returns 0 when no sizes are given or when none of them is positive.
func getMaxPacketSize(packetSizes ...int) int {
	var largest int
	for i := range packetSizes {
		if size := packetSizes[i]; largest < size {
			largest = size
		}
	}
	return largest
}

// processIncomingPacket parses the header of the received data, stores the
// resulting state and buffer through the given pointers and dispatches the
// data to the handler matching the state.
//
// If the header cannot be parsed, the error is published on Events.Error and
// the connection is closed.
func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) {
	state, buffer, err := parsePackageHeader(data)
	// The results are stored even on failure, mirroring a direct assignment.
	*connectionState, *receiveBuffer = state, buffer
	if err != nil {
		Events.Error.Trigger(err)
		conn.Close()
		return
	}

	if *connectionState == StateHeartbeat {
		processHeartbeatPacket(connectionState, receiveBuffer, conn, data)
	}
}

// parsePackageHeader inspects the first byte of data and returns the matching
// connection state together with a freshly allocated receive buffer for that
// packet type.
//
// It returns ErrInvalidPacketHeader if data is empty or if its first byte is
// not a known packet header.
func parsePackageHeader(data []byte) (ConnectionState, []byte, error) {
	// Guard against empty input: indexing data[0] would panic otherwise.
	if len(data) == 0 {
		return 0, nil, ErrInvalidPacketHeader
	}

	var connectionState ConnectionState
	var receiveBuffer []byte
	switch data[0] {
	case packet.HeartbeatPacketHeader:
		receiveBuffer = make([]byte, packet.HeartbeatPacketMaxSize)
		connectionState = StateHeartbeat
	default:
		return 0, nil, ErrInvalidPacketHeader
	}

	return connectionState, receiveBuffer, nil
}

// processHeartbeatPacket parses data as a heartbeat packet and publishes it on
// Events.Heartbeat. If parsing fails, the error is published on Events.Error
// and the connection is closed. The state and buffer arguments are unused.
func processHeartbeatPacket(_ *byte, _ *[]byte, conn *network.ManagedConnection, data []byte) {
	heartbeat, parseErr := packet.ParseHeartbeat(data)
	if parseErr == nil {
		Events.Heartbeat.Trigger(heartbeat)
		return
	}

	Events.Error.Trigger(parseErr)
	conn.Close()
}