Skip to content
Snippets Groups Projects
plugin.go 4.83 KiB
package client

import (
	"encoding/hex"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/iotaledger/goshimmer/dapps/fpctest"
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/packages/vote"
	"github.com/iotaledger/goshimmer/plugins/analysis/packet"
	"github.com/iotaledger/goshimmer/plugins/autopeering"
	"github.com/iotaledger/goshimmer/plugins/autopeering/local"
	"github.com/iotaledger/goshimmer/plugins/config"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/logger"
	"github.com/iotaledger/hive.go/network"
	"github.com/iotaledger/hive.go/node"
	flag "github.com/spf13/pflag"
)

const (
	// PluginName is the name of  the analysis client plugin.
	PluginName = "Analysis-Client"
	// CfgServerAddress defines the config flag of the analysis server address.
	CfgServerAddress = "analysis.client.serverAddress"
	// defines the report interval of the reporting in seconds.
	reportIntervalSec = 5
)

func init() {
	flag.String(CfgServerAddress, "ressims.iota.cafe:188", "tcp server for collecting analysis information")
}

var (
	// Plugin is the plugin instance of the analysis client plugin.
	Plugin      = node.NewPlugin(PluginName, node.Enabled, run)
	log         *logger.Logger
	managedConn *network.ManagedConnection
	connLock    sync.Mutex
)

func run(_ *node.Plugin) {
	log = logger.NewLogger(PluginName)

	conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress))
	if err != nil {
		log.Debugf("Could not connect to reporting server: %s", err.Error())
		return
	}

	managedConn = network.NewManagedConnection(conn)

	if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) {

		fpctest.Voter().Events().RoundExecuted.Attach(events.NewClosure(onRoundExecuted))
		defer fpctest.Voter().Events().RoundExecuted.Detach(events.NewClosure(onRoundExecuted))

		ticker := time.NewTicker(reportIntervalSec * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-shutdownSignal:
				return

			case <-ticker.C:
				sendHeartbeat(managedConn, createHeartbeat())
			}
		}
	}, shutdown.PriorityAnalysis); err != nil {
		log.Panic(err)
	}
}

// EventDispatchers holds the Heartbeat function.
type EventDispatchers struct {
	// Heartbeat defines the Heartbeat function.
	Heartbeat func(heartbeat *packet.Heartbeat)
}

func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) {
	var out strings.Builder
	for _, value := range hb.OutboundIDs {
		out.WriteString(hex.EncodeToString(value))
	}
	var in strings.Builder
	for _, value := range hb.InboundIDs {
		in.WriteString(hex.EncodeToString(value))
	}
	log.Debugw(
		"Heartbeat",
		"nodeID", hex.EncodeToString(hb.OwnID),
		"outboundIDs", out.String(),
		"inboundIDs", in.String(),
	)

	data, err := packet.NewHeartbeatMessage(hb)
	if err != nil {
		log.Info(err, " - heartbeat message skipped")
		return
	}

	connLock.Lock()
	defer connLock.Unlock()
	if _, err = conn.Write(data); err != nil {
		log.Debugw("Error while writing to connection", "Description", err)
	}
}

func createHeartbeat() *packet.Heartbeat {
	// get own ID
	var nodeID []byte
	if local.GetInstance() != nil {
		// doesn't copy the ID, take care not to modify underlying bytearray!
		nodeID = local.GetInstance().ID().Bytes()
	}

	var outboundIDs [][]byte
	var inboundIDs [][]byte

	// get outboundIDs (chosen neighbors)
	outgoingNeighbors := autopeering.Selection().GetOutgoingNeighbors()
	outboundIDs = make([][]byte, len(outgoingNeighbors))
	for i, neighbor := range outgoingNeighbors {
		// doesn't copy the ID, take care not to modify underlying bytearray!
		outboundIDs[i] = neighbor.ID().Bytes()
	}

	// get inboundIDs (accepted neighbors)
	incomingNeighbors := autopeering.Selection().GetIncomingNeighbors()
	inboundIDs = make([][]byte, len(incomingNeighbors))
	for i, neighbor := range incomingNeighbors {
		// doesn't copy the ID, take care not to modify underlying bytearray!
		inboundIDs[i] = neighbor.ID().Bytes()
	}

	return &packet.Heartbeat{OwnID: nodeID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs}
}

func onRoundExecuted(roundStats *vote.RoundStats) {
	// get own ID
	var nodeID []byte
	if local.GetInstance() != nil {
		// doesn't copy the ID, take care not to modify underlying bytearray!
		nodeID = local.GetInstance().ID().Bytes()
	}

	rs := vote.RoundStats{
		Duration:           roundStats.Duration,
		RandUsed:           roundStats.RandUsed,
		ActiveVoteContexts: roundStats.ActiveVoteContexts,
	}

	hb := &packet.FPCHeartbeat{
		OwnID:      nodeID,
		RoundStats: rs,
	}

	data, err := packet.NewFPCHeartbeatMessage(hb)
	if err != nil {
		log.Info(err, " - FPC heartbeat message skipped")
		return
	}

	log.Info("Client: onRoundExecuted data size: ", len(data))

	connLock.Lock()
	defer connLock.Unlock()
	if _, err = managedConn.Write(data); err != nil {
		log.Debugw("Error while writing to connection", "Description", err)
		return
	}
}