From d3274a21d2050edea7b3beb5137fee16cb8b5abb Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Mon, 8 Jul 2019 01:04:04 +0200 Subject: [PATCH] Feat: daemon.BackgroundWorker has to be named --- packages/daemon/daemon.go | 44 ++++++++++++++----- packages/node/plugin.go | 13 ++++++ .../transactionspammer/transactionspammer.go | 2 +- plugins/analysis/client/plugin.go | 2 +- plugins/analysis/server/plugin.go | 2 +- .../webinterface/httpserver/plugin.go | 2 +- .../instances/neighborhood/instance.go | 2 +- .../protocol/outgoing_request_processor.go | 6 ++- plugins/autopeering/protocol/plugin.go | 8 ++-- .../autopeering/saltmanager/saltmanager.go | 2 +- plugins/autopeering/server/tcp/server.go | 2 +- plugins/autopeering/server/udp/server.go | 2 +- plugins/bundleprocessor/bundleprocessor.go | 8 +--- .../bundleprocessor/bundleprocessor_test.go | 41 +++++++++++++++++ plugins/bundleprocessor/events.go | 10 ++--- plugins/bundleprocessor/plugin.go | 2 +- plugins/gossip/neighbors.go | 2 +- plugins/gossip/send_queue.go | 10 ++--- plugins/gossip/server.go | 2 +- plugins/gracefulshutdown/plugin.go | 9 +++- plugins/statusscreen-tps/plugin.go | 2 +- plugins/statusscreen/statusscreen.go | 4 +- plugins/tangle/solidifier.go | 2 +- plugins/webapi/plugin.go | 2 +- plugins/zeromq/plugin.go | 2 +- 25 files changed, 133 insertions(+), 50 deletions(-) create mode 100644 plugins/bundleprocessor/bundleprocessor_test.go diff --git a/packages/daemon/daemon.go b/packages/daemon/daemon.go index 404fccd0..d6961424 100644 --- a/packages/daemon/daemon.go +++ b/packages/daemon/daemon.go @@ -5,29 +5,53 @@ import ( ) var ( - running bool - wg sync.WaitGroup - ShutdownSignal = make(chan int, 1) - backgroundWorkers = make([]func(), 0) - lock = sync.Mutex{} + running bool + wg sync.WaitGroup + ShutdownSignal = make(chan int, 1) + backgroundWorkers = make([]func(), 0) + backgroundWorkerNames = make([]string, 0) + runningBackgroundWorkers = make(map[string]bool) + lock = sync.Mutex{} ) -func runBackgroundWorker(backgroundWorker func()) { +func GetRunningBackgroundWorkers() []string { + lock.Lock() + + result := make([]string, 0) + for runningBackgroundWorker := range runningBackgroundWorkers { + result = append(result, runningBackgroundWorker) + } + + lock.Unlock() + + return result +} + +func runBackgroundWorker(name string, backgroundWorker func()) { wg.Add(1) go func() { + lock.Lock() + runningBackgroundWorkers[name] = true + lock.Unlock() + backgroundWorker() + lock.Lock() + delete(runningBackgroundWorkers, name) + lock.Unlock() + wg.Done() }() } -func BackgroundWorker(handler func()) { +func BackgroundWorker(name string, handler func()) { lock.Lock() if IsRunning() { - runBackgroundWorker(handler) + runBackgroundWorker(name, handler) } else { + backgroundWorkerNames = append(backgroundWorkerNames, name) backgroundWorkers = append(backgroundWorkers, handler) } @@ -45,8 +69,8 @@ func Run() { Events.Run.Trigger() - for _, backgroundWorker := range backgroundWorkers { - runBackgroundWorker(backgroundWorker) + for i, backgroundWorker := range backgroundWorkers { + runBackgroundWorker(backgroundWorkerNames[i], backgroundWorker) } } diff --git a/packages/node/plugin.go b/packages/node/plugin.go index a98a22be..1a6cb527 100644 --- a/packages/node/plugin.go +++ b/packages/node/plugin.go @@ -55,3 +55,16 @@ func (plugin *Plugin) LogFailure(message string) { func (plugin *Plugin) LogDebug(message string) { plugin.Node.LogDebug(plugin.Name, message) } + +var TestNode = &Node{ + loggers: make([]*Logger, 0), + wg: &sync.WaitGroup{}, + loadedPlugins: make([]*Plugin, 0), +} + +func (plugin *Plugin) InitTest() { + plugin.Node = TestNode + + plugin.Events.Configure.Trigger(plugin) + plugin.Events.Run.Trigger(plugin) +} diff --git a/packages/transactionspammer/transactionspammer.go b/packages/transactionspammer/transactionspammer.go index 503a0f5a..c59c6f54 100644 --- a/packages/transactionspammer/transactionspammer.go +++ b/packages/transactionspammer/transactionspammer.go @@ -23,7 +23,7 @@ func Start(tps int64) { shutdownSignal = make(chan int, 1) func(shutdownSignal chan int) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Transaction Spammer", func() { for { start := time.Now() sentCounter := int64(0) diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index 3a42c55a..b73dba38 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -21,7 +21,7 @@ import ( ) func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Analysis Client", func() { shuttingDown := false for !shuttingDown { diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 86522f1c..cbcbb725 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -36,7 +36,7 @@ func Configure(plugin *node.Plugin) { } func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Analysis Server", func() { plugin.LogInfo("Starting Server (port " + strconv.Itoa(*SERVER_PORT.Value) + ") ...") server.Listen(*SERVER_PORT.Value) diff --git a/plugins/analysis/webinterface/httpserver/plugin.go b/plugins/analysis/webinterface/httpserver/plugin.go index 6f5edbb8..a6b62cbb 100644 --- a/plugins/analysis/webinterface/httpserver/plugin.go +++ b/plugins/analysis/webinterface/httpserver/plugin.go @@ -32,7 +32,7 @@ func Configure(plugin *node.Plugin) { } func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Analysis HTTP Server", func() { httpServer.ListenAndServe() }) } diff --git a/plugins/autopeering/instances/neighborhood/instance.go b/plugins/autopeering/instances/neighborhood/instance.go index 5a94afd3..d5791d22 100644 --- a/plugins/autopeering/instances/neighborhood/instance.go +++ b/plugins/autopeering/instances/neighborhood/instance.go @@ -35,7 +35,7 @@ func Configure(plugin *node.Plugin) { } func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Neighborhood Updater", func() { timeutil.Ticker(updateNeighborHood, 1*time.Second) }) } diff --git a/plugins/autopeering/protocol/outgoing_request_processor.go b/plugins/autopeering/protocol/outgoing_request_processor.go index cec618cb..31fb40cf 100644 --- a/plugins/autopeering/protocol/outgoing_request_processor.go +++ b/plugins/autopeering/protocol/outgoing_request_processor.go @@ -41,7 +41,11 @@ func createOutgoingRequestProcessor(plugin *node.Plugin) func() { func sendOutgoingRequests(plugin *node.Plugin) { for _, chosenNeighborCandidate := range chosenneighbors.CANDIDATES.Clone() { - time.Sleep(5 * time.Second) + select { + case <-daemon.ShutdownSignal: + return + case <-time.After(5 * time.Second): + } if candidateShouldBeContacted(chosenNeighborCandidate) { if dialed, err := chosenNeighborCandidate.Send(outgoingrequest.INSTANCE.Marshal(), types.PROTOCOL_TYPE_TCP, true); err != nil { diff --git a/plugins/autopeering/protocol/plugin.go b/plugins/autopeering/protocol/plugin.go index fce22265..a1b175fc 100644 --- a/plugins/autopeering/protocol/plugin.go +++ b/plugins/autopeering/protocol/plugin.go @@ -20,8 +20,8 @@ func Configure(plugin *node.Plugin) { } func Run(plugin *node.Plugin) { - daemon.BackgroundWorker(createChosenNeighborDropper(plugin)) - daemon.BackgroundWorker(createAcceptedNeighborDropper(plugin)) - daemon.BackgroundWorker(createOutgoingRequestProcessor(plugin)) - daemon.BackgroundWorker(createOutgoingPingProcessor(plugin)) + daemon.BackgroundWorker("Autopeering Chosen Neighbor Dropper", createChosenNeighborDropper(plugin)) + daemon.BackgroundWorker("Autopeering Accepted Neighbor Dropper", createAcceptedNeighborDropper(plugin)) + daemon.BackgroundWorker("Autopeering Outgoing Request Processor", createOutgoingRequestProcessor(plugin)) + daemon.BackgroundWorker("Autopeering Outgoing Ping Processor", createOutgoingPingProcessor(plugin)) } diff --git a/plugins/autopeering/saltmanager/saltmanager.go b/plugins/autopeering/saltmanager/saltmanager.go index 5127646a..0004b7f6 100644 --- a/plugins/autopeering/saltmanager/saltmanager.go +++ b/plugins/autopeering/saltmanager/saltmanager.go @@ -68,7 +68,7 @@ func scheduleUpdateForSalt(saltToUpdate *salt.Salt, settingsKey []byte, lifeSpan if saltToUpdate.ExpirationTime.Before(now) { updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback) } else { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Salt Updater", func() { select { case <-time.After(saltToUpdate.ExpirationTime.Sub(now)): updatePublicSalt(saltToUpdate, settingsKey, lifeSpan, callback) diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go index 3d4977aa..165c098e 100644 --- a/plugins/autopeering/server/tcp/server.go +++ b/plugins/autopeering/server/tcp/server.go @@ -37,7 +37,7 @@ func ConfigureServer(plugin *node.Plugin) { } func RunServer(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Autopeering TCP Server", func() { if *parameters.ADDRESS.Value == "0.0.0.0" { plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ...") } else { diff --git a/plugins/autopeering/server/udp/server.go b/plugins/autopeering/server/udp/server.go index 01e66fad..9d01a189 100644 --- a/plugins/autopeering/server/udp/server.go +++ b/plugins/autopeering/server/udp/server.go @@ -41,7 +41,7 @@ func ConfigureServer(plugin *node.Plugin) { } func RunServer(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Autopeering UDP Server", func() { if *parameters.ADDRESS.Value == "0.0.0.0" { plugin.LogInfo("Starting UDP Server (port " + strconv.Itoa(*parameters.PORT.Value) + ") ...") } else { diff --git a/plugins/bundleprocessor/bundleprocessor.go b/plugins/bundleprocessor/bundleprocessor.go index 9e21e166..8f49b8d5 100644 --- a/plugins/bundleprocessor/bundleprocessor.go +++ b/plugins/bundleprocessor/bundleprocessor.go @@ -28,7 +28,7 @@ func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) if currentTransaction.IsHead() && currentTransaction != headTransaction { newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) - Events.InvalidBundleReceived.Trigger(newBundle, bundleTransactions) + Events.InvalidBundle.Trigger(newBundle, bundleTransactions) return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail") } @@ -52,11 +52,7 @@ func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) if currentTransaction.IsTail() { newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions)) - if newBundle.IsValueBundle() { - Events.ValueBundleReceived.Trigger(newBundle, bundleTransactions) - } else { - Events.DataBundleReceived.Trigger(newBundle, bundleTransactions) - } + Events.BundleSolid.Trigger(newBundle, bundleTransactions) return newBundle, nil } diff --git a/plugins/bundleprocessor/bundleprocessor_test.go b/plugins/bundleprocessor/bundleprocessor_test.go new file mode 100644 index 00000000..f2132dc9 --- /dev/null +++ b/plugins/bundleprocessor/bundleprocessor_test.go @@ -0,0 +1,41 @@ +package bundleprocessor + +import ( + "fmt" + "testing" + + "github.com/iotaledger/goshimmer/packages/events" + "github.com/iotaledger/goshimmer/packages/model/bundle" + "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/ternary" + "github.com/iotaledger/goshimmer/plugins/tangle" + "github.com/magiconair/properties/assert" +) + +func TestProcessSolidBundleHead(t *testing.T) { + tangle.PLUGIN.InitTest() + + tx := value_transaction.New() + tx.SetTail(true) + tx.SetValue(3) + + tx1 := value_transaction.New() + tx1.SetTrunkTransactionHash(tx.GetHash()) + tx1.SetHead(true) + + tangle.StoreTransaction(tx) + tangle.StoreTransaction(tx1) + + Events.BundleSolid.Attach(events.NewClosure(func(bundle *bundle.Bundle, transactions []*value_transaction.ValueTransaction) { + fmt.Println("IT HAPPENED") + fmt.Println(bundle.GetHash()) + })) + + result, err := ProcessSolidBundleHead(tx1) + if err != nil { + t.Error(err) + } else { + assert.Equal(t, result.GetHash(), ternary.Trytes("UFWJYEWKMEQDNSQUCUWBGOFRHVBGHVVYEZCLCGRDTRQSMAFALTIPMJEEYFDPMQCNJWLXUWFMBZGHQRO99"), "invalid bundle hash") + assert.Equal(t, result.IsValueBundle(), true, "invalid value bundle status") + } +} diff --git a/plugins/bundleprocessor/events.go b/plugins/bundleprocessor/events.go index 19f4ca04..2d67f5da 100644 --- a/plugins/bundleprocessor/events.go +++ b/plugins/bundleprocessor/events.go @@ -7,15 +7,13 @@ import ( ) var Events = pluginEvents{ - DataBundleReceived: events.NewEvent(bundleEventCaller), - ValueBundleReceived: events.NewEvent(bundleEventCaller), - InvalidBundleReceived: events.NewEvent(bundleEventCaller), + BundleSolid: events.NewEvent(bundleEventCaller), + InvalidBundle: events.NewEvent(bundleEventCaller), } type pluginEvents struct { - DataBundleReceived *events.Event - ValueBundleReceived *events.Event - InvalidBundleReceived *events.Event + BundleSolid *events.Event + InvalidBundle *events.Event } func bundleEventCaller(handler interface{}, params ...interface{}) { diff --git a/plugins/bundleprocessor/plugin.go b/plugins/bundleprocessor/plugin.go index 44e11003..cec87804 100644 --- a/plugins/bundleprocessor/plugin.go +++ b/plugins/bundleprocessor/plugin.go @@ -34,7 +34,7 @@ func configure(plugin *node.Plugin) { func run(plugin *node.Plugin) { plugin.LogInfo("Starting Bundle Processor ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Bundle Processor", func() { plugin.LogSuccess("Starting Bundle Processor ... done") workerPool.Run() diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go index f61e08d8..87577a66 100644 --- a/plugins/gossip/neighbors.go +++ b/plugins/gossip/neighbors.go @@ -47,7 +47,7 @@ func runNeighbors(plugin *node.Plugin) { } func manageConnection(plugin *node.Plugin, neighbor *Neighbor) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Connection Manager ("+neighbor.Identity.StringIdentifier+")", func() { failedConnectionAttempts := 0 for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; { diff --git a/plugins/gossip/send_queue.go b/plugins/gossip/send_queue.go index 354c0c3b..9894910e 100644 --- a/plugins/gossip/send_queue.go +++ b/plugins/gossip/send_queue.go @@ -26,7 +26,7 @@ func configureSendQueue(plugin *node.Plugin) { func runSendQueue(plugin *node.Plugin) { plugin.LogInfo("Starting Send Queue Dispatcher ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Gossip Send Queue Dispatcher", func() { plugin.LogSuccess("Starting Send Queue Dispatcher ... done") for { @@ -54,7 +54,7 @@ func runSendQueue(plugin *node.Plugin) { connectedNeighborsMutex.Lock() for _, neighborQueue := range neighborQueues { - startNeighborSendQueue(neighborQueue) + startNeighborSendQueue(neighborQueue.protocol.Neighbor, neighborQueue) } connectedNeighborsMutex.Unlock() } @@ -104,13 +104,13 @@ func setupEventHandlers(neighbor *Neighbor) { })) if daemon.IsRunning() { - startNeighborSendQueue(queue) + startNeighborSendQueue(neighbor, queue) } })) } -func startNeighborSendQueue(neighborQueue *neighborQueue) { - daemon.BackgroundWorker(func() { +func startNeighborSendQueue(neighbor *Neighbor, neighborQueue *neighborQueue) { + daemon.BackgroundWorker("Gossip Send Queue ("+neighbor.Identity.StringIdentifier+")", func() { for { select { case <-daemon.ShutdownSignal: diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go index 37ebc20d..07804ad0 100644 --- a/plugins/gossip/server.go +++ b/plugins/gossip/server.go @@ -73,7 +73,7 @@ func configureServer(plugin *node.Plugin) { func runServer(plugin *node.Plugin) { plugin.LogInfo("Starting TCP Server (port " + strconv.Itoa(*PORT.Value) + ") ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Gossip TCP Server", func() { plugin.LogSuccess("Starting TCP Server (port " + strconv.Itoa(*PORT.Value) + ") ... done") TCPServer.Listen(*PORT.Value) diff --git a/plugins/gracefulshutdown/plugin.go b/plugins/gracefulshutdown/plugin.go index babf7571..ee3a074c 100644 --- a/plugins/gracefulshutdown/plugin.go +++ b/plugins/gracefulshutdown/plugin.go @@ -4,6 +4,7 @@ import ( "os" "os/signal" "strconv" + "strings" "syscall" "time" @@ -31,7 +32,13 @@ var PLUGIN = node.NewPlugin("Graceful Shutdown", func(plugin *node.Plugin) { secondsSinceStart := x.Sub(start).Seconds() if secondsSinceStart <= WAIT_TO_KILL_TIME_IN_SECONDS { - plugin.LogWarning("Received shutdown request - waiting (max " + strconv.Itoa(WAIT_TO_KILL_TIME_IN_SECONDS-int(secondsSinceStart)) + " seconds) to finish processing ...") + processList := "" + runningBackgroundWorkers := daemon.GetRunningBackgroundWorkers() + if len(runningBackgroundWorkers) >= 1 { + processList = "(" + strings.Join(runningBackgroundWorkers, ", ") + ") " + } + + plugin.LogWarning("Received shutdown request - waiting (max " + strconv.Itoa(WAIT_TO_KILL_TIME_IN_SECONDS-int(secondsSinceStart)) + " seconds) to finish processing " + processList + "...") } else { plugin.LogFailure("Background processes did not terminate in time! Forcing shutdown ...") diff --git a/plugins/statusscreen-tps/plugin.go b/plugins/statusscreen-tps/plugin.go index eabb868f..9b73392c 100644 --- a/plugins/statusscreen-tps/plugin.go +++ b/plugins/statusscreen-tps/plugin.go @@ -36,7 +36,7 @@ var PLUGIN = node.NewPlugin("Statusscreen TPS", func(plugin *node.Plugin) { return "TPS", strconv.FormatUint(atomic.LoadUint64(&receivedTps), 10) + " received / " + strconv.FormatUint(atomic.LoadUint64(&solidTps), 10) + " new" }) }, func(plugin *node.Plugin) { - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Statusscreen TPS Tracker", func() { ticker := time.NewTicker(time.Second) for { diff --git a/plugins/statusscreen/statusscreen.go b/plugins/statusscreen/statusscreen.go index 31b00d08..5c6d13e2 100644 --- a/plugins/statusscreen/statusscreen.go +++ b/plugins/statusscreen/statusscreen.go @@ -100,7 +100,7 @@ func run(plugin *node.Plugin) { return false }) - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Statusscreen Refresher", func() { for { select { case <-daemon.ShutdownSignal: @@ -111,7 +111,7 @@ func run(plugin *node.Plugin) { } }) - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Statusscreen App", func() { if err := app.SetRoot(frame, true).SetFocus(frame).Run(); err != nil { panic(err) } diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index bffbb046..2837dd64 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -37,7 +37,7 @@ func configureSolidifier(plugin *node.Plugin) { func runSolidifier(plugin *node.Plugin) { plugin.LogInfo("Starting Solidifier ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("Tangle Solidifier", func() { plugin.LogSuccess("Starting Solidifier ... done") workerPool.Run() diff --git a/plugins/webapi/plugin.go b/plugins/webapi/plugin.go index ee2010c4..4d80bdad 100644 --- a/plugins/webapi/plugin.go +++ b/plugins/webapi/plugin.go @@ -33,7 +33,7 @@ func configure(plugin *node.Plugin) { func run(plugin *node.Plugin) { plugin.LogInfo("Starting Web Server ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("WebAPI Server", func() { plugin.LogSuccess("Starting Web Server ... done") if err := Server.Start(":8080"); err != nil { diff --git a/plugins/zeromq/plugin.go b/plugins/zeromq/plugin.go index 7d4472cd..aa3f584f 100644 --- a/plugins/zeromq/plugin.go +++ b/plugins/zeromq/plugin.go @@ -45,7 +45,7 @@ func run(plugin *node.Plugin) { plugin.LogInfo("Starting ZeroMQ Publisher (port " + strconv.Itoa(*PORT.Value) + ") ...") - daemon.BackgroundWorker(func() { + daemon.BackgroundWorker("ZeroMQ Publisher", func() { if err := startPublisher(plugin); err != nil { plugin.LogFailure("Stopping ZeroMQ Publisher: " + err.Error()) } else { -- GitLab