Skip to content
Snippets Groups Projects
Commit 3797ffc8 authored by capossele's avatar capossele
Browse files

Merge remote-tracking branch 'origin/use-hive-logger-daemon-node' into feat/new-autopeering

parents fa696c32 1ddd1aaf
Branches
Tags
No related merge requests found
Showing
with 41 additions and 532 deletions
package main
import (
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/analysis"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/bundleprocessor"
......@@ -21,6 +20,7 @@ import (
webapi_spammer "github.com/iotaledger/goshimmer/plugins/webapi-spammer"
"github.com/iotaledger/goshimmer/plugins/webauth"
"github.com/iotaledger/goshimmer/plugins/zeromq"
"github.com/iotaledger/hive.go/node"
)
func main() {
......
package daemon
import (
"sync"
)
var (
running bool
wg sync.WaitGroup
ShutdownSignal = make(chan int, 1)
backgroundWorkers = make([]func(), 0)
backgroundWorkerNames = make([]string, 0)
runningBackgroundWorkers = make(map[string]bool)
lock = sync.Mutex{}
)
func GetRunningBackgroundWorkers() []string {
lock.Lock()
result := make([]string, 0)
for runningBackgroundWorker := range runningBackgroundWorkers {
result = append(result, runningBackgroundWorker)
}
lock.Unlock()
return result
}
func runBackgroundWorker(name string, backgroundWorker func()) {
wg.Add(1)
go func() {
lock.Lock()
runningBackgroundWorkers[name] = true
lock.Unlock()
backgroundWorker()
lock.Lock()
delete(runningBackgroundWorkers, name)
lock.Unlock()
wg.Done()
}()
}
func BackgroundWorker(name string, handler func()) {
lock.Lock()
if IsRunning() {
runBackgroundWorker(name, handler)
} else {
backgroundWorkerNames = append(backgroundWorkerNames, name)
backgroundWorkers = append(backgroundWorkers, handler)
}
lock.Unlock()
}
func Start() {
if !running {
lock.Lock()
if !running {
ShutdownSignal = make(chan int, 1)
running = true
Events.Run.Trigger()
for i, backgroundWorker := range backgroundWorkers {
runBackgroundWorker(backgroundWorkerNames[i], backgroundWorker)
}
}
lock.Unlock()
}
}
func Run() {
Start()
wg.Wait()
}
func Shutdown() {
if running {
lock.Lock()
if running {
close(ShutdownSignal)
running = false
Events.Shutdown.Trigger()
}
lock.Unlock()
}
}
func ShutdownAndWait() {
if running {
lock.Lock()
if running {
close(ShutdownSignal)
running = false
Events.Shutdown.Trigger()
}
lock.Unlock()
}
wg.Wait()
}
func IsRunning() bool {
return running
}
package daemon
import (
"github.com/iotaledger/hive.go/events"
)
var Events = struct {
Run *events.Event
Shutdown *events.Event
}{
Run: events.NewEvent(events.CallbackCaller),
Shutdown: events.NewEvent(events.CallbackCaller),
}
package daemon
type Callback = func()
package node
const (
LOG_LEVEL_FAILURE = 0
LOG_LEVEL_WARNING = 1
LOG_LEVEL_SUCCESS = 2
LOG_LEVEL_INFO = 3
LOG_LEVEL_DEBUG = 4
)
package node
import (
"github.com/iotaledger/hive.go/events"
)
type pluginEvents struct {
Configure *events.Event
Run *events.Event
}
func pluginCaller(handler interface{}, params ...interface{}) {
handler.(func(*Plugin))(params[0].(*Plugin))
}
package node
import (
"fmt"
"sync"
)
type Logger struct {
enabled bool
enabledMutex sync.RWMutex
LogInfo func(pluginName string, message string)
LogSuccess func(pluginName string, message string)
LogWarning func(pluginName string, message string)
LogFailure func(pluginName string, message string)
LogDebug func(pluginName string, message string)
}
func (logger *Logger) SetEnabled(value bool) {
logger.enabledMutex.Lock()
logger.enabled = value
logger.enabledMutex.Unlock()
}
func (logger *Logger) GetEnabled() (result bool) {
logger.enabledMutex.RLock()
result = logger.enabled
logger.enabledMutex.RUnlock()
return
}
func pluginPrefix(pluginName string) string {
var pluginPrefix string
if pluginName == "Node" {
pluginPrefix = ""
} else {
pluginPrefix = pluginName + ": "
}
return pluginPrefix
}
var DEFAULT_LOGGER = &Logger{
enabled: true,
LogSuccess: func(pluginName string, message string) {
fmt.Println("[ OK ] " + pluginPrefix(pluginName) + message)
},
LogInfo: func(pluginName string, message string) {
fmt.Println("[ INFO ] " + pluginPrefix(pluginName) + message)
},
LogWarning: func(pluginName string, message string) {
fmt.Println("[ WARN ] " + pluginPrefix(pluginName) + message)
},
LogFailure: func(pluginName string, message string) {
fmt.Println("[ FAIL ] " + pluginPrefix(pluginName) + message)
},
LogDebug: func(pluginName string, message string) {
fmt.Println("[ NOTE ] " + pluginPrefix(pluginName) + message)
},
}
package node
import (
"sync"
"github.com/iotaledger/hive.go/parameter"
"github.com/iotaledger/goshimmer/packages/daemon"
)
type Node struct {
wg *sync.WaitGroup
loggers []*Logger
loadedPlugins []*Plugin
}
var DisabledPlugins = make(map[string]bool)
var EnabledPlugins = make(map[string]bool)
func New(plugins ...*Plugin) *Node {
node := &Node{
loggers: make([]*Logger, 0),
wg: &sync.WaitGroup{},
loadedPlugins: make([]*Plugin, 0),
}
node.AddLogger(DEFAULT_LOGGER)
// configure the enabled plugins
node.configure(plugins...)
return node
}
func Start(plugins ...*Plugin) *Node {
node := New(plugins...)
node.Start()
return node
}
func Run(plugins ...*Plugin) *Node {
node := New(plugins...)
node.Run()
return node
}
func Shutdown() {
daemon.ShutdownAndWait()
}
func (node *Node) AddLogger(logger *Logger) {
node.loggers = append(node.loggers, logger)
}
func (node *Node) LogSuccess(pluginName string, message string) {
if parameter.NodeConfig.GetInt(CFG_LOG_LEVEL) >= LOG_LEVEL_SUCCESS {
for _, logger := range node.loggers {
if logger.GetEnabled() {
logger.LogSuccess(pluginName, message)
}
}
}
}
func (node *Node) LogInfo(pluginName string, message string) {
if parameter.NodeConfig.GetInt(CFG_LOG_LEVEL) >= LOG_LEVEL_INFO {
for _, logger := range node.loggers {
if logger.GetEnabled() {
logger.LogInfo(pluginName, message)
}
}
}
}
func (node *Node) LogDebug(pluginName string, message string) {
if parameter.NodeConfig.GetInt(CFG_LOG_LEVEL) >= LOG_LEVEL_DEBUG {
for _, logger := range node.loggers {
if logger.GetEnabled() {
logger.LogDebug(pluginName, message)
}
}
}
}
func (node *Node) LogWarning(pluginName string, message string) {
if parameter.NodeConfig.GetInt(CFG_LOG_LEVEL) >= LOG_LEVEL_WARNING {
for _, logger := range node.loggers {
if logger.GetEnabled() {
logger.LogWarning(pluginName, message)
}
}
}
}
func (node *Node) LogFailure(pluginName string, message string) {
if parameter.NodeConfig.GetInt(CFG_LOG_LEVEL) >= LOG_LEVEL_FAILURE {
for _, logger := range node.loggers {
if logger.GetEnabled() {
logger.LogFailure(pluginName, message)
}
}
}
}
func isDisabled(plugin *Plugin) bool {
_, exists := DisabledPlugins[GetPluginIdentifier(plugin.Name)]
return exists
}
func isEnabled(plugin *Plugin) bool {
_, exists := EnabledPlugins[GetPluginIdentifier(plugin.Name)]
return exists
}
func (node *Node) configure(plugins ...*Plugin) {
for _, plugin := range plugins {
status := plugin.Status
if (status == Enabled && !isDisabled(plugin)) ||
(status == Disabled && isEnabled(plugin)) {
plugin.wg = node.wg
plugin.Node = node
plugin.Events.Configure.Trigger(plugin)
node.loadedPlugins = append(node.loadedPlugins, plugin)
node.LogInfo("Node", "Loading Plugin: "+plugin.Name+" ... done")
} else {
node.LogInfo("Node", "Skipping Plugin: "+plugin.Name)
}
}
}
func (node *Node) Start() {
node.LogInfo("Node", "Executing plugins ...")
for _, plugin := range node.loadedPlugins {
plugin.Events.Run.Trigger(plugin)
node.LogSuccess("Node", "Starting Plugin: "+plugin.Name+" ... done")
}
node.LogSuccess("Node", "Starting background workers ...")
daemon.Start()
}
func (node *Node) Run() {
node.LogInfo("Node", "Executing plugins ...")
for _, plugin := range node.loadedPlugins {
plugin.Events.Run.Trigger(plugin)
node.LogSuccess("Node", "Starting Plugin: "+plugin.Name+" ... done")
}
node.LogSuccess("Node", "Starting background workers ...")
daemon.Run()
node.LogSuccess("Node", "Shutdown complete!")
}
package node
import (
flag "github.com/spf13/pflag"
)
const (
CFG_LOG_LEVEL = "node.logLevel"
CFG_DISABLE_PLUGINS = "node.disablePlugins"
CFG_ENABLE_PLGUINS = "node.enablePlugins"
)
func init() {
flag.Int(CFG_LOG_LEVEL, LOG_LEVEL_INFO, "controls the log types that are shown")
flag.String(CFG_DISABLE_PLUGINS, "", "a list of plugins that shall be disabled")
flag.String(CFG_ENABLE_PLGUINS, "", "a list of plugins that shall be enabled")
}
package node
import (
"strings"
"sync"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/parameter"
)
const (
Disabled = iota
Enabled
)
type Plugin struct {
Node *Node
Name string
Status int
Events pluginEvents
wg *sync.WaitGroup
}
// Creates a new plugin with the given name, default status and callbacks.
// The last specified callback is the mandatory run callback, while all other callbacks are configure callbacks.
func NewPlugin(name string, status int, callback Callback, callbacks ...Callback) *Plugin {
plugin := &Plugin{
Name: name,
Status: status,
Events: pluginEvents{
Configure: events.NewEvent(pluginCaller),
Run: events.NewEvent(pluginCaller),
},
}
// make the plugin known to the parameters
parameter.AddPlugin(name, status)
if len(callbacks) >= 1 {
plugin.Events.Configure.Attach(events.NewClosure(callback))
for _, callback = range callbacks[:len(callbacks)-1] {
plugin.Events.Configure.Attach(events.NewClosure(callback))
}
plugin.Events.Run.Attach(events.NewClosure(callbacks[len(callbacks)-1]))
} else {
plugin.Events.Run.Attach(events.NewClosure(callback))
}
return plugin
}
func GetPluginIdentifier(name string) string {
return strings.ToLower(strings.Replace(name, " ", "", -1))
}
func (plugin *Plugin) LogSuccess(message string) {
plugin.Node.LogSuccess(plugin.Name, message)
}
func (plugin *Plugin) LogInfo(message string) {
plugin.Node.LogInfo(plugin.Name, message)
}
func (plugin *Plugin) LogWarning(message string) {
plugin.Node.LogWarning(plugin.Name, message)
}
func (plugin *Plugin) LogFailure(message string) {
plugin.Node.LogFailure(plugin.Name, message)
}
func (plugin *Plugin) LogDebug(message string) {
plugin.Node.LogDebug(plugin.Name, message)
}
package node
type Callback = func(plugin *Plugin)
......@@ -3,7 +3,7 @@ package timeutil
import (
"time"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/hive.go/daemon"
)
func Sleep(interval time.Duration) bool {
......
......@@ -3,7 +3,7 @@ package timeutil
import (
"time"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/hive.go/daemon"
)
func Ticker(handler func(), interval time.Duration) {
......
......@@ -6,9 +6,9 @@ import (
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/plugins/tipselection"
"github.com/iotaledger/hive.go/daemon"
)
var spamming = false
......
......@@ -2,15 +2,16 @@ package client
import (
"encoding/hex"
"github.com/iotaledger/hive.go/parameter"
"net"
"time"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/parameter"
"github.com/iotaledger/autopeering-sim/discover"
"github.com/iotaledger/autopeering-sim/selection"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/timeutil"
"github.com/iotaledger/goshimmer/plugins/analysis/types/addnode"
"github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes"
......@@ -20,12 +21,12 @@ import (
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/node"
)
var debug *node.Plugin
var log = logger.NewLogger("Analysis-Client")
func Run(plugin *node.Plugin) {
debug = plugin
daemon.BackgroundWorker("Analysis Client", func() {
shuttingDown := false
......@@ -36,7 +37,7 @@ func Run(plugin *node.Plugin) {
default:
if conn, err := net.Dial("tcp", parameter.NodeConfig.GetString(CFG_SERVER_ADDRESS)); err != nil {
plugin.LogDebug("Could not connect to reporting server: " + err.Error())
log.Debugf("Could not connect to reporting server: %s", err.Error())
timeutil.Sleep(1 * time.Second)
} else {
......@@ -56,28 +57,16 @@ func Run(plugin *node.Plugin) {
func getEventDispatchers(conn *network.ManagedConnection) *EventDispatchers {
return &EventDispatchers{
AddNode: func(nodeId []byte) {
_, err := conn.Write((&addnode.Packet{NodeId: nodeId}).Marshal())
if err != nil {
debug.LogFailure(err.Error())
}
_, _ = conn.Write((&addnode.Packet{NodeId: nodeId}).Marshal())
},
RemoveNode: func(nodeId []byte) {
_, err := conn.Write((&removenode.Packet{NodeId: nodeId}).Marshal())
if err != nil {
debug.LogFailure(err.Error())
}
_, _ = conn.Write((&removenode.Packet{NodeId: nodeId}).Marshal())
},
ConnectNodes: func(sourceId []byte, targetId []byte) {
_, err := conn.Write((&connectnodes.Packet{SourceId: sourceId, TargetId: targetId}).Marshal())
if err != nil {
debug.LogFailure(err.Error())
}
_, _ = conn.Write((&connectnodes.Packet{SourceId: sourceId, TargetId: targetId}).Marshal())
},
DisconnectNodes: func(sourceId []byte, targetId []byte) {
_, err := conn.Write((&disconnectnodes.Packet{SourceId: sourceId, TargetId: targetId}).Marshal())
if err != nil {
debug.LogFailure(err.Error())
}
_, _ = conn.Write((&disconnectnodes.Packet{SourceId: sourceId, TargetId: targetId}).Marshal())
},
}
}
......@@ -94,27 +83,27 @@ func setupHooks(plugin *node.Plugin, conn *network.ManagedConnection, eventDispa
// define hooks ////////////////////////////////////////////////////////////////////////////////////////////////////
onDiscoverPeer := events.NewClosure(func(ev *discover.DiscoveredEvent) {
plugin.LogInfo("onDiscoverPeer: " + hex.EncodeToString(ev.Peer.ID().Bytes()))
log.Info("onDiscoverPeer: " + hex.EncodeToString(ev.Peer.ID().Bytes()))
eventDispatchers.AddNode(ev.Peer.ID().Bytes())
})
onDeletePeer := events.NewClosure(func(ev *discover.DeletedEvent) {
plugin.LogInfo("onDeletePeer: " + hex.EncodeToString(ev.Peer.ID().Bytes()))
log.Info("onDeletePeer: " + hex.EncodeToString(ev.Peer.ID().Bytes()))
eventDispatchers.RemoveNode(ev.Peer.ID().Bytes())
})
onAddAcceptedNeighbor := events.NewClosure(func(ev *selection.PeeringEvent) {
plugin.LogInfo("onAddAcceptedNeighbor: " + hex.EncodeToString(ev.Peer.ID().Bytes()) + " - " + hex.EncodeToString(local.INSTANCE.ID().Bytes()))
log.Info("onAddAcceptedNeighbor: " + hex.EncodeToString(ev.Peer.ID().Bytes()) + " - " + hex.EncodeToString(local.INSTANCE.ID().Bytes()))
eventDispatchers.ConnectNodes(ev.Peer.ID().Bytes(), local.INSTANCE.ID().Bytes())
})
onRemoveNeighbor := events.NewClosure(func(ev *selection.DroppedEvent) {
plugin.LogInfo("onRemoveNeighbor: " + hex.EncodeToString(ev.DroppedID.Bytes()) + " - " + hex.EncodeToString(local.INSTANCE.ID().Bytes()))
log.Info("onRemoveNeighbor: " + hex.EncodeToString(ev.DroppedID.Bytes()) + " - " + hex.EncodeToString(local.INSTANCE.ID().Bytes()))
eventDispatchers.DisconnectNodes(ev.DroppedID.Bytes(), local.INSTANCE.ID().Bytes())
})
onAddChosenNeighbor := events.NewClosure(func(ev *selection.PeeringEvent) {
plugin.LogInfo("onAddChosenNeighbor: " + hex.EncodeToString(local.INSTANCE.ID().Bytes()) + " - " + hex.EncodeToString(ev.Peer.ID().Bytes()))
log.Info("onAddChosenNeighbor: " + hex.EncodeToString(local.INSTANCE.ID().Bytes()) + " - " + hex.EncodeToString(ev.Peer.ID().Bytes()))
eventDispatchers.ConnectNodes(local.INSTANCE.ID().Bytes(), ev.Peer.ID().Bytes())
})
......
package analysis
import (
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/analysis/client"
"github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/analysis/webinterface"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/parameter"
)
var PLUGIN = node.NewPlugin("Analysis", node.Enabled, configure, run)
var log = logger.NewLogger("Analysis")
func configure(plugin *node.Plugin) {
if parameter.NodeConfig.GetInt(server.CFG_SERVER_PORT) != 0 {
......@@ -28,12 +30,12 @@ func run(plugin *node.Plugin) {
webinterface.Run(plugin)
server.Run(plugin)
} else {
plugin.Node.LogSuccess("Node", "Starting Plugin: Analysis ... server is disabled (server-port is 0)")
log.Info("Starting Plugin: Analysis ... server is disabled (server-port is 0)")
}
if parameter.NodeConfig.GetString(client.CFG_SERVER_ADDRESS) != "" {
client.Run(plugin)
} else {
plugin.Node.LogSuccess("Node", "Starting Plugin: Analysis ... client is disabled (server-address is empty)")
log.Info("Starting Plugin: Analysis ... client is disabled (server-address is empty)")
}
}
......@@ -2,51 +2,51 @@ package server
import (
"encoding/hex"
"github.com/iotaledger/goshimmer/packages/daemon"
"math"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/network/tcp"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/analysis/types/addnode"
"github.com/iotaledger/goshimmer/plugins/analysis/types/connectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/disconnectnodes"
"github.com/iotaledger/goshimmer/plugins/analysis/types/ping"
"github.com/iotaledger/goshimmer/plugins/analysis/types/removenode"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/parameter"
"github.com/pkg/errors"
"math"
)
var server *tcp.Server
var debug *node.Plugin
var log = logger.NewLogger("Analysis-Server")
func Configure(plugin *node.Plugin) {
debug = plugin
server = tcp.NewServer()
server.Events.Connect.Attach(events.NewClosure(HandleConnection))
server.Events.Error.Attach(events.NewClosure(func(err error) {
plugin.LogFailure("error in server: " + err.Error())
log.Errorf("error in server: %s", err.Error())
}))
server.Events.Start.Attach(events.NewClosure(func() {
plugin.LogSuccess("Starting Server (port " + string(parameter.NodeConfig.GetInt(CFG_SERVER_PORT)) + ") ... done")
log.Infof("Starting Server (port %d) ... done", parameter.NodeConfig.GetInt(CFG_SERVER_PORT))
}))
server.Events.Shutdown.Attach(events.NewClosure(func() {
plugin.LogSuccess("Stopping Server ... done")
log.Info("Stopping Server ... done")
}))
}
func Run(plugin *node.Plugin) {
daemon.BackgroundWorker("Analysis Server", func() {
plugin.LogInfo("Starting Server (port " + string(parameter.NodeConfig.GetInt(CFG_SERVER_PORT)) + ") ...")
log.Infof("Starting Server (port %d) ... done", parameter.NodeConfig.GetInt(CFG_SERVER_PORT))
server.Listen(parameter.NodeConfig.GetInt(CFG_SERVER_PORT))
})
}
func Shutdown(plugin *node.Plugin) {
plugin.LogInfo("Stopping Server ...")
log.Info("Stopping Server ...")
server.Shutdown()
}
......
......@@ -4,9 +4,9 @@ import (
"net/http"
"time"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/node"
"golang.org/x/net/context"
"golang.org/x/net/websocket"
)
......
package webinterface
import (
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/analysis/webinterface/httpserver"
"github.com/iotaledger/goshimmer/plugins/analysis/webinterface/recordedevents"
"github.com/iotaledger/hive.go/node"
)
func Configure(plugin *node.Plugin) {
......
......@@ -4,10 +4,10 @@ import (
"strconv"
"sync"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/analysis/webinterface/types"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/node"
)
var nodes = make(map[string]bool)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment