Skip to content
Snippets Groups Projects
Unverified Commit 99a43562 authored by Angelo Capossele's avatar Angelo Capossele Committed by GitHub
Browse files

Add panic if backgroundWorker fails (#444)

* :scream: Add panic if backgroundWorker fails

* :art: Use log.Panicf instead of panic
parent b8f0b053
No related branches found
No related tags found
No related merge requests found
Showing
with 69 additions and 40 deletions
...@@ -99,10 +99,12 @@ func configure(_ *node.Plugin) { ...@@ -99,10 +99,12 @@ func configure(_ *node.Plugin) {
} }
func run(*node.Plugin) { func run(*node.Plugin) {
_ = daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal <-shutdownSignal
Tangle.Shutdown() Tangle.Shutdown()
}, shutdown.PriorityTangle) }, shutdown.PriorityTangle); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
runFPC() runFPC()
} }
......
...@@ -99,7 +99,7 @@ func configureFPC() { ...@@ -99,7 +99,7 @@ func configureFPC() {
} }
func runFPC() { func runFPC() {
daemon.BackgroundWorker("FPCVoterServer", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("FPCVoterServer", func(shutdownSignal <-chan struct{}) {
voterServer = votenet.New(Voter(), func(id string) vote.Opinion { voterServer = votenet.New(Voter(), func(id string) vote.Opinion {
branchID, err := branchmanager.BranchIDFromBase58(id) branchID, err := branchmanager.BranchIDFromBase58(id)
if err != nil { if err != nil {
...@@ -133,9 +133,11 @@ func runFPC() { ...@@ -133,9 +133,11 @@ func runFPC() {
<-shutdownSignal <-shutdownSignal
voterServer.Shutdown() voterServer.Shutdown()
log.Info("Stopped vote server") log.Info("Stopped vote server")
}, shutdown.PriorityFPC) }, shutdown.PriorityFPC); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
daemon.BackgroundWorker("FPCRoundsInitiator", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("FPCRoundsInitiator", func(shutdownSignal <-chan struct{}) {
log.Infof("Started FPC round initiator") log.Infof("Started FPC round initiator")
unixTsPRNG := prng.NewUnixTimestampPRNG(roundIntervalSeconds) unixTsPRNG := prng.NewUnixTimestampPRNG(roundIntervalSeconds)
defer unixTsPRNG.Stop() defer unixTsPRNG.Stop()
...@@ -151,7 +153,9 @@ func runFPC() { ...@@ -151,7 +153,9 @@ func runFPC() {
} }
} }
log.Infof("Stopped FPC round initiator") log.Infof("Stopped FPC round initiator")
}, shutdown.PriorityFPC) }, shutdown.PriorityFPC); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
// PeerOpinionGiver implements the OpinionGiver interface based on a peer. // PeerOpinionGiver implements the OpinionGiver interface based on a peer.
......
...@@ -61,7 +61,7 @@ func run(_ *node.Plugin) { ...@@ -61,7 +61,7 @@ func run(_ *node.Plugin) {
} }
} }
}, shutdown.PriorityAnalysis); err != nil { }, shutdown.PriorityAnalysis); err != nil {
log.Panic(err) log.Panicf("Failed to start as daemon: %s", err)
} }
} }
......
...@@ -119,7 +119,7 @@ func runAutopeeringFeed() { ...@@ -119,7 +119,7 @@ func runAutopeeringFeed() {
autopeeringWorkerPool.Stop() autopeeringWorkerPool.Stop()
log.Info("Stopping Analysis-Dashboard[AutopeeringVisualizer] ... done") log.Info("Stopping Analysis-Dashboard[AutopeeringVisualizer] ... done")
}, shutdown.PriorityDashboard); err != nil { }, shutdown.PriorityDashboard); err != nil {
panic(err) log.Panicf("Failed to start as daemon: %s", err)
} }
} }
......
...@@ -67,7 +67,7 @@ func runFPCLiveFeed() { ...@@ -67,7 +67,7 @@ func runFPCLiveFeed() {
analysis.Events.FPCHeartbeat.Detach(onFPCHeartbeatReceived) analysis.Events.FPCHeartbeat.Detach(onFPCHeartbeatReceived)
log.Info("Stopping Analysis[FPCUpdater] ... done") log.Info("Stopping Analysis[FPCUpdater] ... done")
}, shutdown.PriorityDashboard); err != nil { }, shutdown.PriorityDashboard); err != nil {
panic(err) log.Panicf("Failed to start as daemon: %s", err)
} }
} }
......
...@@ -63,7 +63,7 @@ func run(*node.Plugin) { ...@@ -63,7 +63,7 @@ func run(*node.Plugin) {
log.Infof("Starting %s ...", PluginName) log.Infof("Starting %s ...", PluginName)
if err := daemon.BackgroundWorker(PluginName, worker, shutdown.PriorityAnalysis); err != nil { if err := daemon.BackgroundWorker(PluginName, worker, shutdown.PriorityAnalysis); err != nil {
log.Errorf("Error starting as daemon: %s", err) log.Panicf("Error starting as daemon: %s", err)
} }
} }
......
...@@ -110,7 +110,7 @@ func configureEventsRecording() { ...@@ -110,7 +110,7 @@ func configureEventsRecording() {
// starts record manager that initiates a record cleanup periodically // starts record manager that initiates a record cleanup periodically
func runEventsRecordManager() { func runEventsRecordManager() {
_ = daemon.BackgroundWorker("Dashboard Analysis Server Record Manager", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Dashboard Analysis Server Record Manager", func(shutdownSignal <-chan struct{}) {
ticker := time.NewTicker(cleanUpPeriod) ticker := time.NewTicker(cleanUpPeriod)
defer ticker.Stop() defer ticker.Stop()
for { for {
...@@ -121,7 +121,9 @@ func runEventsRecordManager() { ...@@ -121,7 +121,9 @@ func runEventsRecordManager() {
cleanUp(cleanUpPeriod) cleanUp(cleanUpPeriod)
} }
} }
}, shutdown.PriorityAnalysis) }, shutdown.PriorityAnalysis); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
// removes nodes and links we haven't seen for at least 3 times the heartbeat interval. // removes nodes and links we haven't seen for at least 3 times the heartbeat interval.
......
...@@ -69,7 +69,7 @@ func run(_ *node.Plugin) { ...@@ -69,7 +69,7 @@ func run(_ *node.Plugin) {
log.Info("Stopping Server ...") log.Info("Stopping Server ...")
server.Shutdown() server.Shutdown()
}, shutdown.PriorityAnalysis); err != nil { }, shutdown.PriorityAnalysis); err != nil {
log.Panic(err) log.Panicf("Failed to start as daemon: %s", err)
} }
} }
......
...@@ -30,7 +30,7 @@ func configure(*node.Plugin) { ...@@ -30,7 +30,7 @@ func configure(*node.Plugin) {
func run(*node.Plugin) { func run(*node.Plugin) {
if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityAutopeering); err != nil { if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityAutopeering); err != nil {
log.Errorf("Failed to start as daemon: %s", err) log.Panicf("Failed to start as daemon: %s", err)
} }
} }
......
...@@ -51,7 +51,7 @@ func run(_ *node.Plugin) { ...@@ -51,7 +51,7 @@ func run(_ *node.Plugin) {
issuancePeriod := time.Duration(issuancePeriodSec) * time.Second issuancePeriod := time.Duration(issuancePeriodSec) * time.Second
// issue messages on top of the genesis // issue messages on top of the genesis
_ = daemon.BackgroundWorker("Bootstrapping-Issuer", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Bootstrapping-Issuer", func(shutdownSignal <-chan struct{}) {
messageSpammer.Start(initialIssuanceMPS) messageSpammer.Start(initialIssuanceMPS)
defer messageSpammer.Shutdown() defer messageSpammer.Shutdown()
// don't stop issuing messages if in continuous mode // don't stop issuing messages if in continuous mode
...@@ -65,5 +65,7 @@ func run(_ *node.Plugin) { ...@@ -65,5 +65,7 @@ func run(_ *node.Plugin) {
case <-time.After(issuancePeriod): case <-time.After(issuancePeriod):
case <-shutdownSignal: case <-shutdownSignal:
} }
}, shutdown.PriorityBootstrap) }, shutdown.PriorityBootstrap); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
...@@ -40,7 +40,7 @@ func configureDrngLiveFeed() { ...@@ -40,7 +40,7 @@ func configureDrngLiveFeed() {
} }
func runDrngLiveFeed() { func runDrngLiveFeed() {
daemon.BackgroundWorker("Dashboard[DRNGUpdater]", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Dashboard[DRNGUpdater]", func(shutdownSignal <-chan struct{}) {
newMsgRateLimiter := time.NewTicker(time.Second / 10) newMsgRateLimiter := time.NewTicker(time.Second / 10)
defer newMsgRateLimiter.Stop() defer newMsgRateLimiter.Stop()
...@@ -60,5 +60,7 @@ func runDrngLiveFeed() { ...@@ -60,5 +60,7 @@ func runDrngLiveFeed() {
log.Info("Stopping Dashboard[DRNGUpdater] ...") log.Info("Stopping Dashboard[DRNGUpdater] ...")
drng.Instance().Events.Randomness.Detach(notifyNewRandomness) drng.Instance().Events.Randomness.Detach(notifyNewRandomness)
log.Info("Stopping Dashboard[DRNGUpdater] ... done") log.Info("Stopping Dashboard[DRNGUpdater] ... done")
}, shutdown.PriorityDashboard) }, shutdown.PriorityDashboard); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
...@@ -39,7 +39,7 @@ func runLiveFeed() { ...@@ -39,7 +39,7 @@ func runLiveFeed() {
} }
}) })
daemon.BackgroundWorker("Dashboard[MsgUpdater]", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Dashboard[MsgUpdater]", func(shutdownSignal <-chan struct{}) {
messagelayer.Tangle.Events.MessageAttached.Attach(notifyNewMsg) messagelayer.Tangle.Events.MessageAttached.Attach(notifyNewMsg)
liveFeedWorkerPool.Start() liveFeedWorkerPool.Start()
<-shutdownSignal <-shutdownSignal
...@@ -48,5 +48,7 @@ func runLiveFeed() { ...@@ -48,5 +48,7 @@ func runLiveFeed() {
newMsgRateLimiter.Stop() newMsgRateLimiter.Stop()
liveFeedWorkerPool.Stop() liveFeedWorkerPool.Stop()
log.Info("Stopping Dashboard[MsgUpdater] ... done") log.Info("Stopping Dashboard[MsgUpdater] ... done")
}, shutdown.PriorityDashboard) }, shutdown.PriorityDashboard); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
...@@ -81,7 +81,7 @@ func run(*node.Plugin) { ...@@ -81,7 +81,7 @@ func run(*node.Plugin) {
log.Infof("Starting %s ...", PluginName) log.Infof("Starting %s ...", PluginName)
if err := daemon.BackgroundWorker(PluginName, worker, shutdown.PriorityAnalysis); err != nil { if err := daemon.BackgroundWorker(PluginName, worker, shutdown.PriorityAnalysis); err != nil {
log.Errorf("Error starting as daemon: %s", err) log.Panicf("Error starting as daemon: %s", err)
} }
} }
......
...@@ -79,7 +79,7 @@ func runVisualizer() { ...@@ -79,7 +79,7 @@ func runVisualizer() {
visualizerWorkerPool.TrySubmit(messageId, false) visualizerWorkerPool.TrySubmit(messageId, false)
}) })
daemon.BackgroundWorker("Dashboard[Visualizer]", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Dashboard[Visualizer]", func(shutdownSignal <-chan struct{}) {
messagelayer.Tangle.Events.MessageAttached.Attach(notifyNewMsg) messagelayer.Tangle.Events.MessageAttached.Attach(notifyNewMsg)
defer messagelayer.Tangle.Events.MessageAttached.Detach(notifyNewMsg) defer messagelayer.Tangle.Events.MessageAttached.Detach(notifyNewMsg)
messagelayer.Tangle.Events.MessageSolid.Attach(notifyNewMsg) messagelayer.Tangle.Events.MessageSolid.Attach(notifyNewMsg)
...@@ -93,5 +93,7 @@ func runVisualizer() { ...@@ -93,5 +93,7 @@ func runVisualizer() {
log.Info("Stopping Dashboard[Visualizer] ...") log.Info("Stopping Dashboard[Visualizer] ...")
visualizerWorkerPool.Stop() visualizerWorkerPool.Stop()
log.Info("Stopping Dashboard[Visualizer] ... done") log.Info("Stopping Dashboard[Visualizer] ... done")
}, shutdown.PriorityDashboard) }, shutdown.PriorityDashboard); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
...@@ -58,7 +58,7 @@ func runWebSocketStreams() { ...@@ -58,7 +58,7 @@ func runWebSocketStreams() {
wsSendWorkerPool.TrySubmit(mps) wsSendWorkerPool.TrySubmit(mps)
}) })
daemon.BackgroundWorker("Dashboard[StatusUpdate]", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Dashboard[StatusUpdate]", func(shutdownSignal <-chan struct{}) {
metrics.Events.ReceivedMPSUpdated.Attach(updateStatus) metrics.Events.ReceivedMPSUpdated.Attach(updateStatus)
wsSendWorkerPool.Start() wsSendWorkerPool.Start()
<-shutdownSignal <-shutdownSignal
...@@ -66,7 +66,9 @@ func runWebSocketStreams() { ...@@ -66,7 +66,9 @@ func runWebSocketStreams() {
metrics.Events.ReceivedMPSUpdated.Detach(updateStatus) metrics.Events.ReceivedMPSUpdated.Detach(updateStatus)
wsSendWorkerPool.Stop() wsSendWorkerPool.Stop()
log.Info("Stopping Dashboard[StatusUpdate] ... done") log.Info("Stopping Dashboard[StatusUpdate] ... done")
}, shutdown.PriorityDashboard) }, shutdown.PriorityDashboard); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
// reigsters and creates a new websocket client. // reigsters and creates a new websocket client.
......
...@@ -79,7 +79,7 @@ func configure(_ *node.Plugin) { ...@@ -79,7 +79,7 @@ func configure(_ *node.Plugin) {
func run(_ *node.Plugin) { func run(_ *node.Plugin) {
if err := daemon.BackgroundWorker(PluginName+"[GC]", runGC, shutdown.PriorityBadgerGarbageCollection); err != nil { if err := daemon.BackgroundWorker(PluginName+"[GC]", runGC, shutdown.PriorityBadgerGarbageCollection); err != nil {
log.Errorf("Failed to start as daemon: %s", err) log.Panicf("Failed to start as daemon: %s", err)
} }
} }
......
...@@ -35,7 +35,7 @@ func configure(*node.Plugin) { ...@@ -35,7 +35,7 @@ func configure(*node.Plugin) {
func run(*node.Plugin) { func run(*node.Plugin) {
if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil { if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil {
log.Errorf("Failed to start as daemon: %s", err) log.Panicf("Failed to start as daemon: %s", err)
} }
} }
......
...@@ -73,15 +73,19 @@ func configure(*node.Plugin) { ...@@ -73,15 +73,19 @@ func configure(*node.Plugin) {
} }
func run(*node.Plugin) { func run(*node.Plugin) {
_ = daemon.BackgroundWorker("Tangle[MissingMessagesMonitor]", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Tangle[MissingMessagesMonitor]", func(shutdownSignal <-chan struct{}) {
Tangle.MonitorMissingMessages(shutdownSignal) Tangle.MonitorMissingMessages(shutdownSignal)
}, shutdown.PriorityMissingMessagesMonitoring) }, shutdown.PriorityMissingMessagesMonitoring); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
_ = daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal <-shutdownSignal
MessageFactory.Shutdown() MessageFactory.Shutdown()
MessageParser.Shutdown() MessageParser.Shutdown()
Tangle.Shutdown() Tangle.Shutdown()
}, shutdown.PriorityTangle) }, shutdown.PriorityTangle); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
...@@ -3,15 +3,15 @@ package metrics ...@@ -3,15 +3,15 @@ package metrics
import ( import (
"time" "time"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/timeutil"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/messagelayer" "github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/timeutil"
) )
// PluginName is the name of the metrics plugin. // PluginName is the name of the metrics plugin.
...@@ -20,7 +20,10 @@ const PluginName = "Metrics" ...@@ -20,7 +20,10 @@ const PluginName = "Metrics"
// Plugin is the plugin instance of the metrics plugin. // Plugin is the plugin instance of the metrics plugin.
var Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run) var Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
var log *logger.Logger
func configure(_ *node.Plugin) { func configure(_ *node.Plugin) {
log = logger.NewLogger(PluginName)
// increase received MPS counter whenever we attached a message // increase received MPS counter whenever we attached a message
messagelayer.Tangle.Events.MessageAttached.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { messagelayer.Tangle.Events.MessageAttached.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
cachedMessage.Release() cachedMessage.Release()
...@@ -31,7 +34,9 @@ func configure(_ *node.Plugin) { ...@@ -31,7 +34,9 @@ func configure(_ *node.Plugin) {
func run(_ *node.Plugin) { func run(_ *node.Plugin) {
// create a background worker that "measures" the MPS value every second // create a background worker that "measures" the MPS value every second
daemon.BackgroundWorker("Metrics MPS Updater", func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker("Metrics MPS Updater", func(shutdownSignal <-chan struct{}) {
timeutil.Ticker(measureReceivedMPS, 1*time.Second, shutdownSignal) timeutil.Ticker(measureReceivedMPS, 1*time.Second, shutdownSignal)
}, shutdown.PriorityMetrics) }, shutdown.PriorityMetrics); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
...@@ -95,7 +95,7 @@ func run(plugin *node.Plugin) { ...@@ -95,7 +95,7 @@ func run(plugin *node.Plugin) {
workerPool.TrySubmit(level, name, msg) workerPool.TrySubmit(level, name, msg)
}) })
daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) {
logger.Events.AnyMsg.Attach(logEvent) logger.Events.AnyMsg.Attach(logEvent)
workerPool.Start() workerPool.Start()
<-shutdownSignal <-shutdownSignal
...@@ -103,7 +103,9 @@ func run(plugin *node.Plugin) { ...@@ -103,7 +103,9 @@ func run(plugin *node.Plugin) {
logger.Events.AnyMsg.Detach(logEvent) logger.Events.AnyMsg.Detach(logEvent)
workerPool.Stop() workerPool.Stop()
log.Infof("Stopping %s ... done", PluginName) log.Infof("Stopping %s ... done", PluginName)
}, shutdown.PriorityRemoteLog) }, shutdown.PriorityRemoteLog); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
func sendLogMsg(level logger.Level, name string, msg string) { func sendLogMsg(level logger.Level, name string, msg string) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment