Skip to content
Snippets Groups Projects
Unverified Commit 2efe84ed authored by Angelo Capossele's avatar Angelo Capossele Committed by GitHub
Browse files

Merge pull request #353 from iotaledger/chores/analysis-linter

Fix the analysis plugin linter warnings
parents e96061cc 7a78cc58
No related branches found
No related tags found
No related merge requests found
Showing
with 120 additions and 60 deletions
......@@ -5,7 +5,9 @@ import (
)
const (
// CfgServerAddress defines the config flag of the analysis server address.
CfgServerAddress = "analysis.client.serverAddress"
// ReportInterval defines the interval of the reporting.
ReportInterval = 5
)
......
......@@ -7,21 +7,21 @@ import (
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
)
var log *logger.Logger
var connLock sync.Mutex
// Run runs the plugin.
func Run(plugin *node.Plugin) {
log = logger.NewLogger("Analysis-Client")
daemon.BackgroundWorker("Analysis Client", func(shutdownSignal <-chan struct{}) {
......
......@@ -2,6 +2,8 @@ package client
import "github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
// EventDispatchers holds the Heartbeat function.
type EventDispatchers struct {
// Heartbeat defines the Heartbeat function.
Heartbeat func(*heartbeat.Packet)
}
package analysis
import (
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/plugins/analysis/client"
"github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/analysis/webinterface"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
// PluginName is the name of the analysis plugin.
......
......@@ -7,7 +7,8 @@ import (
)
const (
// IdleTimeout defines the idle timeout.
IdleTimeout = 10 * time.Second
// StateHeartbeat defines the state of the heartbeat.
StateHeartbeat = heartbeat.MarshaledPacketHeader
)
......@@ -5,12 +5,19 @@ import (
"github.com/iotaledger/hive.go/events"
)
// Events holds the events of the analysis server package.
var Events = struct {
// AddNode triggers when adding a new node.
AddNode *events.Event
// RemoveNode triggers when removing a node.
RemoveNode *events.Event
// ConnectNodes triggers when connecting two nodes.
ConnectNodes *events.Event
// DisconnectNodes triggers when disconnecting two nodes.
DisconnectNodes *events.Event
// Error triggers when an error occurs.
Error *events.Event
// Heartbeat triggers when an heartbeat has been received.
Heartbeat *events.Event
}{
events.NewEvent(stringCaller),
......
......@@ -5,6 +5,7 @@ import (
)
const (
// CfgServerPort defines the config flag of the analysis server TCP port.
CfgServerPort = "analysis.server.port"
)
......
......@@ -3,25 +3,27 @@ package server
import (
"errors"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/network/tcp"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/analysis/types/heartbeat"
"github.com/iotaledger/goshimmer/plugins/config"
)
var (
// ErrInvalidPackageHeader defines an invalid package header error.
ErrInvalidPackageHeader = errors.New("invalid package header")
// ErrExpectedInitialAddNodePackage defines an expected initial add node package error.
ErrExpectedInitialAddNodePackage = errors.New("expected initial add node package")
server *tcp.TCPServer
log *logger.Logger
)
// Configure configures the plugin.
func Configure(plugin *node.Plugin) {
log = logger.NewLogger("Analysis-Server")
server = tcp.NewServer()
......@@ -38,6 +40,7 @@ func Configure(plugin *node.Plugin) {
}))
}
// Run runs the plugin.
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker("Analysis Server", func(shutdownSignal <-chan struct{}) {
log.Infof("Starting Server (port %d) ... done", config.Node.GetInt(CfgServerPort))
......@@ -47,12 +50,14 @@ func Run(plugin *node.Plugin) {
}, shutdown.PriorityAnalysis)
}
// Shutdown shutdowns the plugin.
func Shutdown() {
log.Info("Stopping Server ...")
server.Shutdown()
log.Info("Stopping Server ... done")
}
// HandleConnection handles the given connection.
func HandleConnection(conn *network.ManagedConnection) {
err := conn.SetTimeout(IdleTimeout)
if err != nil {
......
package server
// ConnectionState defines the type of a connection state as a byte
type ConnectionState = byte
......@@ -3,32 +3,44 @@ package heartbeat
import "crypto/sha256"
const (
// MarshaledPacketHeader unique identifier of packet
// MarshaledPacketHeader unique identifier of packet.
MarshaledPacketHeader = 0x01
// MaxOutboundNeighborCount is the maximum number of allowed neighbors in one direction
// MaxOutboundNeighborCount is the maximum number of allowed neighbors in one direction.
MaxOutboundNeighborCount = 4
// MaxInboundNeighborCount is the maximum number of allowed neighbors in one direction
// MaxInboundNeighborCount is the maximum number of allowed neighbors in one direction.
MaxInboundNeighborCount = 4
// MaxMarshaledTotalSize Maximum packet length in bytes
// MaxMarshaledTotalSize Maximum packet length in bytes.
MaxMarshaledTotalSize = MarshaledPacketHeaderSize + MarshaledOwnIDSize +
MarshaledOutboundIDsLengthSize + MaxOutboundNeighborCount*MarshaledOutboundIDSize +
MarshaledInboundIDsLengthSize + MaxInboundNeighborCount*MarshaledInboundIDSize
// MarshaledPacketHeaderStart is the beginning of the MarshaledPacketHeader.
MarshaledPacketHeaderStart = 0
// MarshaledPacketHeaderSize is the size of the MarshaledPacketHeader.
MarshaledPacketHeaderSize = 1
// MarshaledPacketHeaderEnd is the end of the MarshaledPacketHeader.
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
// MarshaledOwnIDStart is the beginning of the MarshaledOwnID.
MarshaledOwnIDStart = MarshaledPacketHeaderEnd
// MarshaledOwnIDSize is the size of the MarshaledOwnID.
MarshaledOwnIDSize = sha256.Size
// MarshaledOwnIDEnd is the end of the MarshaledOwnID.
MarshaledOwnIDEnd = MarshaledOwnIDStart + MarshaledOwnIDSize
// MarshaledOutboundIDsLengthStart is the beginning of the MarshaledOutboundIDsLength.
MarshaledOutboundIDsLengthStart = MarshaledOwnIDEnd
// MarshaledOutboundIDsLengthSize is the size of the MarshaledOutboundIDsLength.
MarshaledOutboundIDsLengthSize = 1
// MarshaledOutboundIDSize is the size of the MarshaledOutboundID.
MarshaledOutboundIDSize = sha256.Size
// MarshaledOutboundIDsLengthEnd is the end of the MarshaledOutboundIDsLength.
MarshaledOutboundIDsLengthEnd = MarshaledOutboundIDsLengthStart + MarshaledOutboundIDsLengthSize
// MarshaledInboundIDsLengthSize is the size of the MarshaledInboundIDsLength.
MarshaledInboundIDsLengthSize = 1
// MarshaledInboundIDSize is the size of the MarshaledInboundID.
MarshaledInboundIDSize = sha256.Size
)
......@@ -3,7 +3,9 @@ package heartbeat
import "errors"
var (
// ErrMalformedHeartbeatPacket defines the malformed heartbeat packet error.
ErrMalformedHeartbeatPacket = errors.New("malformed heartbeat packet")
// ErrTooManyNeighborsToReport defines the too many neighbors to report in packet error.
ErrTooManyNeighborsToReport = errors.New("too many neighbors to report in packet")
)
......@@ -14,6 +16,7 @@ type Packet struct {
InboundIDs [][]byte
}
// Unmarshal unmarshals a slice of byte and returns a *Packet and an error (nil if successful).
func Unmarshal(data []byte) (*Packet, error) {
// So far we are only sure about the static part
MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize
......@@ -73,6 +76,7 @@ func Unmarshal(data []byte) (*Packet, error) {
}
// Marshal marshals a given *Packet and returns the marshaled slice of byte and an error (nil if successful).
func (packet *Packet) Marshal() ([]byte, error) {
// Calculate total needed bytes based on packet
MarshaledTotalSize := MarshaledPacketHeaderSize + MarshaledOwnIDSize +
......@@ -93,9 +97,8 @@ func (packet *Packet) Marshal() ([]byte, error) {
lengthOutboundIDs := len(packet.OutboundIDs)
if lengthOutboundIDs > MaxOutboundNeighborCount {
return nil, ErrTooManyNeighborsToReport
} else {
marshaledPackage[MarshaledOutboundIDsLengthStart] = byte(lengthOutboundIDs)
}
marshaledPackage[MarshaledOutboundIDsLengthStart] = byte(lengthOutboundIDs)
// Copy contents of packet.OutboundIDs
for i, outboundID := range packet.OutboundIDs {
......@@ -109,9 +112,8 @@ func (packet *Packet) Marshal() ([]byte, error) {
lengthInboundIDs := len(packet.InboundIDs)
if lengthInboundIDs > MaxInboundNeighborCount {
return nil, ErrTooManyNeighborsToReport
} else {
marshaledPackage[MarshaledInboundIdsLengthStart] = byte(lengthInboundIDs)
}
marshaledPackage[MarshaledInboundIdsLengthStart] = byte(lengthInboundIDs)
// End of length is the start of inbound nodeId-s
MarshaledInboundIdsLengthEnd := MarshaledInboundIdsLengthStart + MarshaledInboundIDsLengthSize
......
package ping
const (
// MarshaledPacketHeader defines the MarshaledPacketHeader.
MarshaledPacketHeader = 0x00
// MarshaledPacketHeaderStart defines the start of the MarshaledPacketHeader.
MarshaledPacketHeaderStart = 0
// MarshaledPacketHeaderSize defines the size of the MarshaledPacketHeader.
MarshaledPacketHeaderSize = 1
// MarshaledPacketHeaderEnd defines the end of the MarshaledPacketHeader.
MarshaledPacketHeaderEnd = MarshaledPacketHeaderStart + MarshaledPacketHeaderSize
// MarshaledTotalSize defines the total size of the MarshaledPacketHeader.
MarshaledTotalSize = MarshaledPacketHeaderEnd
)
......@@ -3,11 +3,14 @@ package ping
import "errors"
var (
// ErrMalformedPingPacket defines the malformed ping packet error.
ErrMalformedPingPacket = errors.New("malformed ping packet")
)
// Packet defines the Packet type as an empty struct.
type Packet struct{}
// Unmarshal unmarshals a given slice of byte and returns a *Packet and an error.
func Unmarshal(data []byte) (*Packet, error) {
if len(data) < MarshaledTotalSize || data[MarshaledPacketHeaderStart] != MarshaledPacketHeader {
return nil, ErrMalformedPingPacket
......@@ -18,6 +21,7 @@ func Unmarshal(data []byte) (*Packet, error) {
return unmarshaledPacket, nil
}
// Marshal marshals a given *Packet and returns the marshaled slice of byte.
func (packet *Packet) Marshal() []byte {
marshaledPackage := make([]byte, MarshaledTotalSize)
......
......@@ -5,7 +5,9 @@ import (
)
const (
// CfgBindAddress defines the config flag of the analysis http server binding address.
CfgBindAddress = "analysis.httpServer.bindAddress"
// CfgDev defines the config flag of the analysis http server dev mode.
CfgDev = "analysis.httpServer.dev"
)
......
......@@ -6,14 +6,13 @@ import (
"time"
"github.com/gobuffalo/packr/v2"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/labstack/echo"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
)
var (
......@@ -25,6 +24,7 @@ const name = "Analysis HTTP Server"
var assetsBox = packr.New("Assets", "./static")
// Configure configures the plugin.
func Configure() {
log = logger.NewLogger(name)
......@@ -45,6 +45,7 @@ func Configure() {
engine.GET("/datastream", echo.WrapHandler(websocket.Handler(dataStream)))
}
// Run runs the plugin.
func Run() {
log.Infof("Starting %s ...", name)
if err := daemon.BackgroundWorker(name, start, shutdown.PriorityAnalysis); err != nil {
......
......@@ -6,11 +6,13 @@ import (
"golang.org/x/net/websocket"
)
// WebSocketChannel holds the websocket connection and its channel.
type WebSocketChannel struct {
ws *websocket.Conn
send chan string
}
// NewWebSocketChannel returns a new *WebSocketChannel for the given websocket connection.
func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel {
wsChan := &WebSocketChannel{
ws: ws,
......@@ -22,10 +24,12 @@ func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel {
return wsChan
}
// Write writes into the given WebSocketChannel.
func (c *WebSocketChannel) Write(update string) {
c.send <- update
}
// KeepAlive keeps the websocket connection alive.
func (c *WebSocketChannel) KeepAlive() {
buf := make([]byte, 1)
for {
......@@ -37,6 +41,7 @@ func (c *WebSocketChannel) KeepAlive() {
}
}
// Close closes the WebSocketChannel.
func (c *WebSocketChannel) Close() {
close(c.send)
_ = c.ws.Close()
......
......@@ -6,11 +6,13 @@ import (
"github.com/iotaledger/hive.go/node"
)
// Configure configures the plugin.
func Configure(plugin *node.Plugin) {
httpserver.Configure()
recordedevents.Configure(plugin)
}
// Run runs the plugin.
func Run(plugin *node.Plugin) {
httpserver.Run()
recordedevents.Run()
......
......@@ -23,6 +23,7 @@ var links = make(map[string]map[string]time.Time)
var lock sync.Mutex
// Configure configures the plugin.
func Configure(plugin *node.Plugin) {
server.Events.Heartbeat.Attach(events.NewClosure(func(packet heartbeat.Packet) {
var out strings.Builder
......@@ -42,15 +43,15 @@ func Configure(plugin *node.Plugin) {
lock.Lock()
defer lock.Unlock()
nodeIdString := hex.EncodeToString(packet.OwnID)
nodeIDString := hex.EncodeToString(packet.OwnID)
timestamp := time.Now()
// When node is new, add to graph
if _, isAlready := nodes[nodeIdString]; !isAlready {
server.Events.AddNode.Trigger(nodeIdString)
if _, isAlready := nodes[nodeIDString]; !isAlready {
server.Events.AddNode.Trigger(nodeIDString)
}
// Save it + update timestamp
nodes[nodeIdString] = timestamp
nodes[nodeIDString] = timestamp
// Outgoing neighbor links update
for _, outgoingNeighbor := range packet.OutboundIDs {
......@@ -65,17 +66,17 @@ func Configure(plugin *node.Plugin) {
nodes[outgoingNeighborString] = timestamp
// Do we have any links already with src=nodeIdString?
if _, isAlready := links[nodeIdString]; !isAlready {
if _, isAlready := links[nodeIDString]; !isAlready {
// Nope, so we have to allocate an empty map to be nested in links for nodeIdString
links[nodeIdString] = make(map[string]time.Time)
links[nodeIDString] = make(map[string]time.Time)
}
// Update graph when connection hasn't been seen before
if _, isAlready := links[nodeIdString][outgoingNeighborString]; !isAlready {
server.Events.ConnectNodes.Trigger(nodeIdString, outgoingNeighborString)
if _, isAlready := links[nodeIDString][outgoingNeighborString]; !isAlready {
server.Events.ConnectNodes.Trigger(nodeIDString, outgoingNeighborString)
}
// Update links
links[nodeIdString][outgoingNeighborString] = timestamp
links[nodeIDString][outgoingNeighborString] = timestamp
}
// Incoming neighbor links update
......@@ -97,15 +98,16 @@ func Configure(plugin *node.Plugin) {
}
// Update graph when connection hasn't been seen before
if _, isAlready := links[incomingNeighborString][nodeIdString]; !isAlready {
server.Events.ConnectNodes.Trigger(incomingNeighborString, nodeIdString)
if _, isAlready := links[incomingNeighborString][nodeIDString]; !isAlready {
server.Events.ConnectNodes.Trigger(incomingNeighborString, nodeIDString)
}
// Update links map
links[incomingNeighborString][nodeIdString] = timestamp
links[incomingNeighborString][nodeIDString] = timestamp
}
}))
}
// Run runs the plugin.
func Run() {
daemon.BackgroundWorker("Analysis Server Record Manager", func(shutdownSignal <-chan struct{}) {
ticker := time.NewTicker(CleanUpPeriod)
......@@ -171,6 +173,7 @@ func getEventsToReplay() (map[string]time.Time, map[string]map[string]time.Time)
return copiedNodes, copiedLinks
}
// Replay runs the handlers on the events to replay.
func Replay(handlers *types.EventHandlers) {
copiedNodes, copiedLinks := getEventsToReplay()
......
package types
// EventHandlers holds the handler for each event.
type EventHandlers = struct {
// Addnode defines the handler called when adding a new node.
AddNode func(nodeId string)
// RemoveNode defines the handler called when adding removing a node.
RemoveNode func(nodeId string)
// ConnectNodes defines the handler called when connecting two nodes.
ConnectNodes func(sourceId string, targetId string)
// DisconnectNodes defines the handler called when connecting two nodes.
DisconnectNodes func(sourceId string, targetId string)
}
// EventHandlersConsumer defines the consumer function of an *EventHandlers.
type EventHandlersConsumer = func(handler *EventHandlers)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment