Skip to content
Snippets Groups Projects
Commit 907a3861 authored by capossele's avatar capossele
Browse files

Merge branch 'develop' into chores/gossip-linter

parents 0a9bcdeb 2efe84ed
Branches
Tags
No related merge requests found
Showing
with 120 additions and 60 deletions
...@@ -5,8 +5,10 @@ import ( ...@@ -5,8 +5,10 @@ import (
) )
const ( const (
// CfgServerAddress defines the config flag of the analysis server address.
CfgServerAddress = "analysis.client.serverAddress" CfgServerAddress = "analysis.client.serverAddress"
ReportInterval = 5 // ReportInterval defines the interval of the reporting.
ReportInterval = 5
) )
func init() { func init() {
......
...@@ -7,21 +7,21 @@ import ( ...@@ -7,21 +7,21 @@ import (
"sync" "sync"
"time" "time"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat" "github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network" "github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
) )
var log *logger.Logger var log *logger.Logger
var connLock sync.Mutex var connLock sync.Mutex
// Run runs the plugin.
func Run(plugin *node.Plugin) { func Run(plugin *node.Plugin) {
log = logger.NewLogger("Analysis-Client") log = logger.NewLogger("Analysis-Client")
daemon.BackgroundWorker("Analysis Client", func(shutdownSignal <-chan struct{}) { daemon.BackgroundWorker("Analysis Client", func(shutdownSignal <-chan struct{}) {
......
...@@ -2,6 +2,8 @@ package client ...@@ -2,6 +2,8 @@ package client
import "github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat" import "github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
// EventDispatchers holds the Heartbeat function.
type EventDispatchers struct { type EventDispatchers struct {
// Heartbeat defines the Heartbeat function.
Heartbeat func(*heartbeat.Packet) Heartbeat func(*heartbeat.Packet)
} }
package analysis package analysis
import ( import (
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/plugins/analysis/client" "github.com/iotaledger/goshimmer/plugins/analysis/client"
"github.com/iotaledger/goshimmer/plugins/analysis/server" "github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/analysis/webinterface" "github.com/iotaledger/goshimmer/plugins/analysis/webinterface"
"github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
) )
// PluginName is the name of the analysis plugin. // PluginName is the name of the analysis plugin.
......
...@@ -7,7 +7,8 @@ import ( ...@@ -7,7 +7,8 @@ import (
) )
const ( const (
// IdleTimeout defines the idle timeout.
IdleTimeout = 10 * time.Second IdleTimeout = 10 * time.Second
// StateHeartbeat defines the state of the heartbeat.
StateHeartbeat = heartbeat.MarshaledPacketHeader StateHeartbeat = heartbeat.MarshaledPacketHeader
) )
...@@ -5,13 +5,20 @@ import ( ...@@ -5,13 +5,20 @@ import (
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
) )
// Events holds the events of the analysis server package.
var Events = struct { var Events = struct {
AddNode *events.Event // AddNode triggers when adding a new node.
RemoveNode *events.Event AddNode *events.Event
ConnectNodes *events.Event // RemoveNode triggers when removing a node.
RemoveNode *events.Event
// ConnectNodes triggers when connecting two nodes.
ConnectNodes *events.Event
// DisconnectNodes triggers when disconnecting two nodes.
DisconnectNodes *events.Event DisconnectNodes *events.Event
Error *events.Event // Error triggers when an error occurs.
Heartbeat *events.Event Error *events.Event
// Heartbeat triggers when an heartbeat has been received.
Heartbeat *events.Event
}{ }{
events.NewEvent(stringCaller), events.NewEvent(stringCaller),
events.NewEvent(stringCaller), events.NewEvent(stringCaller),
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
) )
const ( const (
// CfgServerPort defines the config flag of the analysis server TCP port.
CfgServerPort = "analysis.server.port" CfgServerPort = "analysis.server.port"
) )
......
...@@ -3,25 +3,27 @@ package server ...@@ -3,25 +3,27 @@ package server
import ( import (
"errors" "errors"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network" "github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/network/tcp" "github.com/iotaledger/hive.go/network/tcp"
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/config"
) )
var ( var (
ErrInvalidPackageHeader = errors.New("invalid package header") // ErrInvalidPackageHeader defines an invalid package header error.
ErrInvalidPackageHeader = errors.New("invalid package header")
// ErrExpectedInitialAddNodePackage defines an expected initial add node package error.
ErrExpectedInitialAddNodePackage = errors.New("expected initial add node package") ErrExpectedInitialAddNodePackage = errors.New("expected initial add node package")
server *tcp.TCPServer server *tcp.TCPServer
log *logger.Logger log *logger.Logger
) )
// Configure configures the plugin.
func Configure(plugin *node.Plugin) { func Configure(plugin *node.Plugin) {
log = logger.NewLogger("Analysis-Server") log = logger.NewLogger("Analysis-Server")
server = tcp.NewServer() server = tcp.NewServer()
...@@ -38,6 +40,7 @@ func Configure(plugin *node.Plugin) { ...@@ -38,6 +40,7 @@ func Configure(plugin *node.Plugin) {
})) }))
} }
// Run runs the plugin.
func Run(plugin *node.Plugin) { func Run(plugin *node.Plugin) {
daemon.BackgroundWorker("Analysis Server", func(shutdownSignal <-chan struct{}) { daemon.BackgroundWorker("Analysis Server", func(shutdownSignal <-chan struct{}) {
log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort)) log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort))
...@@ -47,15 +50,17 @@ func Run(plugin *node.Plugin) { ...@@ -47,15 +50,17 @@ func Run(plugin *node.Plugin) {
}, shutdown.PriorityAnalysis) }, shutdown.PriorityAnalysis)
} }
// Shutdown shutdowns the plugin.
func Shutdown() { func Shutdown() {
log.Info("Stopping Server ...") log.Info("Stopping Server ...")
server.Shutdown() server.Shutdown()
log.Info("Stopping Server ... done") log.Info("Stopping Server ... done")
} }
// HandleConnection handles the given connection.
func HandleConnection(conn *network.ManagedConnection) { func HandleConnection(conn *network.ManagedConnection) {
err := conn.SetTimeout(IdleTimeout) err := conn.SetTimeout(IdleTimeout)
if err!=nil { if err != nil {
log.Errorf(err.Error()) log.Errorf(err.Error())
} }
......
package server package server
// ConnectionState defines the type of a connection state as a byte
type ConnectionState = byte type ConnectionState = byte
...@@ -3,32 +3,44 @@ package heartbeat ...@@ -3,32 +3,44 @@ package heartbeat
import "crypto/sha256" import "crypto/sha256"
const ( const (
// MarshaledPacketHeader unique identifier of packet // MarshaledPacketHeader unique identifier of packet.
MarshaledPacketHeader = 0x01 MarshaledPacketHeader = 0x01
// MaxOutboundNeighborCount is the maximum number of allowed neighbors in one direction // MaxOutboundNeighborCount is the maximum number of allowed neighbors in one direction.
MaxOutboundNeighborCount = 4 MaxOutboundNeighborCount = 4
// MaxInboundNeighborCount is the maximum number of allowed neighbors in one direction // MaxInboundNeighborCount is the maximum number of allowed neighbors in one direction.
MaxInboundNeighborCount = 4 MaxInboundNeighborCount = 4
// MaxMarshaledTotalSize Maximum packet length in bytes // MaxMarshaledTotalSize Maximum packet length in bytes.
MaxMarshaledTotalSize = MarshaledPacketHeaderSize + MarshaledOwnIDSize + MaxMarshaledTotalSize = MarshaledPacketHeaderSize + MarshaledOwnIDSize +
MarshaledOutboundIDsLengthSize + MaxOutboundNeighborCount*MarshaledOutboundIDSize + MarshaledOutboundIDsLengthSize + MaxOutboundNeighborCount*MarshaledOutboundIDSize +
MarshaledInboundIDsLengthSize + MaxInboundNeighborCount*MarshaledInboundIDSize MarshaledInboundIDsLengthSize + MaxInboundNeighborCount*MarshaledInboundIDSize
// MarshaledPacketHeaderStart is the beginning of the MarshaledPacketHeader.
MarshaledPacketHeaderStart = 0 MarshaledPacketHeaderStart = 0
MarshaledPacketHeaderSize = 1 // MarshaledPacketHeaderSize is the size of the MarshaledPacketHeader.
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize MarshaledPacketHeaderSize = 1
// MarshaledPacketHeaderEnd is the end of the MarshaledPacketHeader.
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
// MarshaledOwnIDStart is the beginning of the MarshaledOwnID.
MarshaledOwnIDStart = MarshaledPacketHeaderEnd MarshaledOwnIDStart = MarshaledPacketHeaderEnd
MarshaledOwnIDSize = sha256.Size // MarshaledOwnIDSize is the size of the MarshaledOwnID.
MarshaledOwnIDEnd = MarshaledOwnIDStart + MarshaledOwnIDSize MarshaledOwnIDSize = sha256.Size
// MarshaledOwnIDEnd is the end of the MarshaledOwnID.
MarshaledOwnIDEnd = MarshaledOwnIDStart + MarshaledOwnIDSize
// MarshaledOutboundIDsLengthStart is the beginning of the MarshaledOutboundIDsLength.
MarshaledOutboundIDsLengthStart = MarshaledOwnIDEnd MarshaledOutboundIDsLengthStart = MarshaledOwnIDEnd
MarshaledOutboundIDsLengthSize = 1 // MarshaledOutboundIDsLengthSize is the size of the MarshaledOutboundIDsLength.
MarshaledOutboundIDSize = sha256.Size MarshaledOutboundIDsLengthSize = 1
MarshaledOutboundIDsLengthEnd = MarshaledOutboundIDsLengthStart + MarshaledOutboundIDsLengthSize // MarshaledOutboundIDSize is the size of the MarshaledOutboundID.
MarshaledOutboundIDSize = sha256.Size
// MarshaledOutboundIDsLengthEnd is the end of the MarshaledOutboundIDsLength.
MarshaledOutboundIDsLengthEnd = MarshaledOutboundIDsLengthStart + MarshaledOutboundIDsLengthSize
// MarshaledInboundIDsLengthSize is the size of the MarshaledInboundIDsLength.
MarshaledInboundIDsLengthSize = 1 MarshaledInboundIDsLengthSize = 1
MarshaledInboundIDSize = sha256.Size // MarshaledInboundIDSize is the size of the MarshaledInboundID.
MarshaledInboundIDSize = sha256.Size
) )
...@@ -3,7 +3,9 @@ package heartbeat ...@@ -3,7 +3,9 @@ package heartbeat
import "errors" import "errors"
var ( var (
// ErrMalformedHeartbeatPacket defines the malformed heartbeat packet error.
ErrMalformedHeartbeatPacket = errors.New("malformed heartbeat packet") ErrMalformedHeartbeatPacket = errors.New("malformed heartbeat packet")
// ErrTooManyNeighborsToReport defines the too many neighbors to report in packet error.
ErrTooManyNeighborsToReport = errors.New("too many neighbors to report in packet") ErrTooManyNeighborsToReport = errors.New("too many neighbors to report in packet")
) )
...@@ -14,6 +16,7 @@ type Packet struct { ...@@ -14,6 +16,7 @@ type Packet struct {
InboundIDs [][]byte InboundIDs [][]byte
} }
// Unmarshal unmarshals a slice of byte and returns a *Packet and an error (nil if successful).
func Unmarshal(data []byte) (*Packet, error) { func Unmarshal(data []byte) (*Packet, error) {
// So far we are only sure about the static part // So far we are only sure about the static part
MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize
...@@ -73,6 +76,7 @@ func Unmarshal(data []byte) (*Packet, error) { ...@@ -73,6 +76,7 @@ func Unmarshal(data []byte) (*Packet, error) {
} }
// Marshal marshals a given *Packet and returns the marshaled slice of byte and an error (nil if successful).
func (packet *Packet) Marshal() ([]byte, error) { func (packet *Packet) Marshal() ([]byte, error) {
// Calculate total needed bytes based on packet // Calculate total needed bytes based on packet
MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize + MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize +
...@@ -93,9 +97,8 @@ func (packet *Packet) Marshal() ([]byte, error) { ...@@ -93,9 +97,8 @@ func (packet *Packet) Marshal() ([]byte, error) {
lengthOutboundIDs := len(packet.OutboundIDs) lengthOutboundIDs := len(packet.OutboundIDs)
if lengthOutboundIDs > MaxOutboundNeighborCount { if lengthOutboundIDs > MaxOutboundNeighborCount {
return nil, ErrTooManyNeighborsToReport return nil, ErrTooManyNeighborsToReport
} else {
marshaledPackage[MarshaledOutboundIDsLengthStart] = byte(lengthOutboundIDs)
} }
marshaledPackage[MarshaledOutboundIDsLengthStart] = byte(lengthOutboundIDs)
// Copy contents of packet.OutboundIDs // Copy contents of packet.OutboundIDs
for i, outboundID := range packet.OutboundIDs { for i, outboundID := range packet.OutboundIDs {
...@@ -109,9 +112,8 @@ func (packet *Packet) Marshal() ([]byte, error) { ...@@ -109,9 +112,8 @@ func (packet *Packet) Marshal() ([]byte, error) {
lengthInboundIDs := len(packet.InboundIDs) lengthInboundIDs := len(packet.InboundIDs)
if lengthInboundIDs > MaxInboundNeighborCount { if lengthInboundIDs > MaxInboundNeighborCount {
return nil, ErrTooManyNeighborsToReport return nil, ErrTooManyNeighborsToReport
} else {
marshaledPackage[MarshaledInboundIdsLengthStart] = byte(lengthInboundIDs)
} }
marshaledPackage[MarshaledInboundIdsLengthStart] = byte(lengthInboundIDs)
// End of length is the start of inbound nodeId-s // End of length is the start of inbound nodeId-s
MarshaledInboundIdsLengthEnd := MarshaledInboundIdsLengthStart + MarshaledInboundIDsLengthSize MarshaledInboundIdsLengthEnd := MarshaledInboundIdsLengthStart + MarshaledInboundIDsLengthSize
......
package ping package ping
const ( const (
// MarshaledPacketHeader defines the MarshaledPacketHeader.
MarshaledPacketHeader = 0x00 MarshaledPacketHeader = 0x00
// MarshaledPacketHeaderStart defines the start of the MarshaledPacketHeader.
MarshaledPacketHeaderStart = 0 MarshaledPacketHeaderStart = 0
MarshaledPacketHeaderSize = 1 // MarshaledPacketHeaderSize defines the size of the MarshaledPacketHeader.
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize MarshaledPacketHeaderSize = 1
// MarshaledPacketHeaderEnd defines the end of the MarshaledPacketHeader.
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
// MarshaledTotalSize defines the total size of the MarshaledPacketHeader.
MarshaledTotalSize = MarshaledPacketHeaderEnd MarshaledTotalSize = MarshaledPacketHeaderEnd
) )
...@@ -3,11 +3,14 @@ package ping ...@@ -3,11 +3,14 @@ package ping
import "errors" import "errors"
var ( var (
// ErrMalformedPingPacket defines the malformed ping packet error.
ErrMalformedPingPacket = errors.New("malformed ping packet") ErrMalformedPingPacket = errors.New("malformed ping packet")
) )
// Packet defines the Packet type as an empty struct.
type Packet struct{} type Packet struct{}
// Unmarshal unmarshals a given slice of byte and returns a *Packet and an error.
func Unmarshal(data []byte) (*Packet, error) { func Unmarshal(data []byte) (*Packet, error) {
if len(data) < MarshaledTotalSize || data[MarshaledPacketHeaderStart] != MarshaledPacketHeader { if len(data) < MarshaledTotalSize || data[MarshaledPacketHeaderStart] != MarshaledPacketHeader {
return nil, ErrMalformedPingPacket return nil, ErrMalformedPingPacket
...@@ -18,6 +21,7 @@ func Unmarshal(data []byte) (*Packet, error) { ...@@ -18,6 +21,7 @@ func Unmarshal(data []byte) (*Packet, error) {
return unmarshaledPacket, nil return unmarshaledPacket, nil
} }
// Marshal marshals a given *Packet and returns the marshaled slice of byte.
func (packet *Packet) Marshal() []byte { func (packet *Packet) Marshal() []byte {
marshaledPackage := make([]byte, MarshaledTotalSize) marshaledPackage := make([]byte, MarshaledTotalSize)
......
...@@ -5,8 +5,10 @@ import ( ...@@ -5,8 +5,10 @@ import (
) )
const ( const (
// CfgBindAddress defines the config flag of the analysis http server binding address.
CfgBindAddress = "analysis.httpServer.bindAddress" CfgBindAddress = "analysis.httpServer.bindAddress"
CfgDev = "analysis.httpServer.dev" // CfgDev defines the config flag of the analysis http server dev mode.
CfgDev = "analysis.httpServer.dev"
) )
func init() { func init() {
......
...@@ -6,14 +6,13 @@ import ( ...@@ -6,14 +6,13 @@ import (
"time" "time"
"github.com/gobuffalo/packr/v2" "github.com/gobuffalo/packr/v2"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/logger"
"github.com/labstack/echo" "github.com/labstack/echo"
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
) )
var ( var (
...@@ -25,6 +24,7 @@ const name = "Analysis HTTP Server" ...@@ -25,6 +24,7 @@ const name = "Analysis HTTP Server"
var assetsBox = packr.New("Assets", "./static") var assetsBox = packr.New("Assets", "./static")
// Configure configures the plugin.
func Configure() { func Configure() {
log = logger.NewLogger(name) log = logger.NewLogger(name)
...@@ -45,6 +45,7 @@ func Configure() { ...@@ -45,6 +45,7 @@ func Configure() {
engine.GET("/datastream", echo.WrapHandler(websocket.Handler(dataStream))) engine.GET("/datastream", echo.WrapHandler(websocket.Handler(dataStream)))
} }
// Run runs the plugin.
func Run() { func Run() {
log.Infof("Starting %s ...", name) log.Infof("Starting %s ...", name)
if err := daemon.BackgroundWorker(name, start, shutdown.PriorityAnalysis); err != nil { if err := daemon.BackgroundWorker(name, start, shutdown.PriorityAnalysis); err != nil {
......
...@@ -6,11 +6,13 @@ import ( ...@@ -6,11 +6,13 @@ import (
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
) )
// WebSocketChannel holds the websocket connection and its channel.
type WebSocketChannel struct { type WebSocketChannel struct {
ws *websocket.Conn ws *websocket.Conn
send chan string send chan string
} }
// NewWebSocketChannel returns a new *WebSocketChannel for the given websocket connection.
func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel { func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel {
wsChan := &WebSocketChannel{ wsChan := &WebSocketChannel{
ws: ws, ws: ws,
...@@ -22,10 +24,12 @@ func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel { ...@@ -22,10 +24,12 @@ func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel {
return wsChan return wsChan
} }
// Write writes into the given WebSocketChannel.
func (c *WebSocketChannel) Write(update string) { func (c *WebSocketChannel) Write(update string) {
c.send <- update c.send <- update
} }
// KeepAlive keeps the websocket connection alive.
func (c *WebSocketChannel) KeepAlive() { func (c *WebSocketChannel) KeepAlive() {
buf := make([]byte, 1) buf := make([]byte, 1)
for { for {
...@@ -37,6 +41,7 @@ func (c *WebSocketChannel) KeepAlive() { ...@@ -37,6 +41,7 @@ func (c *WebSocketChannel) KeepAlive() {
} }
} }
// Close closes the WebSocketChannel.
func (c *WebSocketChannel) Close() { func (c *WebSocketChannel) Close() {
close(c.send) close(c.send)
_ = c.ws.Close() _ = c.ws.Close()
......
...@@ -6,11 +6,13 @@ import ( ...@@ -6,11 +6,13 @@ import (
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
) )
// Configure configures the plugin.
func Configure(plugin *node.Plugin) { func Configure(plugin *node.Plugin) {
httpserver.Configure() httpserver.Configure()
recordedevents.Configure(plugin) recordedevents.Configure(plugin)
} }
// Run runs the plugin.
func Run(plugin *node.Plugin) { func Run(plugin *node.Plugin) {
httpserver.Run() httpserver.Run()
recordedevents.Run() recordedevents.Run()
......
...@@ -23,6 +23,7 @@ var links = make(map[string]map[string]time.Time) ...@@ -23,6 +23,7 @@ var links = make(map[string]map[string]time.Time)
var lock sync.Mutex var lock sync.Mutex
// Configure configures the plugin.
func Configure(plugin *node.Plugin) { func Configure(plugin *node.Plugin) {
server.Events.Heartbeat.Attach(events.NewClosure(func(packet heartbeat.Packet) { server.Events.Heartbeat.Attach(events.NewClosure(func(packet heartbeat.Packet) {
var out strings.Builder var out strings.Builder
...@@ -42,15 +43,15 @@ func Configure(plugin *node.Plugin) { ...@@ -42,15 +43,15 @@ func Configure(plugin *node.Plugin) {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
nodeIdString := hex.EncodeToString(packet.OwnID) nodeIDString := hex.EncodeToString(packet.OwnID)
timestamp := time.Now() timestamp := time.Now()
// When node is new, add to graph // When node is new, add to graph
if _, isAlready := nodes[nodeIdString]; !isAlready { if _, isAlready := nodes[nodeIDString]; !isAlready {
server.Events.AddNode.Trigger(nodeIdString) server.Events.AddNode.Trigger(nodeIDString)
} }
// Save it + update timestamp // Save it + update timestamp
nodes[nodeIdString] = timestamp nodes[nodeIDString] = timestamp
// Outgoing neighbor links update // Outgoing neighbor links update
for _, outgoingNeighbor := range packet.OutboundIDs { for _, outgoingNeighbor := range packet.OutboundIDs {
...@@ -65,17 +66,17 @@ func Configure(plugin *node.Plugin) { ...@@ -65,17 +66,17 @@ func Configure(plugin *node.Plugin) {
nodes[outgoingNeighborString] = timestamp nodes[outgoingNeighborString] = timestamp
// Do we have any links already with src=nodeIdString? // Do we have any links already with src=nodeIdString?
if _, isAlready := links[nodeIdString]; !isAlready { if _, isAlready := links[nodeIDString]; !isAlready {
// Nope, so we have to allocate an empty map to be nested in links for nodeIdString // Nope, so we have to allocate an empty map to be nested in links for nodeIdString
links[nodeIdString] = make(map[string]time.Time) links[nodeIDString] = make(map[string]time.Time)
} }
// Update graph when connection hasn't been seen before // Update graph when connection hasn't been seen before
if _, isAlready := links[nodeIdString][outgoingNeighborString]; !isAlready { if _, isAlready := links[nodeIDString][outgoingNeighborString]; !isAlready {
server.Events.ConnectNodes.Trigger(nodeIdString, outgoingNeighborString) server.Events.ConnectNodes.Trigger(nodeIDString, outgoingNeighborString)
} }
// Update links // Update links
links[nodeIdString][outgoingNeighborString] = timestamp links[nodeIDString][outgoingNeighborString] = timestamp
} }
// Incoming neighbor links update // Incoming neighbor links update
...@@ -97,15 +98,16 @@ func Configure(plugin *node.Plugin) { ...@@ -97,15 +98,16 @@ func Configure(plugin *node.Plugin) {
} }
// Update graph when connection hasn't been seen before // Update graph when connection hasn't been seen before
if _, isAlready := links[incomingNeighborString][nodeIdString]; !isAlready { if _, isAlready := links[incomingNeighborString][nodeIDString]; !isAlready {
server.Events.ConnectNodes.Trigger(incomingNeighborString, nodeIdString) server.Events.ConnectNodes.Trigger(incomingNeighborString, nodeIDString)
} }
// Update links map // Update links map
links[incomingNeighborString][nodeIdString] = timestamp links[incomingNeighborString][nodeIDString] = timestamp
} }
})) }))
} }
// Run runs the plugin.
func Run() { func Run() {
daemon.BackgroundWorker("Analysis Server Record Manager", func(shutdownSignal <-chan struct{}) { daemon.BackgroundWorker("Analysis Server Record Manager", func(shutdownSignal <-chan struct{}) {
ticker := time.NewTicker(CleanUpPeriod) ticker := time.NewTicker(CleanUpPeriod)
...@@ -171,6 +173,7 @@ func getEventsToReplay() (map[string]time.Time, map[string]map[string]time.Time) ...@@ -171,6 +173,7 @@ func getEventsToReplay() (map[string]time.Time, map[string]map[string]time.Time)
return copiedNodes, copiedLinks return copiedNodes, copiedLinks
} }
// Replay runs the handlers on the events to replay.
func Replay(handlers *types.EventHandlers) { func Replay(handlers *types.EventHandlers) {
copiedNodes, copiedLinks := getEventsToReplay() copiedNodes, copiedLinks := getEventsToReplay()
......
package types package types
// EventHandlers holds the handler for each event.
type EventHandlers = struct { type EventHandlers = struct {
AddNode func(nodeId string) // Addnode defines the handler called when adding a new node.
RemoveNode func(nodeId string) AddNode func(nodeId string)
ConnectNodes func(sourceId string, targetId string) // RemoveNode defines the handler called when adding removing a node.
RemoveNode func(nodeId string)
// ConnectNodes defines the handler called when connecting two nodes.
ConnectNodes func(sourceId string, targetId string)
// DisconnectNodes defines the handler called when connecting two nodes.
DisconnectNodes func(sourceId string, targetId string) DisconnectNodes func(sourceId string, targetId string)
} }
// EventHandlersConsumer defines the consumer function of an *EventHandlers.
type EventHandlersConsumer = func(handler *EventHandlers) type EventHandlersConsumer = func(handler *EventHandlers)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment