plugin.go 5.48 KiB
package client
import (
"encoding/hex"
"net"
"time"
"github.com/iotaledger/autopeering-sim/discover"
"github.com/iotaledger/autopeering-sim/selection"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/timeutil"
"github.com/iotaledger/goshimmer/plugins/analysis/types/addnode"
"github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/disconnectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/ping"
"github.com/iotaledger/goshimmer/plugins/analysis/types/removenode"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/parameter"
)
var log = logger.NewLogger("Analysis-Client")
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker("Analysis Client", func() {
shuttingDown := false
for !shuttingDown {
select {
case <-daemon.ShutdownSignal:
return
default:
if conn, err := net.Dial("tcp", parameter.NodeConfig.GetString(CFG_SERVER_ADDRESS)); err != nil {
log.Debugf("Could not connect to reporting server: %s", err.Error())
timeutil.Sleep(1 * time.Second)
} else {
managedConn := network.NewManagedConnection(conn)
eventDispatchers := getEventDispatchers(managedConn)
reportCurrentStatus(eventDispatchers)
setupHooks(plugin, managedConn, eventDispatchers)
shuttingDown = keepConnectionAlive(managedConn)
}
}
}
})
}
func getEventDispatchers(conn *network.ManagedConnection) *EventDispatchers {
return &EventDispatchers{
AddNode: func(nodeId []byte) {
_, _ = conn.Write((&addnode.Packet{NodeId: nodeId}).Marshal())
},
RemoveNode: func(nodeId []byte) {
_, _ = conn.Write((&removenode.Packet{NodeId: nodeId}).Marshal())
},
ConnectNodes: func(sourceId []byte, targetId []byte) {
_, _ = conn.Write((&connectnodes.Packet{SourceId: sourceId, TargetId: targetId}).Marshal())
},
DisconnectNodes: func(sourceId []byte, targetId []byte) {
_, _ = conn.Write((&disconnectnodes.Packet{SourceId: sourceId, TargetId: targetId}).Marshal())
},
}
}
func reportCurrentStatus(eventDispatchers *EventDispatchers) {
if local.INSTANCE != nil {
eventDispatchers.AddNode(local.INSTANCE.ID().Bytes())
}
reportChosenNeighbors(eventDispatchers)
}
func setupHooks(plugin *node.Plugin, conn *network.ManagedConnection, eventDispatchers *EventDispatchers) {
// define hooks ////////////////////////////////////////////////////////////////////////////////////////////////////
onDiscoverPeer := events.NewClosure(func(ev *discover.DiscoveredEvent) {
log.Info("onDiscoverPeer: " + hex.EncodeToString(ev.Peer.ID().Bytes()))
eventDispatchers.AddNode(ev.Peer.ID().Bytes())
})
onDeletePeer := events.NewClosure(func(ev *discover.DeletedEvent) {
log.Info("onDeletePeer: " + hex.EncodeToString(ev.Peer.ID().Bytes()))
eventDispatchers.RemoveNode(ev.Peer.ID().Bytes())
})
onAddAcceptedNeighbor := events.NewClosure(func(ev *selection.PeeringEvent) {
log.Info("onAddAcceptedNeighbor: " + hex.EncodeToString(ev.Peer.ID().Bytes()) + " - " + hex.EncodeToString(local.INSTANCE.ID().Bytes()))
eventDispatchers.ConnectNodes(ev.Peer.ID().Bytes(), local.INSTANCE.ID().Bytes())
})
onRemoveNeighbor := events.NewClosure(func(ev *selection.DroppedEvent) {
log.Info("onRemoveNeighbor: " + hex.EncodeToString(ev.DroppedID.Bytes()) + " - " + hex.EncodeToString(local.INSTANCE.ID().Bytes()))
eventDispatchers.DisconnectNodes(ev.DroppedID.Bytes(), local.INSTANCE.ID().Bytes())
})
onAddChosenNeighbor := events.NewClosure(func(ev *selection.PeeringEvent) {
log.Info("onAddChosenNeighbor: " + hex.EncodeToString(local.INSTANCE.ID().Bytes()) + " - " + hex.EncodeToString(ev.Peer.ID().Bytes()))
eventDispatchers.ConnectNodes(local.INSTANCE.ID().Bytes(), ev.Peer.ID().Bytes())
})
// setup hooks /////////////////////////////////////////////////////////////////////////////////////////////////////
discover.Events.PeerDiscovered.Attach(onDiscoverPeer)
discover.Events.PeerDeleted.Attach(onDeletePeer)
selection.Events.IncomingPeering.Attach(onAddAcceptedNeighbor)
selection.Events.OutgoingPeering.Attach(onAddChosenNeighbor)
selection.Events.Dropped.Attach(onRemoveNeighbor)
// clean up hooks on close /////////////////////////////////////////////////////////////////////////////////////////
var onClose *events.Closure
onClose = events.NewClosure(func() {
discover.Events.PeerDiscovered.Detach(onDiscoverPeer)
selection.Events.IncomingPeering.Detach(onAddAcceptedNeighbor)
selection.Events.OutgoingPeering.Detach(onAddChosenNeighbor)
selection.Events.Dropped.Detach(onRemoveNeighbor)
conn.Events.Close.Detach(onClose)
})
conn.Events.Close.Attach(onClose)
}
func reportChosenNeighbors(dispatchers *EventDispatchers) {
if autopeering.Selection != nil {
for _, chosenNeighbor := range autopeering.Selection.GetOutgoingNeighbors() {
dispatchers.AddNode(chosenNeighbor.ID().Bytes())
dispatchers.ConnectNodes(local.INSTANCE.ID().Bytes(), chosenNeighbor.ID().Bytes())
}
}
}
func keepConnectionAlive(conn *network.ManagedConnection) bool {
go conn.Read(make([]byte, 1))
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-daemon.ShutdownSignal:
return true
case <-ticker.C:
if _, err := conn.Write((&ping.Packet{}).Marshal()); err != nil {
return false
}
}
}
}