From 3ee42649de5a5afc9b72c7bc5fb612b91bc868b1 Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Tue, 16 Apr 2019 12:51:28 +0200 Subject: [PATCH] Feat: added an analysis server that shows the network --- main.go | 2 + packages/network/events.go | 2 +- plugins/analysis/client/parameters.go | 7 + plugins/analysis/client/plugin.go | 114 ++++++++ plugins/analysis/client/types.go | 6 + plugins/analysis/plugin.go | 41 +++ plugins/analysis/server/constants.go | 23 ++ plugins/analysis/server/events.go | 85 ++++++ plugins/analysis/server/parameters.go | 7 + plugins/analysis/server/plugin.go | 270 ++++++++++++++++++ plugins/analysis/server/types.go | 3 + plugins/analysis/types/addnode/constants.go | 15 + plugins/analysis/types/addnode/packet.go | 30 ++ .../analysis/types/connectnodes/constants.go | 19 ++ plugins/analysis/types/connectnodes/packet.go | 34 +++ .../types/disconnectnodes/constants.go | 19 ++ .../analysis/types/disconnectnodes/packet.go | 34 +++ plugins/analysis/types/ping/constants.go | 11 + plugins/analysis/types/ping/packet.go | 23 ++ .../analysis/types/removenode/constants.go | 15 + plugins/analysis/types/removenode/packet.go | 31 ++ .../webinterface/httpserver/data_stream.go | 46 +++ .../analysis/webinterface/httpserver/index.go | 131 +++++++++ .../webinterface/httpserver/plugin.go | 36 +++ plugins/analysis/webinterface/plugin.go | 16 ++ .../recordedevents/recorded_events.go | 46 +++ plugins/analysis/webinterface/types/types.go | 12 + .../chosenneighborcandidates/instance.go | 17 +- .../protocol/incoming_request_processor.go | 6 +- plugins/autopeering/server/tcp/server.go | 6 +- .../autopeering/types/peerlist/peer_list.go | 2 +- 31 files changed, 1097 insertions(+), 12 deletions(-) create mode 100644 plugins/analysis/client/parameters.go create mode 100644 plugins/analysis/client/plugin.go create mode 100644 plugins/analysis/client/types.go create mode 100644 plugins/analysis/plugin.go create mode 100644 plugins/analysis/server/constants.go create mode 100644 plugins/analysis/server/events.go create mode 100644 plugins/analysis/server/parameters.go create mode 100644 plugins/analysis/server/plugin.go create mode 100644 plugins/analysis/server/types.go create mode 100644 plugins/analysis/types/addnode/constants.go create mode 100644 plugins/analysis/types/addnode/packet.go create mode 100644 plugins/analysis/types/connectnodes/constants.go create mode 100644 plugins/analysis/types/connectnodes/packet.go create mode 100644 plugins/analysis/types/disconnectnodes/constants.go create mode 100644 plugins/analysis/types/disconnectnodes/packet.go create mode 100644 plugins/analysis/types/ping/constants.go create mode 100644 plugins/analysis/types/ping/packet.go create mode 100644 plugins/analysis/types/removenode/constants.go create mode 100644 plugins/analysis/types/removenode/packet.go create mode 100644 plugins/analysis/webinterface/httpserver/data_stream.go create mode 100644 plugins/analysis/webinterface/httpserver/index.go create mode 100644 plugins/analysis/webinterface/httpserver/plugin.go create mode 100644 plugins/analysis/webinterface/plugin.go create mode 100644 plugins/analysis/webinterface/recordedevents/recorded_events.go create mode 100644 plugins/analysis/webinterface/types/types.go diff --git a/main.go b/main.go index 76a9ca87..ea8dfe65 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/analysis" "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/gracefulshutdown" @@ -14,5 +15,6 @@ func main() { autopeering.PLUGIN, statusscreen.PLUGIN, gracefulshutdown.PLUGIN, + analysis.PLUGIN, ) } diff --git a/packages/network/events.go b/packages/network/events.go index 188ddea1..fff2621d 100644 --- a/packages/network/events.go +++ b/packages/network/events.go @@ -52,7 +52,7 @@ func (this *dataEvent) Attach(callback DataConsumer) { this.callbacks[reflect.ValueOf(callback).Pointer()] = callback } -func (this *dataEvent) Detach(callback ErrorConsumer) { +func (this *dataEvent) Detach(callback DataConsumer) { delete(this.callbacks, reflect.ValueOf(callback).Pointer()) } diff --git a/plugins/analysis/client/parameters.go b/plugins/analysis/client/parameters.go new file mode 100644 index 00000000..18558bbb --- /dev/null +++ b/plugins/analysis/client/parameters.go @@ -0,0 +1,7 @@ +package client + +import "github.com/iotaledger/goshimmer/packages/parameter" + +var ( + SERVER_ADDRESS = parameter.AddString("ANALYSIS/SERVER-ADDRESS", "82.165.29.179:188", "tcp server for collecting analysis information") +) diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go new file mode 100644 index 00000000..3e589f47 --- /dev/null +++ b/plugins/analysis/client/plugin.go @@ -0,0 +1,114 @@ +package client + +import ( + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/analysis/types/addnode" + "github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes" + "github.com/iotaledger/goshimmer/plugins/analysis/types/ping" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/response" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" + "net" + "time" +) + +func Run(plugin *node.Plugin) { + daemon.BackgroundWorker(func() { + shuttingDown := false + for !shuttingDown { + if conn, err := net.Dial("tcp", *SERVER_ADDRESS.Value); err != nil { + plugin.LogDebug("Could not connect to reporting server: " + err.Error()) + } else { + managedConn := network.NewManagedConnection(conn) + eventDispatchers := getEventDispatchers(managedConn) + + reportCurrentStatus(eventDispatchers) + setupHooks(managedConn, eventDispatchers) + + shuttingDown = keepConnectionAlive(managedConn) + } + } + }) +} + +func getEventDispatchers(conn *network.ManagedConnection) *EventDispatchers { + return &EventDispatchers{ + AddNode: func(nodeId []byte) { + conn.Write((&addnode.Packet{ NodeId: nodeId }).Marshal()) + }, + ConnectNodes: func(sourceId []byte, targetId []byte) { + conn.Write((&connectnodes.Packet{ SourceId: sourceId, TargetId: targetId }).Marshal()) + }, + } +} + +func reportCurrentStatus(eventDispatchers *EventDispatchers) { + eventDispatchers.AddNode(accountability.OWN_ID.Identifier) + + reportChosenNeighbors(eventDispatchers) +} + +func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatchers) { + // define hooks //////////////////////////////////////////////////////////////////////////////////////////////////// + + onDiscoverPeer := func(p *peer.Peer) { + eventDispatchers.AddNode(p.Identity.Identifier) + } + + onIncomingRequestAccepted := func(req *request.Request) { + eventDispatchers.ConnectNodes(req.Issuer.Identity.Identifier, accountability.OWN_ID.Identifier) + } + + onOutgoingRequestAccepted := func(res *response.Response) { + eventDispatchers.ConnectNodes(accountability.OWN_ID.Identifier, res.Issuer.Identity.Identifier) + } + + // setup hooks ///////////////////////////////////////////////////////////////////////////////////////////////////// + + protocol.Events.DiscoverPeer.Attach(onDiscoverPeer) + protocol.Events.IncomingRequestAccepted.Attach(onIncomingRequestAccepted) + protocol.Events.OutgoingRequestAccepted.Attach(onOutgoingRequestAccepted) + + // clean up hooks on close ///////////////////////////////////////////////////////////////////////////////////////// + + var onClose func() + onClose = func() { + protocol.Events.DiscoverPeer.Detach(onDiscoverPeer) + protocol.Events.IncomingRequestAccepted.Detach(onIncomingRequestAccepted) + protocol.Events.OutgoingRequestAccepted.Detach(onOutgoingRequestAccepted) + + conn.Events.Close.Detach(onClose) + } + conn.Events.Close.Attach(onClose) +} + +func reportChosenNeighbors(dispatchers *EventDispatchers) { + for _, chosenNeighbor := range neighbormanager.CHOSEN_NEIGHBORS { + dispatchers.AddNode(chosenNeighbor.Identity.Identifier) + } + for _, chosenNeighbor := range neighbormanager.CHOSEN_NEIGHBORS { + dispatchers.ConnectNodes(accountability.OWN_ID.Identifier, chosenNeighbor.Identity.Identifier) + } +} + +func keepConnectionAlive(conn *network.ManagedConnection) bool { + go conn.Read(make([]byte, 1)) + + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <- daemon.ShutdownSignal: + return true + + case <- ticker.C: + if _, err := conn.Write((&ping.Packet{}).Marshal()); err != nil { + return false + } + } + } +} diff --git a/plugins/analysis/client/types.go b/plugins/analysis/client/types.go new file mode 100644 index 00000000..c6e19fd4 --- /dev/null +++ b/plugins/analysis/client/types.go @@ -0,0 +1,6 @@ +package client + +type EventDispatchers struct { + AddNode func(nodeId []byte) + ConnectNodes func(sourceId []byte, targetId []byte) +} diff --git a/plugins/analysis/plugin.go b/plugins/analysis/plugin.go new file mode 100644 index 00000000..15c5d0b3 --- /dev/null +++ b/plugins/analysis/plugin.go @@ -0,0 +1,41 @@ +package analysis + +import ( + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/analysis/client" + "github.com/iotaledger/goshimmer/plugins/analysis/server" + "github.com/iotaledger/goshimmer/plugins/analysis/webinterface" +) + +var PLUGIN = node.NewPlugin("Analysis", configure, run) + +func configure(plugin *node.Plugin) { + if *server.SERVER_PORT.Value != 0 { + webinterface.Configure(plugin) + server.Configure(plugin) + + server.Events.AddNode.Attach(func(nodeId string) { + + }) + + daemon.Events.Shutdown.Attach(func() { + server.Shutdown(plugin) + }) + } +} + +func run(plugin *node.Plugin) { + if *server.SERVER_PORT.Value != 0 { + webinterface.Run(plugin) + server.Run(plugin) + } else { + plugin.Node.LogSuccess("Node", "Starting Plugin: Analysis ... server is disabled (server-port is 0)") + } + + if *client.SERVER_ADDRESS.Value != "" { + client.Run(plugin) + } else { + plugin.Node.LogSuccess("Node", "Starting Plugin: Analysis ... client is disabled (server-address is empty)") + } +} diff --git a/plugins/analysis/server/constants.go b/plugins/analysis/server/constants.go new file mode 100644 index 00000000..a0cd26fc --- /dev/null +++ b/plugins/analysis/server/constants.go @@ -0,0 +1,23 @@ +package server + +import ( + "github.com/iotaledger/goshimmer/plugins/analysis/types/addnode" + "github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes" + "github.com/iotaledger/goshimmer/plugins/analysis/types/disconnectnodes" + "github.com/iotaledger/goshimmer/plugins/analysis/types/ping" + "github.com/iotaledger/goshimmer/plugins/analysis/types/removenode" + "time" +) + +const ( + IDLE_TIMEOUT = 5 * time.Second + + STATE_INITIAL = byte(255) + STATE_INITIAL_ADDNODE = byte(254) + STATE_CONSECUTIVE = byte(253) + STATE_PING = ping.MARSHALLED_PACKET_HEADER + STATE_ADD_NODE = addnode.MARSHALLED_PACKET_HEADER + STATE_REMOVE_NODE = removenode.MARSHALLED_PACKET_HEADER + STATE_CONNECT_NODES = connectnodes.MARSHALLED_PACKET_HEADER + STATE_DISCONNECT_NODES = disconnectnodes.MARSHALLED_PACKET_HEADER +) diff --git a/plugins/analysis/server/events.go b/plugins/analysis/server/events.go new file mode 100644 index 00000000..c133e4a4 --- /dev/null +++ b/plugins/analysis/server/events.go @@ -0,0 +1,85 @@ +package server + +import ( + "reflect" +) + +var Events = &pluginEvents{ + AddNode: &nodeIdEvent{make(map[uintptr]StringConsumer)}, + RemoveNode: &nodeIdEvent{make(map[uintptr]StringConsumer)}, + ConnectNodes: &nodeIdsEvent{make(map[uintptr]StringStringConsumer)}, + DisconnectNodes: &nodeIdsEvent{make(map[uintptr]StringStringConsumer)}, + NodeOnline: &nodeIdEvent{make(map[uintptr]StringConsumer)}, + NodeOffline: &nodeIdEvent{make(map[uintptr]StringConsumer)}, + Error: &errorEvent{make(map[uintptr]ErrorConsumer)}, +} + +type pluginEvents struct { + AddNode *nodeIdEvent + RemoveNode *nodeIdEvent + ConnectNodes *nodeIdsEvent + DisconnectNodes *nodeIdsEvent + NodeOnline *nodeIdEvent + NodeOffline *nodeIdEvent + Error *errorEvent +} + +type nodeIdEvent struct { + callbacks map[uintptr]StringConsumer +} + +func (this *nodeIdEvent) Attach(callback StringConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *nodeIdEvent) Detach(callback StringConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *nodeIdEvent) Trigger(nodeId string) { + for _, callback := range this.callbacks { + callback(nodeId) + } +} + +type nodeIdsEvent struct { + callbacks map[uintptr]StringStringConsumer +} + +func (this *nodeIdsEvent) Attach(callback StringStringConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *nodeIdsEvent) Detach(callback StringStringConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *nodeIdsEvent) Trigger(sourceId string, targetId string) { + for _, callback := range this.callbacks { + callback(sourceId, targetId) + } +} + +type errorEvent struct { + callbacks map[uintptr]ErrorConsumer +} + +func (this *errorEvent) Attach(callback ErrorConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *errorEvent) Detach(callback ErrorConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *errorEvent) Trigger(err error) { + for _, callback := range this.callbacks { + callback(err) + } +} + +type ErrorConsumer = func(err error) + +type StringConsumer = func(str string) + +type StringStringConsumer = func(sourceId string, targetId string) diff --git a/plugins/analysis/server/parameters.go b/plugins/analysis/server/parameters.go new file mode 100644 index 00000000..63fcaab9 --- /dev/null +++ b/plugins/analysis/server/parameters.go @@ -0,0 +1,7 @@ +package server + +import "github.com/iotaledger/goshimmer/packages/parameter" + +var ( + SERVER_PORT = parameter.AddInt("ANALYSIS/SERVER-PORT", 0, "tcp port for incoming analysis packets") +) diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go new file mode 100644 index 00000000..7fb373b9 --- /dev/null +++ b/plugins/analysis/server/plugin.go @@ -0,0 +1,270 @@ +package server + +import ( + "encoding/hex" + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/packages/network/tcp" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/analysis/types/addnode" + "github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes" + "github.com/iotaledger/goshimmer/plugins/analysis/types/disconnectnodes" + "github.com/iotaledger/goshimmer/plugins/analysis/types/ping" + "github.com/iotaledger/goshimmer/plugins/analysis/types/removenode" + "github.com/pkg/errors" + "math" + "strconv" +) + +var server *tcp.Server + +func Configure(plugin *node.Plugin) { + server = tcp.NewServer() + + server.Events.Connect.Attach(HandleConnection) + server.Events.Error.Attach(func(err error) { + plugin.LogFailure("error in server: " + err.Error()) + }) + server.Events.Start.Attach(func() { + plugin.LogSuccess("Starting Server (port " + strconv.Itoa(*SERVER_PORT.Value) + ") ... done") + }) + server.Events.Shutdown.Attach(func() { + plugin.LogSuccess("Stopping Server ... done") + }) +} + +func Run(plugin *node.Plugin) { + daemon.BackgroundWorker(func() { + plugin.LogInfo("Starting Server (port " + strconv.Itoa(*SERVER_PORT.Value) + ") ...") + + server.Listen(*SERVER_PORT.Value) + }) +} + +func Shutdown(plugin *node.Plugin) { + plugin.LogInfo("Stopping Server ...") + + server.Shutdown() +} + +func HandleConnection(conn *network.ManagedConnection) { + conn.SetTimeout(IDLE_TIMEOUT) + + var connectionState = STATE_INITIAL + var receiveBuffer []byte + var offset int + var connectedNodeId string + + var onReceiveData func(data []byte) + var onDisconnect func() + + onReceiveData = func(data []byte) { + processIncomingPacket(&connectionState, &receiveBuffer, conn, data, &offset, &connectedNodeId) + } + onDisconnect = func() { + Events.NodeOffline.Trigger(connectedNodeId) + + conn.Events.ReceiveData.Detach(onReceiveData) + conn.Events.Close.Detach(onDisconnect) + } + + conn.Events.ReceiveData.Attach(onReceiveData) + conn.Events.Close.Attach(onDisconnect) + + maxPacketsSize := getMaxPacketSize( + ping.MARSHALLED_TOTAL_SIZE, + addnode.MARSHALLED_TOTAL_SIZE, + removenode.MARSHALLED_TOTAL_SIZE, + connectnodes.MARSHALLED_TOTAL_SIZE, + disconnectnodes.MARSHALLED_PACKET_HEADER, + ) + + go conn.Read(make([]byte, maxPacketsSize)) +} + +func getMaxPacketSize(packetSizes ...int) int { + maxPacketSize := 0 + + for _, packetSize := range packetSizes { + if packetSize > maxPacketSize { + maxPacketSize = packetSize + } + } + + return maxPacketSize +} + +func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) { + firstPackage := *connectionState == STATE_INITIAL + + if firstPackage || *connectionState == STATE_CONSECUTIVE { + var err error + if *connectionState, *receiveBuffer, err = parsePackageHeader(data); err != nil { + Events.Error.Trigger(err) + + conn.Close() + + return + } + + *offset = 0 + + switch *connectionState { + case STATE_ADD_NODE: + *receiveBuffer = make([]byte, addnode.MARSHALLED_TOTAL_SIZE) + + case STATE_PING: + *receiveBuffer = make([]byte, ping.MARSHALLED_TOTAL_SIZE) + + case STATE_CONNECT_NODES: + *receiveBuffer = make([]byte, connectnodes.MARSHALLED_TOTAL_SIZE) + + case STATE_REMOVE_NODE: + *receiveBuffer = make([]byte, removenode.MARSHALLED_TOTAL_SIZE) + } + } + + if firstPackage { + if *connectionState != STATE_ADD_NODE { + Events.Error.Trigger(errors.New("expected initial add node package")) + } else { + *connectionState = STATE_INITIAL_ADDNODE + } + } + + switch *connectionState { + case STATE_INITIAL_ADDNODE: + processIncomingAddNodePacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId) + + case STATE_ADD_NODE: + processIncomingAddNodePacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId) + + case STATE_PING: + processIncomingPingPacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId) + + case STATE_CONNECT_NODES: + processIncomingConnectNodesPacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId) + + case STATE_REMOVE_NODE: + processIncomingAddNodePacket(connectionState, receiveBuffer, conn, data, offset, connectedNodeId) + } +} + +func parsePackageHeader(data []byte) (ConnectionState, []byte, error) { + var connectionState ConnectionState + var receiveBuffer []byte + + switch data[0] { + case ping.MARSHALLED_PACKET_HEADER: + receiveBuffer = make([]byte, ping.MARSHALLED_TOTAL_SIZE) + + connectionState = STATE_PING + + case addnode.MARSHALLED_PACKET_HEADER: + receiveBuffer = make([]byte, addnode.MARSHALLED_TOTAL_SIZE) + + connectionState = STATE_ADD_NODE + + case connectnodes.MARSHALLED_PACKET_HEADER: + receiveBuffer = make([]byte, connectnodes.MARSHALLED_TOTAL_SIZE) + + connectionState = STATE_CONNECT_NODES + + case removenode.MARSHALLED_PACKET_HEADER: + receiveBuffer = make([]byte, removenode.MARSHALLED_TOTAL_SIZE) + + connectionState = STATE_REMOVE_NODE + + default: + return 0, nil, errors.New("invalid package header") + } + + return connectionState, receiveBuffer, nil +} + +func processIncomingAddNodePacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) { + remainingCapacity := int(math.Min(float64(addnode.MARSHALLED_TOTAL_SIZE- *offset), float64(len(data)))) + + copy((*receiveBuffer)[*offset:], data[:remainingCapacity]) + + if *offset + len(data) < addnode.MARSHALLED_TOTAL_SIZE { + *offset += len(data) + } else { + if addNodePacket, err := addnode.Unmarshal(*receiveBuffer); err != nil { + Events.Error.Trigger(err) + + conn.Close() + + return + } else { + nodeId := hex.EncodeToString(addNodePacket.NodeId) + + Events.AddNode.Trigger(nodeId) + + if *connectionState == STATE_INITIAL_ADDNODE { + *connectedNodeId = nodeId + + Events.NodeOnline.Trigger(nodeId) + } + } + + *connectionState = STATE_CONSECUTIVE + + if *offset + len(data) > addnode.MARSHALLED_TOTAL_SIZE { + processIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset, connectedNodeId) + } + } +} + +func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) { + remainingCapacity := int(math.Min(float64(ping.MARSHALLED_TOTAL_SIZE- *offset), float64(len(data)))) + + copy((*receiveBuffer)[*offset:], data[:remainingCapacity]) + + if *offset + len(data) < ping.MARSHALLED_TOTAL_SIZE { + *offset += len(data) + } else { + if _, err := ping.Unmarshal(*receiveBuffer); err != nil { + Events.Error.Trigger(err) + + conn.Close() + + return + } + + *connectionState = STATE_CONSECUTIVE + + if *offset + len(data) > ping.MARSHALLED_TOTAL_SIZE { + processIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset, connectedNodeId) + } + } +} + +func processIncomingConnectNodesPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int, connectedNodeId *string) { + remainingCapacity := int(math.Min(float64(connectnodes.MARSHALLED_TOTAL_SIZE- *offset), float64(len(data)))) + + copy((*receiveBuffer)[*offset:], data[:remainingCapacity]) + + if *offset + len(data) < connectnodes.MARSHALLED_TOTAL_SIZE { + *offset += len(data) + } else { + if connectNodesPacket, err := connectnodes.Unmarshal(*receiveBuffer); err != nil { + Events.Error.Trigger(err) + + conn.Close() + + return + } else { + sourceNodeId := hex.EncodeToString(connectNodesPacket.SourceId) + targetNodeId := hex.EncodeToString(connectNodesPacket.TargetId) + + Events.ConnectNodes.Trigger(sourceNodeId, targetNodeId) + } + + *connectionState = STATE_CONSECUTIVE + + if *offset + len(data) > connectnodes.MARSHALLED_TOTAL_SIZE { + processIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset, connectedNodeId) + } + } +} diff --git a/plugins/analysis/server/types.go b/plugins/analysis/server/types.go new file mode 100644 index 00000000..2dd1e29a --- /dev/null +++ b/plugins/analysis/server/types.go @@ -0,0 +1,3 @@ +package server + +type ConnectionState = byte diff --git a/plugins/analysis/types/addnode/constants.go b/plugins/analysis/types/addnode/constants.go new file mode 100644 index 00000000..c1154439 --- /dev/null +++ b/plugins/analysis/types/addnode/constants.go @@ -0,0 +1,15 @@ +package addnode + +const ( + MARSHALLED_PACKET_HEADER = 0x01 + + MARSHALLED_PACKET_HEADER_START = 0 + MARSHALLED_PACKET_HEADER_SIZE = 1 + MARSHALLED_PACKET_HEADER_END = MARSHALLED_PACKET_HEADER_START + MARSHALLED_PACKET_HEADER_SIZE + + MARSHALLED_ID_START = MARSHALLED_PACKET_HEADER_END + MARSHALLED_ID_SIZE = 20 + MARSHALLED_ID_END = MARSHALLED_ID_START + MARSHALLED_ID_SIZE + + MARSHALLED_TOTAL_SIZE = MARSHALLED_ID_END +) diff --git a/plugins/analysis/types/addnode/packet.go b/plugins/analysis/types/addnode/packet.go new file mode 100644 index 00000000..0fba847f --- /dev/null +++ b/plugins/analysis/types/addnode/packet.go @@ -0,0 +1,30 @@ +package addnode + +import "github.com/pkg/errors" + +type Packet struct { + NodeId []byte +} + +func Unmarshal(data []byte) (*Packet, error) { + if len(data) < MARSHALLED_TOTAL_SIZE || data[0] != MARSHALLED_PACKET_HEADER { + return nil, errors.New("malformed add node packet") + } + + unmarshalledPackage := &Packet{ + NodeId: make([]byte, MARSHALLED_ID_SIZE), + } + + copy(unmarshalledPackage.NodeId, data[MARSHALLED_ID_START:MARSHALLED_ID_END]) + + return unmarshalledPackage, nil +} + +func (packet *Packet) Marshal() []byte { + marshalledPackage := make([]byte, MARSHALLED_TOTAL_SIZE) + + marshalledPackage[MARSHALLED_PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER + copy(marshalledPackage[MARSHALLED_ID_START:MARSHALLED_ID_END], packet.NodeId[:MARSHALLED_ID_SIZE]) + + return marshalledPackage +} diff --git a/plugins/analysis/types/connectnodes/constants.go b/plugins/analysis/types/connectnodes/constants.go new file mode 100644 index 00000000..ce98f574 --- /dev/null +++ b/plugins/analysis/types/connectnodes/constants.go @@ -0,0 +1,19 @@ +package connectnodes + +const ( + MARSHALLED_PACKET_HEADER = 0x03 + + MARSHALLED_PACKET_HEADER_START = 0 + MARSHALLED_PACKET_HEADER_SIZE = 1 + MARSHALLED_PACKET_HEADER_END = MARSHALLED_PACKET_HEADER_START + MARSHALLED_PACKET_HEADER_SIZE + + MARSHALLED_SOURCE_ID_START = MARSHALLED_PACKET_HEADER_END + MARSHALLED_SOURCE_ID_SIZE = 20 + MARSHALLED_SOURCE_ID_END = MARSHALLED_SOURCE_ID_START + MARSHALLED_SOURCE_ID_SIZE + + MARSHALLED_TARGET_ID_START = MARSHALLED_SOURCE_ID_END + MARSHALLED_TARGET_ID_SIZE = 20 + MARSHALLED_TARGET_ID_END = MARSHALLED_TARGET_ID_START + MARSHALLED_TARGET_ID_SIZE + + MARSHALLED_TOTAL_SIZE = MARSHALLED_TARGET_ID_END +) diff --git a/plugins/analysis/types/connectnodes/packet.go b/plugins/analysis/types/connectnodes/packet.go new file mode 100644 index 00000000..19ff5cf9 --- /dev/null +++ b/plugins/analysis/types/connectnodes/packet.go @@ -0,0 +1,34 @@ +package connectnodes + +import "github.com/pkg/errors" + +type Packet struct { + SourceId []byte + TargetId []byte +} + +func Unmarshal(data []byte) (*Packet, error) { + if len(data) < MARSHALLED_TOTAL_SIZE || data[0] != MARSHALLED_PACKET_HEADER { + return nil, errors.New("malformed connect nodes packet") + } + + unmarshalledPackage := &Packet{ + SourceId: make([]byte, MARSHALLED_SOURCE_ID_SIZE), + TargetId: make([]byte, MARSHALLED_TARGET_ID_SIZE), + } + + copy(unmarshalledPackage.SourceId, data[MARSHALLED_SOURCE_ID_START:MARSHALLED_SOURCE_ID_END]) + copy(unmarshalledPackage.TargetId, data[MARSHALLED_TARGET_ID_START:MARSHALLED_TARGET_ID_END]) + + return unmarshalledPackage, nil +} + +func (packet *Packet) Marshal() []byte { + marshalledPackage := make([]byte, MARSHALLED_TOTAL_SIZE) + + marshalledPackage[MARSHALLED_PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER + copy(marshalledPackage[MARSHALLED_SOURCE_ID_START:MARSHALLED_SOURCE_ID_END], packet.SourceId[:MARSHALLED_SOURCE_ID_SIZE]) + copy(marshalledPackage[MARSHALLED_TARGET_ID_START:MARSHALLED_TARGET_ID_END], packet.TargetId[:MARSHALLED_TARGET_ID_SIZE]) + + return marshalledPackage +} diff --git a/plugins/analysis/types/disconnectnodes/constants.go b/plugins/analysis/types/disconnectnodes/constants.go new file mode 100644 index 00000000..986d8aca --- /dev/null +++ b/plugins/analysis/types/disconnectnodes/constants.go @@ -0,0 +1,19 @@ +package disconnectnodes + +const ( + MARSHALLED_PACKET_HEADER = 0x04 + + MARSHALLED_PACKET_HEADER_START = 0 + MARSHALLED_PACKET_HEADER_SIZE = 1 + MARSHALLED_PACKET_HEADER_END = MARSHALLED_PACKET_HEADER_START + MARSHALLED_PACKET_HEADER_SIZE + + MARSHALLED_SOURCE_ID_START = MARSHALLED_PACKET_HEADER_END + MARSHALLED_SOURCE_ID_SIZE = 20 + MARSHALLED_SOURCE_ID_END = MARSHALLED_SOURCE_ID_START + MARSHALLED_SOURCE_ID_SIZE + + MARSHALLED_TARGET_ID_START = MARSHALLED_SOURCE_ID_END + MARSHALLED_TARGET_ID_SIZE = 20 + MARSHALLED_TARGET_ID_END = MARSHALLED_TARGET_ID_START + MARSHALLED_TARGET_ID_SIZE + + MARSHALLED_TOTAL_SIZE = MARSHALLED_TARGET_ID_END +) diff --git a/plugins/analysis/types/disconnectnodes/packet.go b/plugins/analysis/types/disconnectnodes/packet.go new file mode 100644 index 00000000..59b413d1 --- /dev/null +++ b/plugins/analysis/types/disconnectnodes/packet.go @@ -0,0 +1,34 @@ +package disconnectnodes + +import "github.com/pkg/errors" + +type Packet struct { + SourceId []byte + TargetId []byte +} + +func Unmarshal(data []byte) (*Packet, error) { + if len(data) < MARSHALLED_TOTAL_SIZE || data[0] != MARSHALLED_PACKET_HEADER { + return nil, errors.New("malformed disconnect nodes packet") + } + + unmarshalledPackage := &Packet{ + SourceId: make([]byte, MARSHALLED_SOURCE_ID_SIZE), + TargetId: make([]byte, MARSHALLED_TARGET_ID_SIZE), + } + + copy(unmarshalledPackage.SourceId, data[MARSHALLED_SOURCE_ID_START:MARSHALLED_SOURCE_ID_END]) + copy(unmarshalledPackage.TargetId, data[MARSHALLED_TARGET_ID_START:MARSHALLED_TARGET_ID_END]) + + return unmarshalledPackage, nil +} + +func (packet *Packet) Marshal() []byte { + marshalledPackage := make([]byte, MARSHALLED_TOTAL_SIZE) + + marshalledPackage[MARSHALLED_PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER + copy(marshalledPackage[MARSHALLED_SOURCE_ID_START:MARSHALLED_SOURCE_ID_END], packet.SourceId[:MARSHALLED_SOURCE_ID_SIZE]) + copy(marshalledPackage[MARSHALLED_TARGET_ID_START:MARSHALLED_TARGET_ID_END], packet.TargetId[:MARSHALLED_TARGET_ID_SIZE]) + + return marshalledPackage +} diff --git a/plugins/analysis/types/ping/constants.go b/plugins/analysis/types/ping/constants.go new file mode 100644 index 00000000..09141e17 --- /dev/null +++ b/plugins/analysis/types/ping/constants.go @@ -0,0 +1,11 @@ +package ping + +const ( + MARSHALLED_PACKET_HEADER = 0x00 + + MARSHALLED_PACKET_HEADER_START = 0 + MARSHALLED_PACKET_HEADER_SIZE = 1 + MARSHALLED_PACKET_HEADER_END = MARSHALLED_PACKET_HEADER_START + MARSHALLED_PACKET_HEADER_SIZE + + MARSHALLED_TOTAL_SIZE = MARSHALLED_PACKET_HEADER_END +) diff --git a/plugins/analysis/types/ping/packet.go b/plugins/analysis/types/ping/packet.go new file mode 100644 index 00000000..27fc47d8 --- /dev/null +++ b/plugins/analysis/types/ping/packet.go @@ -0,0 +1,23 @@ +package ping + +import "github.com/pkg/errors" + +type Packet struct {} + +func Unmarshal(data []byte) (*Packet, error) { + if len(data) < MARSHALLED_TOTAL_SIZE || data[MARSHALLED_PACKET_HEADER_START] != MARSHALLED_PACKET_HEADER { + return nil, errors.New("malformed ping packet") + } + + unmarshalledPacket := &Packet{} + + return unmarshalledPacket, nil +} + +func (packet *Packet) Marshal() []byte { + marshalledPackage := make([]byte, MARSHALLED_TOTAL_SIZE) + + marshalledPackage[MARSHALLED_PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER + + return marshalledPackage +} \ No newline at end of file diff --git a/plugins/analysis/types/removenode/constants.go b/plugins/analysis/types/removenode/constants.go new file mode 100644 index 00000000..91115401 --- /dev/null +++ b/plugins/analysis/types/removenode/constants.go @@ -0,0 +1,15 @@ +package removenode + +const ( + MARSHALLED_PACKET_HEADER = 0x02 + + MARSHALLED_PACKET_HEADER_START = 0 + MARSHALLED_PACKET_HEADER_SIZE = 1 + MARSHALLED_PACKET_HEADER_END = MARSHALLED_PACKET_HEADER_START + MARSHALLED_PACKET_HEADER_SIZE + + MARSHALLED_ID_START = MARSHALLED_PACKET_HEADER_END + MARSHALLED_ID_SIZE = 20 + MARSHALLED_ID_END = MARSHALLED_ID_START + MARSHALLED_ID_SIZE + + MARSHALLED_TOTAL_SIZE = MARSHALLED_ID_END +) diff --git a/plugins/analysis/types/removenode/packet.go b/plugins/analysis/types/removenode/packet.go new file mode 100644 index 00000000..52095b9b --- /dev/null +++ b/plugins/analysis/types/removenode/packet.go @@ -0,0 +1,31 @@ +package removenode + +import "github.com/pkg/errors" + +type Packet struct { + NodeId []byte +} + +func Unmarshal(data []byte) (*Packet, error) { + if len(data) < MARSHALLED_TOTAL_SIZE || data[0] != MARSHALLED_PACKET_HEADER { + return nil, errors.New("malformed remove node packet") + } + + unmarshalledPackage := &Packet{ + NodeId: make([]byte, MARSHALLED_ID_SIZE), + } + + copy(unmarshalledPackage.NodeId, data[MARSHALLED_ID_START:MARSHALLED_ID_END]) + + return unmarshalledPackage, nil +} + +func (packet *Packet) Marshal() []byte { + marshalledPackage := make([]byte, MARSHALLED_TOTAL_SIZE) + + marshalledPackage[MARSHALLED_PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER + copy(marshalledPackage[MARSHALLED_ID_START:MARSHALLED_ID_END], packet.NodeId[:MARSHALLED_ID_SIZE]) + + return marshalledPackage +} + diff --git a/plugins/analysis/webinterface/httpserver/data_stream.go b/plugins/analysis/webinterface/httpserver/data_stream.go new file mode 100644 index 00000000..a7e4bf7b --- /dev/null +++ b/plugins/analysis/webinterface/httpserver/data_stream.go @@ -0,0 +1,46 @@ +package httpserver + +import ( + "fmt" + "github.com/iotaledger/goshimmer/plugins/analysis/server" + "github.com/iotaledger/goshimmer/plugins/analysis/webinterface/recordedevents" + "github.com/iotaledger/goshimmer/plugins/analysis/webinterface/types" + "golang.org/x/net/websocket" +) + +func dataStream(ws *websocket.Conn) { + eventHandlers := &types.EventHandlers{ + AddNode: func(nodeId string) { fmt.Fprint(ws, "A"+nodeId) }, + RemoveNode: func(nodeId string) { fmt.Fprint(ws, "a"+nodeId) }, + ConnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "C"+sourceId+targetId) }, + DisconnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "c"+sourceId+targetId) }, + NodeOnline: func(nodeId string) { fmt.Fprint(ws, "O"+nodeId) }, + NodeOffline: func(nodeId string) { fmt.Fprint(ws, "o"+nodeId) }, + } + + server.Events.AddNode.Attach(eventHandlers.AddNode) + server.Events.RemoveNode.Attach(eventHandlers.RemoveNode) + server.Events.ConnectNodes.Attach(eventHandlers.ConnectNodes) + server.Events.DisconnectNodes.Attach(eventHandlers.DisconnectNodes) + server.Events.NodeOnline.Attach(eventHandlers.NodeOnline) + server.Events.NodeOffline.Attach(eventHandlers.NodeOffline) + + go recordedevents.Replay(eventHandlers, 0) + + buf := make([]byte, 1) + readFromWebsocket: + for { + if _, err := ws.Read(buf); err != nil { + break readFromWebsocket + } + + fmt.Fprint(ws, "_") + } + + server.Events.AddNode.Detach(eventHandlers.AddNode) + server.Events.RemoveNode.Detach(eventHandlers.RemoveNode) + server.Events.ConnectNodes.Detach(eventHandlers.ConnectNodes) + server.Events.DisconnectNodes.Detach(eventHandlers.DisconnectNodes) + server.Events.NodeOnline.Detach(eventHandlers.NodeOnline) + server.Events.NodeOffline.Detach(eventHandlers.NodeOffline) +} diff --git a/plugins/analysis/webinterface/httpserver/index.go b/plugins/analysis/webinterface/httpserver/index.go new file mode 100644 index 00000000..ea92cd23 --- /dev/null +++ b/plugins/analysis/webinterface/httpserver/index.go @@ -0,0 +1,131 @@ +package httpserver + +import ( + "fmt" + "net/http" +) + +func index(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, `<head> + <style> body { margin: 0; } </style> + + <script src="https://unpkg.com/3d-force-graph"></script> + <!--<script src="../../dist/3d-force-graph.js"></script>--> +</head> + +<body> + <div id="3d-graph"></div> + + <script> + var socket = new WebSocket(((window.location.protocol === "https:") ? "wss://" : "ws://") + window.location.host + "/datastream"); + + socket.onopen = function () { + console.log("Status: Connected\n"); + + setInterval(function() { + socket.send("_"); + }, 1000); + }; + + socket.onmessage = function (e) { + switch (e.data[0]) { + case "_": + // do nothing - its just a ping + break; + + case "A": + addNode(e.data.substr(1)); + break; + + case "a": + removeNode(e.data.substr(1)); + break; + + case "C": + connectNodes(e.data.substr(1, 40), e.data.substr(41, 40)); + break; + + case "c": + disconnectNodes(e.data.substr(1, 40), e.data.substr(41, 40)); + break; + + case "O": + setNodeOnline(e.data.substr(1)); + break; + + case "o": + setNodeOffline(e.data.substr(1)); + break; + } + }; + + var nodesById = {}; + + const data = { + nodes: [], + links: [] + }; + + const elem = document.getElementById("3d-graph"); + + const Graph = ForceGraph3D()(elem) + .enableNodeDrag(false) + .onNodeHover(node => elem.style.cursor = node ? 'pointer' : null) + .onNodeClick(removeNodeX) + .linkDirectionalParticles(3) + .linkDirectionalParticleWidth(0.8) + .linkDirectionalParticleSpeed(0.01) + .nodeColor(node => node.online ? 'rgba(0,255,0,1)' : 'rgba(255,255,255,1)') + .graphData(data); + + function addNode(nodeId) { + node = {id : nodeId, online: false}; + + if (!(node.id in nodesById)) { + data.nodes = [...data.nodes, node]; + + nodesById[node.id] = node; + + Graph.graphData(data); + } + } + + function removeNode(nodeId) { + data.links = data.links.filter(l => l.source.id !== nodeId && l.target.id !== nodeId); + data.nodes = data.nodes.filter(currentNode => currentNode.id != nodeId) + + delete nodesById[nodeId]; + + Graph.graphData(data); + } + + function setNodeOnline(nodeId) { + nodesById[nodeId].online = true; + + Graph.graphData(data); + } + + function setNodeOffline(nodeId) { + nodesById[nodeId].online = false; + + Graph.graphData(data); + } + + function connectNodes(sourceNodeId, targetNodeId) { + data.links = [...data.links, { source: sourceNodeId, target: targetNodeId }]; + + Graph.graphData(data); + } + + function disconnectNodes(sourceNodeId, targetNodeId) { + data.links = data.links.filter(l => !(l.source.id == sourceNodeId && l.target.id == targetNodeId) && !(l.source.id == targetNodeId && l.target.id == sourceNodeId)); + + Graph.graphData(data); + } + + function removeNodeX(node) { + removeNode(node.id) + } + </script> +</body>`) +} diff --git a/plugins/analysis/webinterface/httpserver/plugin.go b/plugins/analysis/webinterface/httpserver/plugin.go new file mode 100644 index 00000000..33c950ff --- /dev/null +++ b/plugins/analysis/webinterface/httpserver/plugin.go @@ -0,0 +1,36 @@ +package httpserver + +import ( + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/node" + "golang.org/x/net/context" + "golang.org/x/net/websocket" + "net/http" + "time" +) + +var ( + httpServer *http.Server + router *http.ServeMux +) + +func Configure(plugin *node.Plugin) { + router = http.NewServeMux() + httpServer = &http.Server{Addr: ":80", Handler: router} + + router.Handle("/datastream", websocket.Handler(dataStream)) + router.HandleFunc("/", index) + + daemon.Events.Shutdown.Attach(func() { + ctx, cancel := context.WithTimeout(context.Background(), 0 * time.Second) + defer cancel() + + httpServer.Shutdown(ctx) + }) +} + +func Run(plugin *node.Plugin) { + daemon.BackgroundWorker(func() { + httpServer.ListenAndServe() + }) +} diff --git a/plugins/analysis/webinterface/plugin.go b/plugins/analysis/webinterface/plugin.go new file mode 100644 index 00000000..ef71b18c --- /dev/null +++ b/plugins/analysis/webinterface/plugin.go @@ -0,0 +1,16 @@ +package webinterface + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/analysis/webinterface/httpserver" + "github.com/iotaledger/goshimmer/plugins/analysis/webinterface/recordedevents" +) + +func Configure(plugin *node.Plugin) { + httpserver.Configure(plugin) + recordedevents.Configure(plugin) +} + +func Run(plugin *node.Plugin) { + httpserver.Run(plugin) +} diff --git a/plugins/analysis/webinterface/recordedevents/recorded_events.go b/plugins/analysis/webinterface/recordedevents/recorded_events.go new file mode 100644 index 00000000..2f63d9d3 --- /dev/null +++ b/plugins/analysis/webinterface/recordedevents/recorded_events.go @@ -0,0 +1,46 @@ +package recordedevents + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/analysis/server" + "github.com/iotaledger/goshimmer/plugins/analysis/webinterface/types" + "time" +) + +var recordedEvents = make([]types.EventHandlersConsumer, 0) + +func Configure(plugin *node.Plugin) { + server.Events.AddNode.Attach(func(nodeId string) { + recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { + handlers.AddNode(nodeId) + }) + }) + + server.Events.NodeOnline.Attach(func(nodeId string) { + recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { + handlers.NodeOnline(nodeId) + }) + }) + + server.Events.NodeOffline.Attach(func(nodeId string) { + recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { + handlers.NodeOffline(nodeId) + }) + }) + + server.Events.ConnectNodes.Attach(func(sourceId string, targetId string) { + recordedEvents = append(recordedEvents, func(handlers *types.EventHandlers) { + handlers.ConnectNodes(sourceId, targetId) + }) + }) +} + +func Replay(handlers *types.EventHandlers, delay time.Duration) { + for _, recordedEvent := range recordedEvents { + recordedEvent(handlers) + + if delay != time.Duration(0) { + time.Sleep(delay) + } + } +} diff --git a/plugins/analysis/webinterface/types/types.go b/plugins/analysis/webinterface/types/types.go new file mode 100644 index 00000000..3f3ebed8 --- /dev/null +++ b/plugins/analysis/webinterface/types/types.go @@ -0,0 +1,12 @@ +package types + +type EventHandlers = struct { + AddNode func(nodeId string) + RemoveNode func(nodeId string) + ConnectNodes func(sourceId string, targetId string) + DisconnectNodes func(sourceId string, targetId string) + NodeOnline func(nodeId string) + NodeOffline func(nodeId string) +} + +type EventHandlersConsumer = func(handler *EventHandlers) diff --git a/plugins/autopeering/instances/chosenneighborcandidates/instance.go b/plugins/autopeering/instances/chosenneighborcandidates/instance.go index c4697d4f..e5c1af0b 100644 --- a/plugins/autopeering/instances/chosenneighborcandidates/instance.go +++ b/plugins/autopeering/instances/chosenneighborcandidates/instance.go @@ -4,15 +4,15 @@ import ( "github.com/iotaledger/goshimmer/plugins/autopeering/instances/neighborhood" "github.com/iotaledger/goshimmer/plugins/autopeering/instances/outgoingrequest" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist" + "hash/fnv" ) var INSTANCE peerlist.PeerList -var DISTANCE = func(req *request.Request) func(p *peer.Peer) float64 { - return func(p *peer.Peer) float64 { - return 1 +var DISTANCE = func(anchor *peer.Peer) func(p *peer.Peer) uint64 { + return func(p *peer.Peer) uint64 { + return hash(anchor.Identity.Identifier) ^ hash(p.Identity.Identifier) } } @@ -23,6 +23,13 @@ func init() { } func updateNeighborCandidates() { - INSTANCE = neighborhood.LIST_INSTANCE.Sort(DISTANCE(outgoingrequest.INSTANCE)) + INSTANCE = neighborhood.LIST_INSTANCE.Sort(DISTANCE(outgoingrequest.INSTANCE.Issuer)) +} + +func hash(data []byte) uint64 { + h := fnv.New64a() + h.Write(data) + + return h.Sum64() } diff --git a/plugins/autopeering/protocol/incoming_request_processor.go b/plugins/autopeering/protocol/incoming_request_processor.go index 493ba9e9..ffc2c283 100644 --- a/plugins/autopeering/protocol/incoming_request_processor.go +++ b/plugins/autopeering/protocol/incoming_request_processor.go @@ -6,8 +6,8 @@ import ( "github.com/iotaledger/goshimmer/plugins/autopeering/instances/neighborhood" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist" + "github.com/iotaledger/goshimmer/plugins/autopeering/types/request" "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" ) @@ -17,6 +17,8 @@ func createIncomingRequestProcessor(plugin *node.Plugin) func(req *request.Reque Events.DiscoverPeer.Trigger(req.Issuer) + //distanceFn := chosenneighborcandidates.DISTANCE(ownpeer.INSTANCE) + if len(neighbormanager.ACCEPTED_NEIGHBORS) <= constants.NEIGHBOR_COUNT / 2 { if err := req.Accept(proposedPeeringCandidates(req)); err != nil { plugin.LogDebug("error when sending response to" + req.Issuer.String()) @@ -40,5 +42,5 @@ func createIncomingRequestProcessor(plugin *node.Plugin) func(req *request.Reque func proposedPeeringCandidates(req *request.Request) peerlist.PeerList { return neighborhood.LIST_INSTANCE.Filter(func(p *peer.Peer) bool { return p.Identity.PublicKey != nil - }).Sort(chosenneighborcandidates.DISTANCE(req)) + }).Sort(chosenneighborcandidates.DISTANCE(req.Issuer)) } \ No newline at end of file diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go index bfe9e987..7740316e 100644 --- a/plugins/autopeering/server/tcp/server.go +++ b/plugins/autopeering/server/tcp/server.go @@ -151,7 +151,7 @@ func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, *connectionState = STATE_INITIAL if *offset + len(data) > request.MARSHALLED_TOTAL_SIZE { - ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[request.MARSHALLED_TOTAL_SIZE:], offset) + ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset) } } } @@ -184,7 +184,7 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, *connectionState = STATE_INITIAL if *offset + len(data) > response.MARSHALLED_TOTAL_SIZE { - ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[response.MARSHALLED_TOTAL_SIZE:], offset) + ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset) } } } @@ -217,7 +217,7 @@ func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, con *connectionState = STATE_INITIAL if *offset + len(data) > ping.MARSHALLED_TOTAL_SIZE { - ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[ping.MARSHALLED_TOTAL_SIZE:], offset) + ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[remainingCapacity:], offset) } } } diff --git a/plugins/autopeering/types/peerlist/peer_list.go b/plugins/autopeering/types/peerlist/peer_list.go index f132e5f0..a83dd135 100644 --- a/plugins/autopeering/types/peerlist/peer_list.go +++ b/plugins/autopeering/types/peerlist/peer_list.go @@ -22,7 +22,7 @@ func (this PeerList) Filter(predicate func(p *peer.Peer) bool) PeerList { } // Sorts the PeerRegister by their distance to an anchor. -func (this PeerList) Sort(distance func(p *peer.Peer) float64) PeerList { +func (this PeerList) Sort(distance func(p *peer.Peer) uint64) PeerList { sort.Slice(this, func(i, j int) bool { return distance(this[i]) < distance(this[j]) }) -- GitLab