Skip to content
Snippets Groups Projects
Unverified Commit eeafca86 authored by Levente Pap's avatar Levente Pap Committed by GitHub
Browse files

Analysis Client/Server Refactor (#317)

* Update analysis/client with Heartbeat message type

* Refactor analysis/server for heartbeat messages

* Bug fix in analysis server

* Fix heartbeat packet marshaling

* Fix `nodes` and `links` maps handling

* Fix in cleanup in recordedevents

* Fix typos

* Fix event log and heartbeat handling

* Refactor analysis server

 * Remove obsolete packet types
 * Run Analysis Server Record Manager in background
 * Refactor Analysis Client with ticker

* Define max amount of neighbors to report

* Small fixes

* New visualization for analysis server

* PR review fixes

* Remove `nodeOnline` and `nodeOffline` events from analysis server

* Refactor analysis frontend

* Fix bug in websocket, remove obsolete triggers

* Remove debug vars, re-enable websocket ping

* Stop ws pings when closed

* Color settings for visualizer

* Update graph colors

* Fix no neighbor bug

* Dynamically change springLength based on nodes

* Small color fix

* Rename ShutdownPriorityAnalysis

* Formatting, linter suggestions

* Update packr file for visualizer

* Websocket Write() instead of TryWrite()
parent 036199bb
Branches
Tags
No related merge requests found
Showing
with 281 additions and 640 deletions
......@@ -5,9 +5,10 @@ import (
)
const (
CFG_SERVER_ADDRESS = "analysis.client.serverAddress"
CfgServerAddress = "analysis.client.serverAddress"
ReportInterval = 5
)
func init() {
flag.String(CFG_SERVER_ADDRESS, "ressims.iota.cafe:188", "tcp server for collecting analysis information")
flag.String(CfgServerAddress, "ressims.iota.cafe:188", "tcp server for collecting analysis information")
}
......@@ -3,24 +3,17 @@ package client
import (
"encoding/hex"
"net"
"strings"
"sync"
"time"
"github.com/iotaledger/hive.go/autopeering/discover"
"github.com/iotaledger/hive.go/autopeering/selection"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/timeutil"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/addnode"
"github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/disconnectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/ping"
"github.com/iotaledger/goshimmer/plugins/analysis/types/removenode"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
......@@ -32,27 +25,22 @@ var connLock sync.Mutex
func Run(plugin *node.Plugin) {
log = logger.NewLogger("Analysis-Client")
daemon.BackgroundWorker("Analysis Client", func(shutdownSignal <-chan struct{}) {
shuttingDown := false
for !shuttingDown {
ticker := time.NewTicker(ReportInterval * time.Second)
defer ticker.Stop()
for {
select {
case <-shutdownSignal:
return
default:
if conn, err := net.Dial("tcp", config.Node.GetString(CFG_SERVER_ADDRESS)); err != nil {
case <-ticker.C:
conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress))
if err != nil {
log.Debugf("Could not connect to reporting server: %s", err.Error())
timeutil.Sleep(1*time.Second, shutdownSignal)
} else {
continue
}
managedConn := network.NewManagedConnection(conn)
eventDispatchers := getEventDispatchers(managedConn)
reportCurrentStatus(eventDispatchers)
setupHooks(plugin, managedConn, eventDispatchers)
shuttingDown = keepConnectionAlive(managedConn, shutdownSignal)
}
reportHeartbeat(eventDispatchers)
}
}
}, shutdown.PriorityAnalysis)
......@@ -60,135 +48,63 @@ func Run(plugin *node.Plugin) {
func getEventDispatchers(conn *network.ManagedConnection) *EventDispatchers {
return &EventDispatchers{
AddNode: func(nodeId []byte) {
log.Debugw("AddNode", "nodeId", hex.EncodeToString(nodeId))
connLock.Lock()
_, _ = conn.Write((&addnode.Packet{NodeId: nodeId}).Marshal())
connLock.Unlock()
},
RemoveNode: func(nodeId []byte) {
log.Debugw("RemoveNode", "nodeId", hex.EncodeToString(nodeId))
connLock.Lock()
_, _ = conn.Write((&removenode.Packet{NodeId: nodeId}).Marshal())
connLock.Unlock()
},
ConnectNodes: func(sourceId []byte, targetId []byte) {
log.Debugw("ConnectNodes", "sourceId", hex.EncodeToString(sourceId), "targetId", hex.EncodeToString(targetId))
connLock.Lock()
_, _ = conn.Write((&connectnodes.Packet{SourceId: sourceId, TargetId: targetId}).Marshal())
connLock.Unlock()
},
DisconnectNodes: func(sourceId []byte, targetId []byte) {
log.Debugw("DisconnectNodes", "sourceId", hex.EncodeToString(sourceId), "targetId", hex.EncodeToString(targetId))
connLock.Lock()
_, _ = conn.Write((&disconnectnodes.Packet{SourceId: sourceId, TargetId: targetId}).Marshal())
connLock.Unlock()
},
}
}
func reportCurrentStatus(eventDispatchers *EventDispatchers) {
if local.GetInstance() != nil {
eventDispatchers.AddNode(local.GetInstance().ID().Bytes())
}
Heartbeat: func(packet *heartbeat.Packet) {
var out strings.Builder
for _, value := range packet.OutboundIDs {
out.WriteString(hex.EncodeToString(value))
}
var in strings.Builder
for _, value := range packet.InboundIDs {
in.WriteString(hex.EncodeToString(value))
}
log.Debugw(
"Heartbeat",
"nodeId", hex.EncodeToString(packet.OwnID),
"outboundIds", out.String(),
"inboundIds", in.String(),
)
reportKnownPeers(eventDispatchers)
reportChosenNeighbors(eventDispatchers)
reportAcceptedNeighbors(eventDispatchers)
// Marshal() copies the content of packet, it doesn't modify it.
data, err := packet.Marshal()
if err != nil {
log.Info(err, " - heartbeat message skipped")
return
}
func setupHooks(plugin *node.Plugin, conn *network.ManagedConnection, eventDispatchers *EventDispatchers) {
// define hooks ////////////////////////////////////////////////////////////////////////////////////////////////////
onDiscoverPeer := events.NewClosure(func(ev *discover.DiscoveredEvent) {
eventDispatchers.AddNode(ev.Peer.ID().Bytes())
})
onDeletePeer := events.NewClosure(func(ev *discover.DeletedEvent) {
eventDispatchers.RemoveNode(ev.Peer.ID().Bytes())
})
onAddChosenNeighbor := events.NewClosure(func(ev *selection.PeeringEvent) {
if ev.Status {
eventDispatchers.ConnectNodes(local.GetInstance().ID().Bytes(), ev.Peer.ID().Bytes())
connLock.Lock()
defer connLock.Unlock()
if _, err = conn.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
}
})
onAddAcceptedNeighbor := events.NewClosure(func(ev *selection.PeeringEvent) {
if ev.Status {
eventDispatchers.ConnectNodes(ev.Peer.ID().Bytes(), local.GetInstance().ID().Bytes())
},
}
})
onRemoveNeighbor := events.NewClosure(func(ev *selection.DroppedEvent) {
eventDispatchers.DisconnectNodes(ev.DroppedID.Bytes(), local.GetInstance().ID().Bytes())
})
// setup hooks /////////////////////////////////////////////////////////////////////////////////////////////////////
discover.Events.PeerDiscovered.Attach(onDiscoverPeer)
discover.Events.PeerDeleted.Attach(onDeletePeer)
selection.Events.IncomingPeering.Attach(onAddAcceptedNeighbor)
selection.Events.OutgoingPeering.Attach(onAddChosenNeighbor)
selection.Events.Dropped.Attach(onRemoveNeighbor)
// clean up hooks on close /////////////////////////////////////////////////////////////////////////////////////////
var onClose *events.Closure
onClose = events.NewClosure(func() {
discover.Events.PeerDiscovered.Detach(onDiscoverPeer)
discover.Events.PeerDeleted.Detach(onDeletePeer)
selection.Events.IncomingPeering.Detach(onAddAcceptedNeighbor)
selection.Events.OutgoingPeering.Detach(onAddChosenNeighbor)
selection.Events.Dropped.Detach(onRemoveNeighbor)
conn.Events.Close.Detach(onClose)
})
conn.Events.Close.Attach(onClose)
}
func reportKnownPeers(dispatchers *EventDispatchers) {
if autopeering.Discovery != nil {
for _, peer := range autopeering.Discovery.GetVerifiedPeers() {
dispatchers.AddNode(peer.ID().Bytes())
}
}
func reportHeartbeat(dispatchers *EventDispatchers) {
// Get own ID
var nodeID []byte
if local.GetInstance() != nil {
// Doesn't copy the ID, take care not to modify underlying bytearray!
nodeID = local.GetInstance().ID().Bytes()
}
func reportChosenNeighbors(dispatchers *EventDispatchers) {
if autopeering.Selection != nil {
for _, chosenNeighbor := range autopeering.Selection.GetOutgoingNeighbors() {
//dispatchers.AddNode(chosenNeighbor.ID().Bytes())
dispatchers.ConnectNodes(local.GetInstance().ID().Bytes(), chosenNeighbor.ID().Bytes())
}
}
// Get outboundIds (chosen neighbors)
outgoingNeighbors := autopeering.Selection.GetOutgoingNeighbors()
outboundIds := make([][]byte, len(outgoingNeighbors))
for i, neighbor := range outgoingNeighbors {
// Doesn't copy the ID, take care not to modify underlying bytearray!
outboundIds[i] = neighbor.ID().Bytes()
}
func reportAcceptedNeighbors(dispatchers *EventDispatchers) {
if autopeering.Selection != nil {
for _, acceptedNeighbor := range autopeering.Selection.GetIncomingNeighbors() {
//dispatchers.AddNode(acceptedNeighbor.ID().Bytes())
dispatchers.ConnectNodes(acceptedNeighbor.ID().Bytes(), local.GetInstance().ID().Bytes())
}
}
// Get inboundIds (accepted neighbors)
incomingNeighbors := autopeering.Selection.GetIncomingNeighbors()
inboundIds := make([][]byte, len(incomingNeighbors))
for i, neighbor := range incomingNeighbors {
// Doesn't copy the ID, take care not to modify underlying bytearray!
inboundIds[i] = neighbor.ID().Bytes()
}
func keepConnectionAlive(conn *network.ManagedConnection, shutdownSignal <-chan struct{}) bool {
go conn.Read(make([]byte, 1))
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-shutdownSignal:
return true
packet := &heartbeat.Packet{OwnID: nodeID, OutboundIDs: outboundIds, InboundIDs: inboundIds}
case <-ticker.C:
connLock.Lock()
if _, err := conn.Write((&ping.Packet{}).Marshal()); err != nil {
connLock.Unlock()
return false
}
connLock.Unlock()
}
}
dispatchers.Heartbeat(packet)
}
package client
import "github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
type EventDispatchers struct {
AddNode func(nodeId []byte)
RemoveNode func(nodeId []byte)
ConnectNodes func(sourceId []byte, targetId []byte)
DisconnectNodes func(sourceId []byte, targetId []byte)
Heartbeat func(*heartbeat.Packet)
}
......@@ -15,21 +15,21 @@ var log *logger.Logger
func configure(plugin *node.Plugin) {
log = logger.NewLogger("Analysis")
if config.Node.GetInt(server.CFG_SERVER_PORT) != 0 {
if config.Node.GetInt(server.CfgServerPort) != 0 {
webinterface.Configure(plugin)
server.Configure(plugin)
}
}
func run(plugin *node.Plugin) {
if config.Node.GetInt(server.CFG_SERVER_PORT) != 0 {
if config.Node.GetInt(server.CfgServerPort) != 0 {
webinterface.Run(plugin)
server.Run(plugin)
} else {
log.Info("Server is disabled (server-port is 0)")
}
if config.Node.GetString(client.CFG_SERVER_ADDRESS) != "" {
if config.Node.GetString(client.CfgServerAddress) != "" {
client.Run(plugin)
} else {
log.Info("Client is disabled (server-address is empty)")
......
......@@ -3,22 +3,11 @@ package server
import (
"time"
"github.com/iotaledger/goshimmer/plugins/analysis/types/addnode"
"github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/disconnectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/ping"
"github.com/iotaledger/goshimmer/plugins/analysis/types/removenode"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
)
const (
IDLE_TIMEOUT = 5 * time.Second
IdleTimeout = 10 * time.Second
STATE_INITIAL = byte(255)
STATE_INITIAL_ADDNODE = byte(254)
STATE_CONSECUTIVE = byte(253)
STATE_PING = ping.MARSHALED_PACKET_HEADER
STATE_ADD_NODE = addnode.MARSHALED_PACKET_HEADER
STATE_REMOVE_NODE = removenode.MARSHALED_PACKET_HEADER
STATE_CONNECT_NODES = connectnodes.MARSHALED_PACKET_HEADER
STATE_DISCONNECT_NODES = disconnectnodes.MARSHALED_PACKET_HEADER
StateHeartbeat = heartbeat.MarshaledPacketHeader
)
package server
import (
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/hive.go/events"
)
......@@ -9,23 +10,29 @@ var Events = struct {
RemoveNode *events.Event
ConnectNodes *events.Event
DisconnectNodes *events.Event
NodeOnline *events.Event
NodeOffline *events.Event
Error *events.Event
Heartbeat *events.Event
}{
events.NewEvent(stringCaller),
events.NewEvent(stringCaller),
events.NewEvent(stringStringCaller),
events.NewEvent(stringStringCaller),
events.NewEvent(stringCaller),
events.NewEvent(stringCaller),
events.NewEvent(errorCaller),
events.NewEvent(heartbeatPacketCaller),
}
func stringCaller(handler interface{}, params ...interface{}) {
handler.(func(string))(params[0].(string))
}
func stringStringCaller(handler interface{}, params ...interface{}) {
handler.(func(string, string))(params[0].(string), params[1].(string))
}
func errorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) }
func errorCaller(handler interface{}, params ...interface{}) {
handler.(func(error))(params[0].(error))
}
func heartbeatPacketCaller(handler interface{}, params ...interface{}) {
handler.(func(heartbeat.Packet))(params[0].(heartbeat.Packet))
}
......@@ -5,9 +5,9 @@ import (
)
const (
CFG_SERVER_PORT = "analysis.server.port"
CfgServerPort = "analysis.server.port"
)
func init() {
flag.Int(CFG_SERVER_PORT, 0, "tcp port for incoming analysis packets")
flag.Int(CfgServerPort, 0, "tcp port for incoming analysis packets")
}
package server
import (
"encoding/hex"
"errors"
"math"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
......@@ -13,11 +11,7 @@ import (
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/addnode"
"github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/disconnectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/ping"
"github.com/iotaledger/goshimmer/plugins/analysis/types/removenode"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/config"
)
......@@ -37,7 +31,7 @@ func Configure(plugin *node.Plugin) {
log.Errorf("error in server: %s", err.Error())
}))
server.Events.Start.Attach(events.NewClosure(func() {
log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CFG_SERVER_PORT))
log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort))
}))
server.Events.Shutdown.Attach(events.NewClosure(func() {
log.Info("Stopping Server ... done")
......@@ -46,8 +40,8 @@ func Configure(plugin *node.Plugin) {
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker("Analysis Server", func(shutdownSignal <-chan struct{}) {
log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CFG_SERVER_PORT))
go server.Listen("0.0.0.0", config.Node.GetInt(CFG_SERVER_PORT))
log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort))
go server.Listen("0.0.0.0", config.Node.GetInt(CfgServerPort))
<-shutdownSignal
Shutdown()
}, shutdown.PriorityAnalysis)
......@@ -60,21 +54,20 @@ func Shutdown() {
}
func HandleConnection(conn *network.ManagedConnection) {
conn.SetTimeout(IDLE_TIMEOUT)
err := conn.SetTimeout(IdleTimeout)
if err!=nil {
log.Errorf(err.Error())
}
var connectionState = STATE_INITIAL
var connectionState byte
var receiveBuffer []byte
var offset int
var connectedNodeId string
var onDisconnect *events.Closure
onReceiveData := events.NewClosure(func(data []byte) {
processIncomingPacket(&connectionState, &receiveBuffer, conn, data, &offset, &connectedNodeId)
processIncomingPacket(&connectionState, &receiveBuffer, conn, data)
})
onDisconnect = events.NewClosure(func() {
Events.NodeOffline.Trigger(connectedNodeId)
conn.Events.ReceiveData.Detach(onReceiveData)
conn.Events.Close.Detach(onDisconnect)
})
......@@ -83,11 +76,7 @@ func HandleConnection(conn *network.ManagedConnection) {
conn.Events.Close.Attach(onDisconnect)
maxPacketsSize := getMaxPacketSize(
ping.MARSHALED_TOTAL_SIZE,
addnode.MARSHALED_TOTAL_SIZE,
removenode.MARSHALED_TOTAL_SIZE,
connectnodes.MARSHALED_TOTAL_SIZE,
disconnectnodes.MARSHALED_PACKET_HEADER,
heartbeat.MaxMarshaledTotalSize,
)
go conn.Read(make([]byte, maxPacketsSize))
......@@ -105,10 +94,7 @@ func getMaxPacketSize(packetSizes ...int) int {
return maxPacketSize
}
func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) {
firstPackage := *connectionState == STATE_INITIAL
if firstPackage || *connectionState == STATE_CONSECUTIVE {
func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) {
var err error
if *connectionState, *receiveBuffer, err = parsePackageHeader(data); err != nil {
Events.Error.Trigger(err)
......@@ -118,53 +104,11 @@ func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *n
return
}
*offset = 0
switch *connectionState {
case STATE_ADD_NODE:
*receiveBuffer = make([]byte, addnode.MARSHALED_TOTAL_SIZE)
case STATE_PING:
*receiveBuffer = make([]byte, ping.MARSHALED_TOTAL_SIZE)
case STATE_CONNECT_NODES:
*receiveBuffer = make([]byte, connectnodes.MARSHALED_TOTAL_SIZE)
case STATE_DISCONNECT_NODES:
*receiveBuffer = make([]byte, disconnectnodes.MARSHALED_TOTAL_SIZE)
case STATE_REMOVE_NODE:
*receiveBuffer = make([]byte, removenode.MARSHALED_TOTAL_SIZE)
}
case StateHeartbeat:
processHeartbeatPacket(connectionState, receiveBuffer, conn, data)
}
if firstPackage {
if *connectionState != STATE_ADD_NODE {
Events.Error.Trigger(ErrExpectedInitialAddNodePackage)
} else {
*connectionState = STATE_INITIAL_ADDNODE
}
}
switch *connectionState {
case STATE_INITIAL_ADDNODE:
processIncomingAddNodePacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId)
case STATE_ADD_NODE:
processIncomingAddNodePacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId)
case STATE_PING:
processIncomingPingPacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId)
case STATE_CONNECT_NODES:
processIncomingConnectNodesPacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId)
case STATE_DISCONNECT_NODES:
processIncomingDisconnectNodesPacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId)
case STATE_REMOVE_NODE:
processIncomingRemoveNodePacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId)
}
}
func parsePackageHeader(data []byte) (ConnectionState, []byte, error) {
......@@ -172,30 +116,10 @@ func parsePackageHeader(data []byte) (ConnectionState, []byte, error) {
var receiveBuffer []byte
switch data[0] {
case ping.MARSHALED_PACKET_HEADER:
receiveBuffer = make([]byte, ping.MARSHALED_TOTAL_SIZE)
connectionState = STATE_PING
case addnode.MARSHALED_PACKET_HEADER:
receiveBuffer = make([]byte, addnode.MARSHALED_TOTAL_SIZE)
connectionState = STATE_ADD_NODE
case connectnodes.MARSHALED_PACKET_HEADER:
receiveBuffer = make([]byte, connectnodes.MARSHALED_TOTAL_SIZE)
connectionState = STATE_CONNECT_NODES
case disconnectnodes.MARSHALED_PACKET_HEADER:
receiveBuffer = make([]byte, disconnectnodes.MARSHALED_TOTAL_SIZE)
case heartbeat.MarshaledPacketHeader:
receiveBuffer = make([]byte, heartbeat.MaxMarshaledTotalSize)
connectionState = STATE_DISCONNECT_NODES
case removenode.MARSHALED_PACKET_HEADER:
receiveBuffer = make([]byte, removenode.MARSHALED_TOTAL_SIZE)
connectionState = STATE_REMOVE_NODE
connectionState = StateHeartbeat
default:
return 0, nil, ErrInvalidPackageHeader
......@@ -204,147 +128,13 @@ func parsePackageHeader(data []byte) (ConnectionState, []byte, error) {
return connectionState, receiveBuffer, nil
}
func processIncomingAddNodePacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) {
remainingCapacity := int(math.Min(float64(addnode.MARSHALED_TOTAL_SIZE-*offset), float64(len(data))))
copy((*receiveBuffer)[*offset:], data[:remainingCapacity])
if *offset+len(data) < addnode.MARSHALED_TOTAL_SIZE {
*offset += len(data)
} else {
if addNodePacket, err := addnode.Unmarshal(*receiveBuffer); err != nil {
Events.Error.Trigger(err)
conn.Close()
return
} else {
nodeId := hex.EncodeToString(addNodePacket.NodeId)
Events.AddNode.Trigger(nodeId)
if *connectionState == STATE_INITIAL_ADDNODE {
*connectedNodeId = nodeId
Events.NodeOnline.Trigger(nodeId)
}
}
*connectionState = STATE_CONSECUTIVE
if *offset+len(data) > addnode.MARSHALED_TOTAL_SIZE {
processIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset, connectedNodeId)
}
}
}
func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) {
remainingCapacity := int(math.Min(float64(ping.MARSHALED_TOTAL_SIZE-*offset), float64(len(data))))
copy((*receiveBuffer)[*offset:], data[:remainingCapacity])
if *offset+len(data) < ping.MARSHALED_TOTAL_SIZE {
*offset += len(data)
} else {
if _, err := ping.Unmarshal(*receiveBuffer); err != nil {
Events.Error.Trigger(err)
conn.Close()
return
}
*connectionState = STATE_CONSECUTIVE
func processHeartbeatPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) {
if *offset+len(data) > ping.MARSHALED_TOTAL_SIZE {
processIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset, connectedNodeId)
}
}
}
func processIncomingConnectNodesPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) {
remainingCapacity := int(math.Min(float64(connectnodes.MARSHALED_TOTAL_SIZE-*offset), float64(len(data))))
copy((*receiveBuffer)[*offset:], data[:remainingCapacity])
if *offset+len(data) < connectnodes.MARSHALED_TOTAL_SIZE {
*offset += len(data)
} else {
if connectNodesPacket, err := connectnodes.Unmarshal(*receiveBuffer); err != nil {
heartbeatPacket, err := heartbeat.Unmarshal(data)
if err != nil {
Events.Error.Trigger(err)
conn.Close()
return
} else {
sourceNodeId := hex.EncodeToString(connectNodesPacket.SourceId)
targetNodeId := hex.EncodeToString(connectNodesPacket.TargetId)
Events.ConnectNodes.Trigger(sourceNodeId, targetNodeId)
}
*connectionState = STATE_CONSECUTIVE
if *offset+len(data) > connectnodes.MARSHALED_TOTAL_SIZE {
processIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset, connectedNodeId)
}
}
}
func processIncomingDisconnectNodesPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) {
remainingCapacity := int(math.Min(float64(disconnectnodes.MARSHALED_TOTAL_SIZE-*offset), float64(len(data))))
copy((*receiveBuffer)[*offset:], data[:remainingCapacity])
if *offset+len(data) < disconnectnodes.MARSHALED_TOTAL_SIZE {
*offset += len(data)
} else {
if disconnectNodesPacket, err := disconnectnodes.Unmarshal(*receiveBuffer); err != nil {
Events.Error.Trigger(err)
conn.Close()
return
} else {
sourceNodeId := hex.EncodeToString(disconnectNodesPacket.SourceId)
targetNodeId := hex.EncodeToString(disconnectNodesPacket.TargetId)
Events.DisconnectNodes.Trigger(sourceNodeId, targetNodeId)
}
*connectionState = STATE_CONSECUTIVE
if *offset+len(data) > disconnectnodes.MARSHALED_TOTAL_SIZE {
processIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset, connectedNodeId)
}
}
}
func processIncomingRemoveNodePacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) {
remainingCapacity := int(math.Min(float64(removenode.MARSHALED_TOTAL_SIZE-*offset), float64(len(data))))
copy((*receiveBuffer)[*offset:], data[:remainingCapacity])
if *offset+len(data) < removenode.MARSHALED_TOTAL_SIZE {
*offset += len(data)
} else {
if removeNodePacket, err := removenode.Unmarshal(*receiveBuffer); err != nil {
Events.Error.Trigger(err)
conn.Close()
return
} else {
nodeId := hex.EncodeToString(removeNodePacket.NodeId)
Events.RemoveNode.Trigger(nodeId)
}
*connectionState = STATE_CONSECUTIVE
if *offset+len(data) > addnode.MARSHALED_TOTAL_SIZE {
processIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset, connectedNodeId)
}
}
Events.Heartbeat.Trigger(*heartbeatPacket)
}
package addnode
import "crypto/sha256"
const (
MARSHALED_PACKET_HEADER = 0x01
MARSHALED_PACKET_HEADER_START = 0
MARSHALED_PACKET_HEADER_SIZE = 1
MARSHALED_PACKET_HEADER_END = MARSHALED_PACKET_HEADER_START + MARSHALED_PACKET_HEADER_SIZE
MARSHALED_ID_START = MARSHALED_PACKET_HEADER_END
MARSHALED_ID_SIZE = sha256.Size
MARSHALED_ID_END = MARSHALED_ID_START + MARSHALED_ID_SIZE
MARSHALED_TOTAL_SIZE = MARSHALED_ID_END
)
package addnode
import "errors"
var (
ErrMalformedAddNodePacket = errors.New("malformed add node packet")
)
type Packet struct {
NodeId []byte
}
func Unmarshal(data []byte) (*Packet, error) {
if len(data) < MARSHALED_TOTAL_SIZE || data[0] != MARSHALED_PACKET_HEADER {
return nil, ErrMalformedAddNodePacket
}
unmarshaledPackage := &Packet{
NodeId: make([]byte, MARSHALED_ID_SIZE),
}
copy(unmarshaledPackage.NodeId, data[MARSHALED_ID_START:MARSHALED_ID_END])
return unmarshaledPackage, nil
}
func (packet *Packet) Marshal() []byte {
marshaledPackage := make([]byte, MARSHALED_TOTAL_SIZE)
marshaledPackage[MARSHALED_PACKET_HEADER_START] = MARSHALED_PACKET_HEADER
copy(marshaledPackage[MARSHALED_ID_START:MARSHALED_ID_END], packet.NodeId[:MARSHALED_ID_SIZE])
return marshaledPackage
}
package connectnodes
import "crypto/sha256"
const (
MARSHALED_PACKET_HEADER = 0x03
MARSHALED_PACKET_HEADER_START = 0
MARSHALED_PACKET_HEADER_SIZE = 1
MARSHALED_PACKET_HEADER_END = MARSHALED_PACKET_HEADER_START + MARSHALED_PACKET_HEADER_SIZE
MARSHALED_SOURCE_ID_START = MARSHALED_PACKET_HEADER_END
MARSHALED_SOURCE_ID_SIZE = sha256.Size
MARSHALED_SOURCE_ID_END = MARSHALED_SOURCE_ID_START + MARSHALED_SOURCE_ID_SIZE
MARSHALED_TARGET_ID_START = MARSHALED_SOURCE_ID_END
MARSHALED_TARGET_ID_SIZE = sha256.Size
MARSHALED_TARGET_ID_END = MARSHALED_TARGET_ID_START + MARSHALED_TARGET_ID_SIZE
MARSHALED_TOTAL_SIZE = MARSHALED_TARGET_ID_END
)
package connectnodes
import "errors"
var (
ErrMalformedConnectNodesPacket = errors.New("malformed connect nodes packet")
)
type Packet struct {
SourceId []byte
TargetId []byte
}
func Unmarshal(data []byte) (*Packet, error) {
if len(data) < MARSHALED_TOTAL_SIZE || data[0] != MARSHALED_PACKET_HEADER {
return nil, ErrMalformedConnectNodesPacket
}
unmarshaledPackage := &Packet{
SourceId: make([]byte, MARSHALED_SOURCE_ID_SIZE),
TargetId: make([]byte, MARSHALED_TARGET_ID_SIZE),
}
copy(unmarshaledPackage.SourceId, data[MARSHALED_SOURCE_ID_START:MARSHALED_SOURCE_ID_END])
copy(unmarshaledPackage.TargetId, data[MARSHALED_TARGET_ID_START:MARSHALED_TARGET_ID_END])
return unmarshaledPackage, nil
}
func (packet *Packet) Marshal() []byte {
marshaledPackage := make([]byte, MARSHALED_TOTAL_SIZE)
marshaledPackage[MARSHALED_PACKET_HEADER_START] = MARSHALED_PACKET_HEADER
copy(marshaledPackage[MARSHALED_SOURCE_ID_START:MARSHALED_SOURCE_ID_END], packet.SourceId[:MARSHALED_SOURCE_ID_SIZE])
copy(marshaledPackage[MARSHALED_TARGET_ID_START:MARSHALED_TARGET_ID_END], packet.TargetId[:MARSHALED_TARGET_ID_SIZE])
return marshaledPackage
}
package disconnectnodes
import "crypto/sha256"
const (
MARSHALED_PACKET_HEADER = 0x04
MARSHALED_PACKET_HEADER_START = 0
MARSHALED_PACKET_HEADER_SIZE = 1
MARSHALED_PACKET_HEADER_END = MARSHALED_PACKET_HEADER_START + MARSHALED_PACKET_HEADER_SIZE
MARSHALED_SOURCE_ID_START = MARSHALED_PACKET_HEADER_END
MARSHALED_SOURCE_ID_SIZE = sha256.Size
MARSHALED_SOURCE_ID_END = MARSHALED_SOURCE_ID_START + MARSHALED_SOURCE_ID_SIZE
MARSHALED_TARGET_ID_START = MARSHALED_SOURCE_ID_END
MARSHALED_TARGET_ID_SIZE = sha256.Size
MARSHALED_TARGET_ID_END = MARSHALED_TARGET_ID_START + MARSHALED_TARGET_ID_SIZE
MARSHALED_TOTAL_SIZE = MARSHALED_TARGET_ID_END
)
package disconnectnodes
import "errors"
var (
ErrMalformedDisconnectNodesPacket = errors.New("malformed disconnect nodes packet")
)
type Packet struct {
SourceId []byte
TargetId []byte
}
func Unmarshal(data []byte) (*Packet, error) {
if len(data) < MARSHALED_TOTAL_SIZE || data[0] != MARSHALED_PACKET_HEADER {
return nil, ErrMalformedDisconnectNodesPacket
}
unmarshaledPackage := &Packet{
SourceId: make([]byte, MARSHALED_SOURCE_ID_SIZE),
TargetId: make([]byte, MARSHALED_TARGET_ID_SIZE),
}
copy(unmarshaledPackage.SourceId, data[MARSHALED_SOURCE_ID_START:MARSHALED_SOURCE_ID_END])
copy(unmarshaledPackage.TargetId, data[MARSHALED_TARGET_ID_START:MARSHALED_TARGET_ID_END])
return unmarshaledPackage, nil
}
func (packet *Packet) Marshal() []byte {
marshaledPackage := make([]byte, MARSHALED_TOTAL_SIZE)
marshaledPackage[MARSHALED_PACKET_HEADER_START] = MARSHALED_PACKET_HEADER
copy(marshaledPackage[MARSHALED_SOURCE_ID_START:MARSHALED_SOURCE_ID_END], packet.SourceId[:MARSHALED_SOURCE_ID_SIZE])
copy(marshaledPackage[MARSHALED_TARGET_ID_START:MARSHALED_TARGET_ID_END], packet.TargetId[:MARSHALED_TARGET_ID_SIZE])
return marshaledPackage
}
package heartbeat
import "crypto/sha256"
const (
// MarshaledPacketHeader unique identifier of packet
MarshaledPacketHeader = 0x01
// MaxOutboundNeighborCount is the maximum number of allowed neighbors in one direction
MaxOutboundNeighborCount = 4
// MaxInboundNeighborCount is the maximum number of allowed neighbors in one direction
MaxInboundNeighborCount = 4
// MaxMarshaledTotalSize Maximum packet length in bytes
MaxMarshaledTotalSize = MarshaledPacketHeaderSize + MarshaledOwnIDSize +
MarshaledOutboundIDsLengthSize + MaxOutboundNeighborCount*MarshaledOutboundIDSize +
MarshaledInboundIDsLengthSize + MaxInboundNeighborCount*MarshaledInboundIDSize
MarshaledPacketHeaderStart = 0
MarshaledPacketHeaderSize = 1
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
MarshaledOwnIDStart = MarshaledPacketHeaderEnd
MarshaledOwnIDSize = sha256.Size
MarshaledOwnIDEnd = MarshaledOwnIDStart + MarshaledOwnIDSize
MarshaledOutboundIDsLengthStart = MarshaledOwnIDEnd
MarshaledOutboundIDsLengthSize = 1
MarshaledOutboundIDSize = sha256.Size
MarshaledOutboundIDsLengthEnd = MarshaledOutboundIDsLengthStart + MarshaledOutboundIDsLengthSize
MarshaledInboundIDsLengthSize = 1
MarshaledInboundIDSize = sha256.Size
)
package heartbeat
import "errors"
var (
ErrMalformedHeartbeatPacket = errors.New("malformed heartbeat packet")
ErrTooManyNeighborsToReport = errors.New("too many neighbors to report in packet")
)
// Packet is a heartbeat packet
type Packet struct {
OwnID []byte
OutboundIDs [][]byte
InboundIDs [][]byte
}
func Unmarshal(data []byte) (*Packet, error) {
// So far we are only sure about the static part
MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize
// Check if len is smaller than the static parts we know at the moment
if len(data) < MarshaledTotalSize || data[0] != MarshaledPacketHeader {
return nil, ErrMalformedHeartbeatPacket
}
// First the static part
unmarshalledOwnID := make([]byte, MarshaledOwnIDSize)
copy(unmarshalledOwnID[:MarshaledOwnIDSize], data[MarshaledOwnIDStart:MarshaledOwnIDEnd])
// Now the dynamic parts, first outbound neighbors
lengthOutboundIDs := int(data[MarshaledOutboundIDsLengthStart])
MarshaledTotalSize += MarshaledOutboundIDsLengthSize + lengthOutboundIDs*MarshaledOutboundIDSize
// Check if len is smaller than the size we know at the moment
if len(data) < MarshaledTotalSize {
return nil, ErrMalformedHeartbeatPacket
}
unmarshalledOutboundIDs := make([][]byte, lengthOutboundIDs)
for i := range unmarshalledOutboundIDs {
// Allocate space for each ID
unmarshalledOutboundIDs[i] = make([]byte, MarshaledOutboundIDSize)
copy(unmarshalledOutboundIDs[i][:MarshaledOutboundIDSize], data[MarshaledOutboundIDsLengthEnd+i*MarshaledOutboundIDSize:MarshaledOutboundIDsLengthEnd+(i+1)*MarshaledOutboundIDSize])
}
MarshaledInboundIdsLengthStart := MarshaledOutboundIDsLengthEnd + lengthOutboundIDs*MarshaledOutboundIDSize
MarshaledInboundIdsLengthEnd := MarshaledInboundIdsLengthStart + MarshaledInboundIDsLengthSize
// Second dynamic part, inbound neighbors
lengthInboundIDs := int(data[MarshaledInboundIdsLengthStart])
MarshaledTotalSize += MarshaledInboundIDsLengthSize + lengthInboundIDs*MarshaledInboundIDSize
// Check if len is smaller than the size we know at the moment
if len(data) < MarshaledTotalSize {
return nil, ErrMalformedHeartbeatPacket
}
unmarshalledInboundIDs := make([][]byte, lengthInboundIDs)
for i := range unmarshalledInboundIDs {
// Allocate space for each ID
unmarshalledInboundIDs[i] = make([]byte, MarshaledInboundIDSize)
copy(unmarshalledInboundIDs[i][:MarshaledInboundIDSize], data[MarshaledInboundIdsLengthEnd+i*MarshaledInboundIDSize:MarshaledInboundIdsLengthEnd+(i+1)*MarshaledInboundIDSize])
}
unmarshalledPackage := &Packet{
OwnID: unmarshalledOwnID,
OutboundIDs: unmarshalledOutboundIDs,
InboundIDs: unmarshalledInboundIDs,
}
return unmarshalledPackage, nil
}
func (packet *Packet) Marshal() ([]byte, error) {
// Calculate total needed bytes based on packet
MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize +
// Dynamic part 1, outbound IDs
MarshaledOutboundIDsLengthSize + len(packet.OutboundIDs)*MarshaledOutboundIDSize +
// Dynamic part 2, Inbound IDs
MarshaledInboundIDsLengthSize + len(packet.InboundIDs)*MarshaledInboundIDSize
marshaledPackage := make([]byte, MarshaledTotalSize)
// Header byte
marshaledPackage[MarshaledPacketHeaderStart] = MarshaledPacketHeader
// Own nodeId
copy(marshaledPackage[MarshaledOwnIDStart:MarshaledOwnIDEnd], packet.OwnID[:MarshaledOwnIDSize])
// Outbound nodeIds, need to tell first how many we have to be able to unmarshal it later
lengthOutboundIDs := len(packet.OutboundIDs)
if lengthOutboundIDs > MaxOutboundNeighborCount {
return nil, ErrTooManyNeighborsToReport
} else {
marshaledPackage[MarshaledOutboundIDsLengthStart] = byte(lengthOutboundIDs)
}
// Copy contents of packet.OutboundIDs
for i, outboundID := range packet.OutboundIDs {
copy(marshaledPackage[MarshaledOutboundIDsLengthEnd+i*MarshaledOutboundIDSize:MarshaledOutboundIDsLengthEnd+(i+1)*MarshaledOutboundIDSize], outboundID[:MarshaledOutboundIDSize])
}
// Calculate where inbound nodeId-s start
MarshaledInboundIdsLengthStart := MarshaledOutboundIDsLengthEnd + lengthOutboundIDs*MarshaledOutboundIDSize
// Tell how many inbound nodeId-s we have
lengthInboundIDs := len(packet.InboundIDs)
if lengthInboundIDs > MaxInboundNeighborCount {
return nil, ErrTooManyNeighborsToReport
} else {
marshaledPackage[MarshaledInboundIdsLengthStart] = byte(lengthInboundIDs)
}
// End of length is the start of inbound nodeId-s
MarshaledInboundIdsLengthEnd := MarshaledInboundIdsLengthStart + MarshaledInboundIDsLengthSize
// Copy contents of packet.InboundIDs
for i, inboundID := range packet.InboundIDs {
copy(marshaledPackage[MarshaledInboundIdsLengthEnd+i*MarshaledInboundIDSize:MarshaledInboundIdsLengthEnd+(i+1)*MarshaledInboundIDSize], inboundID[:MarshaledInboundIDSize])
}
return marshaledPackage, nil
}
package ping
const (
MARSHALED_PACKET_HEADER = 0x00
MarshaledPacketHeader = 0x00
MARSHALED_PACKET_HEADER_START = 0
MARSHALED_PACKET_HEADER_SIZE = 1
MARSHALED_PACKET_HEADER_END = MARSHALED_PACKET_HEADER_START + MARSHALED_PACKET_HEADER_SIZE
MarshaledPacketHeaderStart = 0
MarshaledPacketHeaderSize = 1
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
MARSHALED_TOTAL_SIZE = MARSHALED_PACKET_HEADER_END
MarshaledTotalSize = MarshaledPacketHeaderEnd
)
......@@ -9,7 +9,7 @@ var (
type Packet struct{}
func Unmarshal(data []byte) (*Packet, error) {
if len(data) < MARSHALED_TOTAL_SIZE || data[MARSHALED_PACKET_HEADER_START] != MARSHALED_PACKET_HEADER {
if len(data) < MarshaledTotalSize || data[MarshaledPacketHeaderStart] != MarshaledPacketHeader {
return nil, ErrMalformedPingPacket
}
......@@ -19,9 +19,9 @@ func Unmarshal(data []byte) (*Packet, error) {
}
func (packet *Packet) Marshal() []byte {
marshaledPackage := make([]byte, MARSHALED_TOTAL_SIZE)
marshaledPackage := make([]byte, MarshaledTotalSize)
marshaledPackage[MARSHALED_PACKET_HEADER_START] = MARSHALED_PACKET_HEADER
marshaledPackage[MarshaledPacketHeaderStart] = MarshaledPacketHeader
return marshaledPackage
}
package removenode
import "crypto/sha256"
const (
MARSHALED_PACKET_HEADER = 0x02
MARSHALED_PACKET_HEADER_START = 0
MARSHALED_PACKET_HEADER_SIZE = 1
MARSHALED_PACKET_HEADER_END = MARSHALED_PACKET_HEADER_START + MARSHALED_PACKET_HEADER_SIZE
MARSHALED_ID_START = MARSHALED_PACKET_HEADER_END
MARSHALED_ID_SIZE = sha256.Size
MARSHALED_ID_END = MARSHALED_ID_START + MARSHALED_ID_SIZE
MARSHALED_TOTAL_SIZE = MARSHALED_ID_END
)
package removenode
import "errors"
var (
ErrMalformedRemovePacket = errors.New("malformed remove node packet")
)
type Packet struct {
NodeId []byte
}
func Unmarshal(data []byte) (*Packet, error) {
if len(data) < MARSHALED_TOTAL_SIZE || data[0] != MARSHALED_PACKET_HEADER {
return nil, ErrMalformedRemovePacket
}
unmarshaledPackage := &Packet{
NodeId: make([]byte, MARSHALED_ID_SIZE),
}
copy(unmarshaledPackage.NodeId, data[MARSHALED_ID_START:MARSHALED_ID_END])
return unmarshaledPackage, nil
}
func (packet *Packet) Marshal() []byte {
marshaledPackage := make([]byte, MARSHALED_TOTAL_SIZE)
marshaledPackage[MARSHALED_PACKET_HEADER_START] = MARSHALED_PACKET_HEADER
copy(marshaledPackage[MARSHALED_ID_START:MARSHALED_ID_END], packet.NodeId[:MARSHALED_ID_SIZE])
return marshaledPackage
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment