Skip to content
Snippets Groups Projects
Commit 2b27de92 authored by capossele's avatar capossele
Browse files

:rotating_light: Fixes linter warnings.

parent 35a0c58c
No related branches found
No related tags found
No related merge requests found
Showing
with 123 additions and 57 deletions
......@@ -8,5 +8,5 @@ import (
var PLUGINS = node.Plugins(
remotelog.PLUGIN,
analysis.PLUGIN,
analysis.Plugin,
)
......@@ -5,8 +5,10 @@ import (
)
const (
// CfgServerAddress defines the config flag of the analysis server address.
CfgServerAddress = "analysis.client.serverAddress"
ReportInterval = 5
// ReportInterval defines the interval of the reporting.
ReportInterval = 5
)
func init() {
......
......@@ -7,21 +7,21 @@ import (
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
)
var log *logger.Logger
var connLock sync.Mutex
// Run runs the plugin.
func Run(plugin *node.Plugin) {
log = logger.NewLogger("Analysis-Client")
daemon.BackgroundWorker("Analysis Client", func(shutdownSignal <-chan struct{}) {
......
......@@ -2,6 +2,8 @@ package client
import "github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
// EventDispatchers holds the Heartbeat function.
type EventDispatchers struct {
// Heartbeat defines the Heartbeat function.
Heartbeat func(*heartbeat.Packet)
}
......@@ -10,11 +10,15 @@ import (
"github.com/iotaledger/goshimmer/plugins/config"
)
var PLUGIN = node.NewPlugin("Analysis", node.Enabled, configure, run)
var pluginName = "Analysis"
// Plugin defines the analysis plugin.
var Plugin = node.NewPlugin(pluginName, node.Enabled, configure, run)
var log *logger.Logger
func configure(plugin *node.Plugin) {
log = logger.NewLogger("Analysis")
log = logger.NewLogger(pluginName)
if config.Node.GetInt(server.CfgServerPort) != 0 {
webinterface.Configure(plugin)
server.Configure(plugin)
......
......@@ -7,7 +7,8 @@ import (
)
const (
// IdleTimeout defines the idle timeout.
IdleTimeout = 10 * time.Second
// StateHeartbeat defines the state of the heartbeat.
StateHeartbeat = heartbeat.MarshaledPacketHeader
)
......@@ -5,13 +5,20 @@ import (
"github.com/iotaledger/hive.go/events"
)
// Events holds the events of the analysis server package.
var Events = struct {
AddNode *events.Event
RemoveNode *events.Event
ConnectNodes *events.Event
// AddNode triggers when adding a new node.
AddNode *events.Event
// RemoveNode triggers when removing a node.
RemoveNode *events.Event
// ConnectNodes triggers when connecting two nodes.
ConnectNodes *events.Event
// DisconnectNodes triggers when disconnecting two nodes.
DisconnectNodes *events.Event
Error *events.Event
Heartbeat *events.Event
// Error triggers when an error occurs.
Error *events.Event
// Heartbeat triggers when an heartbeat has been received.
Heartbeat *events.Event
}{
events.NewEvent(stringCaller),
events.NewEvent(stringCaller),
......
......@@ -5,6 +5,7 @@ import (
)
const (
// CfgServerPort defines the config flag of the analysis server TCP port.
CfgServerPort = "analysis.server.port"
)
......
......@@ -3,25 +3,27 @@ package server
import (
"errors"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/network/tcp"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/config"
)
var (
ErrInvalidPackageHeader = errors.New("invalid package header")
// ErrInvalidPackageHeader defines an invalid package header error.
ErrInvalidPackageHeader = errors.New("invalid package header")
// ErrExpectedInitialAddNodePackage defines an expected initial add node package error.
ErrExpectedInitialAddNodePackage = errors.New("expected initial add node package")
server *tcp.TCPServer
log *logger.Logger
)
// Configure configures the plugin.
func Configure(plugin *node.Plugin) {
log = logger.NewLogger("Analysis-Server")
server = tcp.NewServer()
......@@ -38,6 +40,7 @@ func Configure(plugin *node.Plugin) {
}))
}
// Run runs the plugin.
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker("Analysis Server", func(shutdownSignal <-chan struct{}) {
log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort))
......@@ -47,15 +50,17 @@ func Run(plugin *node.Plugin) {
}, shutdown.PriorityAnalysis)
}
// Shutdown shutdowns the plugin.
func Shutdown() {
log.Info("Stopping Server ...")
server.Shutdown()
log.Info("Stopping Server ... done")
}
// HandleConnection handles the given connection.
func HandleConnection(conn *network.ManagedConnection) {
err := conn.SetTimeout(IdleTimeout)
if err!=nil {
if err != nil {
log.Errorf(err.Error())
}
......
package server
// ConnectionState defines the type of a connection state as a byte
type ConnectionState = byte
......@@ -3,32 +3,44 @@ package heartbeat
import "crypto/sha256"
const (
// MarshaledPacketHeader unique identifier of packet
// MarshaledPacketHeader unique identifier of packet.
MarshaledPacketHeader = 0x01
// MaxOutboundNeighborCount is the maximum number of allowed neighbors in one direction
// MaxOutboundNeighborCount is the maximum number of allowed neighbors in one direction.
MaxOutboundNeighborCount = 4
// MaxInboundNeighborCount is the maximum number of allowed neighbors in one direction
MaxInboundNeighborCount = 4
// MaxInboundNeighborCount is the maximum number of allowed neighbors in one direction.
MaxInboundNeighborCount = 4
// MaxMarshaledTotalSize Maximum packet length in bytes
// MaxMarshaledTotalSize Maximum packet length in bytes.
MaxMarshaledTotalSize = MarshaledPacketHeaderSize + MarshaledOwnIDSize +
MarshaledOutboundIDsLengthSize + MaxOutboundNeighborCount*MarshaledOutboundIDSize +
MarshaledInboundIDsLengthSize + MaxInboundNeighborCount*MarshaledInboundIDSize
// MarshaledPacketHeaderStart is the beginning of the MarshaledPacketHeader.
MarshaledPacketHeaderStart = 0
MarshaledPacketHeaderSize = 1
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
// MarshaledPacketHeaderSize is the size of the MarshaledPacketHeader.
MarshaledPacketHeaderSize = 1
// MarshaledPacketHeaderEnd is the end of the MarshaledPacketHeader.
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
// MarshaledOwnIDStart is the beginning of the MarshaledOwnID.
MarshaledOwnIDStart = MarshaledPacketHeaderEnd
MarshaledOwnIDSize = sha256.Size
MarshaledOwnIDEnd = MarshaledOwnIDStart + MarshaledOwnIDSize
// MarshaledOwnIDSize is the size of the MarshaledOwnID.
MarshaledOwnIDSize = sha256.Size
// MarshaledOwnIDEnd is the end of the MarshaledOwnID.
MarshaledOwnIDEnd = MarshaledOwnIDStart + MarshaledOwnIDSize
// MarshaledOutboundIDsLengthStart is the beginning of the MarshaledOutboundIDsLength.
MarshaledOutboundIDsLengthStart = MarshaledOwnIDEnd
MarshaledOutboundIDsLengthSize = 1
MarshaledOutboundIDSize = sha256.Size
MarshaledOutboundIDsLengthEnd = MarshaledOutboundIDsLengthStart + MarshaledOutboundIDsLengthSize
// MarshaledOutboundIDsLengthSize is the size of the MarshaledOutboundIDsLength.
MarshaledOutboundIDsLengthSize = 1
// MarshaledOutboundIDSize is the size of the MarshaledOutboundID.
MarshaledOutboundIDSize = sha256.Size
// MarshaledOutboundIDsLengthEnd is the end of the MarshaledOutboundIDsLength.
MarshaledOutboundIDsLengthEnd = MarshaledOutboundIDsLengthStart + MarshaledOutboundIDsLengthSize
// MarshaledInboundIDsLengthSize is the size of the MarshaledInboundIDsLength.
MarshaledInboundIDsLengthSize = 1
MarshaledInboundIDSize = sha256.Size
// MarshaledInboundIDSize is the size of the MarshaledInboundID.
MarshaledInboundIDSize = sha256.Size
)
......@@ -3,7 +3,9 @@ package heartbeat
import "errors"
var (
// ErrMalformedHeartbeatPacket defines the malformed heartbeat packet error.
ErrMalformedHeartbeatPacket = errors.New("malformed heartbeat packet")
// ErrTooManyNeighborsToReport defines the too many neighbors to report in packet error.
ErrTooManyNeighborsToReport = errors.New("too many neighbors to report in packet")
)
......@@ -14,6 +16,7 @@ type Packet struct {
InboundIDs [][]byte
}
// Unmarshal unmarshals a slice of byte and returns a *Packet and an error (nil if successful).
func Unmarshal(data []byte) (*Packet, error) {
// So far we are only sure about the static part
MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize
......@@ -73,6 +76,7 @@ func Unmarshal(data []byte) (*Packet, error) {
}
// Marshal marshals a given *Packet and returns the marshaled slice of byte and an error (nil if successful).
func (packet *Packet) Marshal() ([]byte, error) {
// Calculate total needed bytes based on packet
MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize +
......@@ -93,9 +97,8 @@ func (packet *Packet) Marshal() ([]byte, error) {
lengthOutboundIDs := len(packet.OutboundIDs)
if lengthOutboundIDs > MaxOutboundNeighborCount {
return nil, ErrTooManyNeighborsToReport
} else {
marshaledPackage[MarshaledOutboundIDsLengthStart] = byte(lengthOutboundIDs)
}
marshaledPackage[MarshaledOutboundIDsLengthStart] = byte(lengthOutboundIDs)
// Copy contents of packet.OutboundIDs
for i, outboundID := range packet.OutboundIDs {
......@@ -109,9 +112,8 @@ func (packet *Packet) Marshal() ([]byte, error) {
lengthInboundIDs := len(packet.InboundIDs)
if lengthInboundIDs > MaxInboundNeighborCount {
return nil, ErrTooManyNeighborsToReport
} else {
marshaledPackage[MarshaledInboundIdsLengthStart] = byte(lengthInboundIDs)
}
marshaledPackage[MarshaledInboundIdsLengthStart] = byte(lengthInboundIDs)
// End of length is the start of inbound nodeId-s
MarshaledInboundIdsLengthEnd := MarshaledInboundIdsLengthStart + MarshaledInboundIDsLengthSize
......
package ping
const (
// MarshaledPacketHeader defines the MarshaledPacketHeader.
MarshaledPacketHeader = 0x00
// MarshaledPacketHeaderStart defines the start of the MarshaledPacketHeader.
MarshaledPacketHeaderStart = 0
MarshaledPacketHeaderSize = 1
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
// MarshaledPacketHeaderSize defines the size of the MarshaledPacketHeader.
MarshaledPacketHeaderSize = 1
// MarshaledPacketHeaderEnd defines the end of the MarshaledPacketHeader.
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
// MarshaledTotalSize defines the total size of the MarshaledPacketHeader.
MarshaledTotalSize = MarshaledPacketHeaderEnd
)
......@@ -3,11 +3,14 @@ package ping
import "errors"
var (
// ErrMalformedPingPacket defines the malformed ping packet error.
ErrMalformedPingPacket = errors.New("malformed ping packet")
)
// Packet defines the Packet type as an empty struct.
type Packet struct{}
// Unmarshal unmarshals a given slice of byte and returns a *Packet and an error.
func Unmarshal(data []byte) (*Packet, error) {
if len(data) < MarshaledTotalSize || data[MarshaledPacketHeaderStart] != MarshaledPacketHeader {
return nil, ErrMalformedPingPacket
......@@ -18,6 +21,7 @@ func Unmarshal(data []byte) (*Packet, error) {
return unmarshaledPacket, nil
}
// Marshal marshals a given *Packet and returns the marshaled slice of byte.
func (packet *Packet) Marshal() []byte {
marshaledPackage := make([]byte, MarshaledTotalSize)
......
......@@ -5,8 +5,10 @@ import (
)
const (
// CfgBindAddress defines the config flag of the analysis http server binding address.
CfgBindAddress = "analysis.httpServer.bindAddress"
CfgDev = "analysis.httpServer.dev"
// CfgDev defines the config flag of the analysis http server dev mode.
CfgDev = "analysis.httpServer.dev"
)
func init() {
......
......@@ -25,6 +25,7 @@ const name = "Analysis HTTP Server"
var assetsBox = packr.New("Assets", "./static")
// Configure configures the plugin.
func Configure() {
log = logger.NewLogger(name)
......@@ -45,6 +46,7 @@ func Configure() {
engine.GET("/datastream", echo.WrapHandler(websocket.Handler(dataStream)))
}
// Run runs the plugin.
func Run() {
log.Infof("Starting %s ...", name)
if err := daemon.BackgroundWorker(name, start, shutdown.PriorityAnalysis); err != nil {
......
......@@ -6,11 +6,13 @@ import (
"golang.org/x/net/websocket"
)
// WebSocketChannel holds the websocket connection and its channel.
type WebSocketChannel struct {
ws *websocket.Conn
send chan string
}
// NewWebSocketChannel returns a new *WebSocketChannel for the given websocket connection.
func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel {
wsChan := &WebSocketChannel{
ws: ws,
......@@ -22,10 +24,12 @@ func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel {
return wsChan
}
// Write writes into the given WebSocketChannel.
func (c *WebSocketChannel) Write(update string) {
c.send <- update
}
// KeepAlive keeps the websocket connection alive.
func (c *WebSocketChannel) KeepAlive() {
buf := make([]byte, 1)
for {
......@@ -37,6 +41,7 @@ func (c *WebSocketChannel) KeepAlive() {
}
}
// Close closes the WebSocketChannel.
func (c *WebSocketChannel) Close() {
close(c.send)
_ = c.ws.Close()
......
......@@ -6,11 +6,13 @@ import (
"github.com/iotaledger/hive.go/node"
)
// Configure configures the plugin.
func Configure(plugin *node.Plugin) {
httpserver.Configure()
recordedevents.Configure(plugin)
}
// Run runs the plugin.
func Run(plugin *node.Plugin) {
httpserver.Run()
recordedevents.Run()
......
......@@ -23,6 +23,7 @@ var links = make(map[string]map[string]time.Time)
var lock sync.Mutex
// Configure configures the plugin.
func Configure(plugin *node.Plugin) {
server.Events.Heartbeat.Attach(events.NewClosure(func(packet heartbeat.Packet) {
var out strings.Builder
......@@ -42,15 +43,15 @@ func Configure(plugin *node.Plugin) {
lock.Lock()
defer lock.Unlock()
nodeIdString := hex.EncodeToString(packet.OwnID)
nodeIDString := hex.EncodeToString(packet.OwnID)
timestamp := time.Now()
// When node is new, add to graph
if _, isAlready := nodes[nodeIdString]; !isAlready {
server.Events.AddNode.Trigger(nodeIdString)
if _, isAlready := nodes[nodeIDString]; !isAlready {
server.Events.AddNode.Trigger(nodeIDString)
}
// Save it + update timestamp
nodes[nodeIdString] = timestamp
nodes[nodeIDString] = timestamp
// Outgoing neighbor links update
for _, outgoingNeighbor := range packet.OutboundIDs {
......@@ -65,17 +66,17 @@ func Configure(plugin *node.Plugin) {
nodes[outgoingNeighborString] = timestamp
// Do we have any links already with src=nodeIdString?
if _, isAlready := links[nodeIdString]; !isAlready {
if _, isAlready := links[nodeIDString]; !isAlready {
// Nope, so we have to allocate an empty map to be nested in links for nodeIdString
links[nodeIdString] = make(map[string]time.Time)
links[nodeIDString] = make(map[string]time.Time)
}
// Update graph when connection hasn't been seen before
if _, isAlready := links[nodeIdString][outgoingNeighborString]; !isAlready {
server.Events.ConnectNodes.Trigger(nodeIdString, outgoingNeighborString)
if _, isAlready := links[nodeIDString][outgoingNeighborString]; !isAlready {
server.Events.ConnectNodes.Trigger(nodeIDString, outgoingNeighborString)
}
// Update links
links[nodeIdString][outgoingNeighborString] = timestamp
links[nodeIDString][outgoingNeighborString] = timestamp
}
// Incoming neighbor links update
......@@ -97,15 +98,16 @@ func Configure(plugin *node.Plugin) {
}
// Update graph when connection hasn't been seen before
if _, isAlready := links[incomingNeighborString][nodeIdString]; !isAlready {
server.Events.ConnectNodes.Trigger(incomingNeighborString, nodeIdString)
if _, isAlready := links[incomingNeighborString][nodeIDString]; !isAlready {
server.Events.ConnectNodes.Trigger(incomingNeighborString, nodeIDString)
}
// Update links map
links[incomingNeighborString][nodeIdString] = timestamp
links[incomingNeighborString][nodeIDString] = timestamp
}
}))
}
// Run runs the plugin.
func Run() {
daemon.BackgroundWorker("Analysis Server Record Manager", func(shutdownSignal <-chan struct{}) {
ticker := time.NewTicker(CleanUpPeriod)
......@@ -171,6 +173,7 @@ func getEventsToReplay() (map[string]time.Time, map[string]map[string]time.Time)
return copiedNodes, copiedLinks
}
// Replay runs the handlers on the events to replay.
func Replay(handlers *types.EventHandlers) {
copiedNodes, copiedLinks := getEventsToReplay()
......
package types
// EventHandlers holds the handler for each event.
type EventHandlers = struct {
AddNode func(nodeId string)
RemoveNode func(nodeId string)
ConnectNodes func(sourceId string, targetId string)
// Addnode defines the handler called when adding a new node.
AddNode func(nodeId string)
// RemoveNode defines the handler called when adding removing a node.
RemoveNode func(nodeId string)
// ConnectNodes defines the handler called when connecting two nodes.
ConnectNodes func(sourceId string, targetId string)
// DisconnectNodes defines the handler called when connecting two nodes.
DisconnectNodes func(sourceId string, targetId string)
}
// EventHandlersConsumer defines the consumer function of an *EventHandlers.
type EventHandlersConsumer = func(handler *EventHandlers)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment