From e53cfef013d2cb973b8a90587f611f87278ec438 Mon Sep 17 00:00:00 2001 From: Luca Moser <moser.luca@gmail.com> Date: Tue, 7 Jan 2020 13:42:00 +0100 Subject: [PATCH] removes daemon.Events.Shutdown listeners --- plugins/analysis/plugin.go | 7 ----- plugins/analysis/server/plugin.go | 8 ++++-- .../webinterface/httpserver/plugin.go | 14 ++++------ plugins/bundleprocessor/plugin.go | 20 ++++++------- plugins/dashboard/plugin.go | 12 ++++---- plugins/statusscreen/statusscreen.go | 6 ++-- plugins/tangle/solidifier.go | 10 +++---- plugins/webapi/plugin.go | 28 +++++++++---------- plugins/zeromq/plugin.go | 20 ++++++------- 9 files changed, 53 insertions(+), 72 deletions(-) diff --git a/plugins/analysis/plugin.go b/plugins/analysis/plugin.go index f7ffdbc3..0208ca1d 100644 --- a/plugins/analysis/plugin.go +++ b/plugins/analysis/plugin.go @@ -5,8 +5,6 @@ import ( "github.com/iotaledger/goshimmer/plugins/analysis/client" "github.com/iotaledger/goshimmer/plugins/analysis/server" "github.com/iotaledger/goshimmer/plugins/analysis/webinterface" - "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" ) @@ -18,10 +16,6 @@ func configure(plugin *node.Plugin) { if parameter.NodeConfig.GetInt(server.CFG_SERVER_PORT) != 0 { webinterface.Configure(plugin) server.Configure(plugin) - - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - server.Shutdown(plugin) - })) } } @@ -35,7 +29,6 @@ func run(plugin *node.Plugin) { if parameter.NodeConfig.GetString(client.CFG_SERVER_ADDRESS) != "" { client.Run(plugin) - log.Info("Stopping Analysis-Client ... done") } else { log.Info("Starting Plugin: Analysis ... client is disabled (server-address is empty)") } diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 4f67de7d..40fabc28 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -41,14 +41,16 @@ func Configure(plugin *node.Plugin) { func Run(plugin *node.Plugin) { daemon.BackgroundWorker("Analysis Server", func(shutdownSignal <-chan struct{}) { log.Infof("Starting Server (port %d) ... done", parameter.NodeConfig.GetInt(CFG_SERVER_PORT)) - server.Listen(parameter.NodeConfig.GetInt(CFG_SERVER_PORT)) + go server.Listen(parameter.NodeConfig.GetInt(CFG_SERVER_PORT)) + <-shutdownSignal + Shutdown() }) } -func Shutdown(plugin *node.Plugin) { +func Shutdown() { log.Info("Stopping Server ...") - server.Shutdown() + log.Info("Stopping Server ... done") } func HandleConnection(conn *network.ManagedConnection) { diff --git a/plugins/analysis/webinterface/httpserver/plugin.go b/plugins/analysis/webinterface/httpserver/plugin.go index 20051f0f..0299e173 100644 --- a/plugins/analysis/webinterface/httpserver/plugin.go +++ b/plugins/analysis/webinterface/httpserver/plugin.go @@ -5,7 +5,6 @@ import ( "time" "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/node" "golang.org/x/net/context" "golang.org/x/net/websocket" @@ -22,17 +21,14 @@ func Configure(plugin *node.Plugin) { router.Handle("/datastream", websocket.Handler(dataStream)) router.HandleFunc("/", index) - - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - ctx, cancel := context.WithTimeout(context.Background(), 0*time.Second) - defer cancel() - - httpServer.Shutdown(ctx) - })) } func Run(plugin *node.Plugin) { daemon.BackgroundWorker("Analysis HTTP Server", func(shutdownSignal <-chan struct{}) { - httpServer.ListenAndServe() + go httpServer.ListenAndServe() + <-shutdownSignal + ctx, cancel := context.WithTimeout(context.Background(), 0*time.Second) + defer cancel() + httpServer.Shutdown(ctx) }) } diff --git a/plugins/bundleprocessor/plugin.go b/plugins/bundleprocessor/plugin.go index d7f6dbb7..c95eeeed 100644 --- a/plugins/bundleprocessor/plugin.go +++ b/plugins/bundleprocessor/plugin.go @@ -23,16 +23,6 @@ func configure(plugin *node.Plugin) { Events.Error.Attach(events.NewClosure(func(err errors.IdentifiableError) { log.Error(err.Error()) })) - - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - log.Info("Stopping Bundle Processor ...") - - workerPool.Stop() - - log.Info("Stopping Value Bundle Processor ...") - - valueBundleProcessorWorkerPool.Stop() - })) } func run(plugin *node.Plugin) { @@ -40,7 +30,10 @@ func run(plugin *node.Plugin) { daemon.BackgroundWorker("Bundle Processor", func(shutdownSignal <-chan struct{}) { log.Info("Starting Bundle Processor ... done") - workerPool.Run() + workerPool.Start() + <-shutdownSignal + log.Info("Stopping Bundle Processor ...") + workerPool.Stop() log.Info("Stopping Bundle Processor ... done") }) @@ -48,7 +41,10 @@ func run(plugin *node.Plugin) { daemon.BackgroundWorker("Value Bundle Processor", func(shutdownSignal <-chan struct{}) { log.Info("Starting Value Bundle Processor ... done") - valueBundleProcessorWorkerPool.Run() + valueBundleProcessorWorkerPool.Start() + <-shutdownSignal + log.Info("Stopping Value Bundle Processor ...") + valueBundleProcessorWorkerPool.Stop() log.Info("Stopping Value Bundle Processor ... done") }) } diff --git a/plugins/dashboard/plugin.go b/plugins/dashboard/plugin.go index 708a801e..f263b4b8 100644 --- a/plugins/dashboard/plugin.go +++ b/plugins/dashboard/plugin.go @@ -33,13 +33,6 @@ func configure(plugin *node.Plugin) { TPSQ = TPSQ[1:] } })) - - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - ctx, cancel := context.WithTimeout(context.Background(), 0*time.Second) - defer cancel() - - _ = server.Shutdown(ctx) - })) } func run(plugin *node.Plugin) { @@ -49,5 +42,10 @@ func run(plugin *node.Plugin) { log.Error(err.Error()) } }() + + <-shutdownSignal + ctx, cancel := context.WithTimeout(context.Background(), 0*time.Second) + defer cancel() + _ = server.Shutdown(ctx) }) } diff --git a/plugins/statusscreen/statusscreen.go b/plugins/statusscreen/statusscreen.go index 06057fd4..e488b59a 100644 --- a/plugins/statusscreen/statusscreen.go +++ b/plugins/statusscreen/statusscreen.go @@ -36,14 +36,14 @@ func configure(plugin *node.Plugin) { }) logger.Events.AnyMsg.Attach(anyLogMsgClosure) - daemon.Events.Shutdown.Attach(events.NewClosure(func() { + daemon.BackgroundWorker("UI-Detach", func(shutdownSignal <-chan struct{}) { + <-shutdownSignal logger.InjectWriters(os.Stdout) logger.Events.AnyMsg.Detach(anyLogMsgClosure) - if app != nil { app.Stop() } - })) + }, 1) } func run(plugin *node.Plugin) { diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index de2526d6..af7b07c5 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -44,11 +44,6 @@ func configureSolidifier(plugin *node.Plugin) { workerPool.Submit(metaTx) })) - - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - log.Info("Stopping Solidifier ...") - workerPool.Stop() - })) } func runSolidifier(plugin *node.Plugin) { @@ -56,7 +51,10 @@ func runSolidifier(plugin *node.Plugin) { daemon.BackgroundWorker("Tangle Solidifier", func(shutdownSignal <-chan struct{}) { log.Info("Starting Solidifier ... done") - workerPool.Run() + workerPool.Start() + <-shutdownSignal + log.Info("Stopping Solidifier ...") + workerPool.Stop() log.Info("Stopping Solidifier ... done") }) } diff --git a/plugins/webapi/plugin.go b/plugins/webapi/plugin.go index 4d3177ee..e7d7c5a3 100644 --- a/plugins/webapi/plugin.go +++ b/plugins/webapi/plugin.go @@ -5,7 +5,6 @@ import ( "time" "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" "github.com/labstack/echo" @@ -20,17 +19,6 @@ func configure(plugin *node.Plugin) { Server.HideBanner = true Server.HidePort = true Server.GET("/", IndexRequest) - - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - log.Info("Stopping Web Server ...") - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - if err := Server.Shutdown(ctx); err != nil { - log.Errorf("Couldn't stop server cleanly: %s", err.Error()) - } - })) } func run(plugin *node.Plugin) { @@ -39,8 +27,20 @@ func run(plugin *node.Plugin) { daemon.BackgroundWorker("WebAPI Server", func(shutdownSignal <-chan struct{}) { log.Info("Starting Web Server ... done") - if err := Server.Start(":8080"); err != nil { - log.Info("Stopping Web Server ... done") + go func() { + if err := Server.Start(":8080"); err != nil { + log.Info("Stopping Web Server ... done") + } + }() + + <-shutdownSignal + + log.Info("Stopping Web Server ...") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + if err := Server.Shutdown(ctx); err != nil { + log.Errorf("Couldn't stop server cleanly: %s", err.Error()) } }) } diff --git a/plugins/zeromq/plugin.go b/plugins/zeromq/plugin.go index aa547901..c9d469a8 100644 --- a/plugins/zeromq/plugin.go +++ b/plugins/zeromq/plugin.go @@ -22,17 +22,6 @@ var emptyTag = strings.Repeat("9", 27) // Configure the zeromq plugin func configure(plugin *node.Plugin) { - - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - log.Info("Stopping ZeroMQ Publisher ...") - - if err := publisher.Shutdown(); err != nil { - log.Errorf("Stopping ZeroMQ Publisher: %s", err.Error()) - } else { - log.Info("Stopping ZeroMQ Publisher ... done") - } - })) - tangle.Events.TransactionStored.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { // create goroutine for every event go func() { @@ -54,6 +43,15 @@ func run(plugin *node.Plugin) { } else { log.Infof("Starting ZeroMQ Publisher (port %d) ... done", zeromqPort) } + + <-shutdownSignal + + log.Info("Stopping ZeroMQ Publisher ...") + if err := publisher.Shutdown(); err != nil { + log.Errorf("Stopping ZeroMQ Publisher: %s", err.Error()) + } else { + log.Info("Stopping ZeroMQ Publisher ... done") + } }) } -- GitLab