diff --git a/packages/binary/messagelayer/tangle/tangle.go b/packages/binary/messagelayer/tangle/tangle.go index 8d3f056af3adc01cfebd2429e90a55a13d691929..702963c2239a04fca662204426fba4cf4ddef2d2 100644 --- a/packages/binary/messagelayer/tangle/tangle.go +++ b/packages/binary/messagelayer/tangle/tangle.go @@ -34,7 +34,7 @@ type Tangle struct { storeMessageWorkerPool async.WorkerPool solidifierWorkerPool async.WorkerPool - cleanupWorkerPool async.WorkerPool + shutdown chan struct{} } func messageFactory(key []byte) (objectstorage.StorableObject, int, error) { @@ -54,6 +54,7 @@ func New(badgerInstance *badger.DB) (result *Tangle) { osFactory := objectstorage.NewFactory(badgerInstance, storageprefix.MessageLayer) result = &Tangle{ + shutdown: make(chan struct{}), messageStorage: osFactory.New(PrefixMessage, messageFactory, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), messageMetadataStorage: osFactory.New(PrefixMessageMetadata, MessageMetadataFromStorageKey, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), approverStorage: osFactory.New(PrefixApprovers, approverFactory, objectstorage.CacheTime(10*time.Second), objectstorage.PartitionKey(message.IdLength, message.IdLength), objectstorage.LeakDetectionEnabled(false)), @@ -114,12 +115,12 @@ func (tangle *Tangle) DeleteMessage(messageId message.Id) { func (tangle *Tangle) Shutdown() *Tangle { tangle.storeMessageWorkerPool.ShutdownGracefully() tangle.solidifierWorkerPool.ShutdownGracefully() - tangle.cleanupWorkerPool.ShutdownGracefully() tangle.messageStorage.Shutdown() tangle.messageMetadataStorage.Shutdown() tangle.approverStorage.Shutdown() tangle.missingMessageStorage.Shutdown() + close(tangle.shutdown) return tangle } @@ -190,8 +191,7 @@ func (tangle *Tangle) isMessageMarkedAsSolid(messageId message.Id) bool { missingMessage := NewMissingMessage(messageId) if cachedMissingMessage, stored := tangle.missingMessageStorage.StoreIfAbsent(missingMessage); stored { cachedMissingMessage.Consume(func(object objectstorage.StorableObject) { - // TODO: perhaps make it a separate worker for better efficiency - go tangle.monitorMissingMessage(object.(*MissingMessage).MessageId()) + tangle.Events.MessageMissing.Trigger(messageId) }) } return false @@ -265,29 +265,34 @@ func (tangle *Tangle) checkMessageSolidityAndPropagate(cachedMessage *message.Ca } } -// periodically checks whether the given message is missing and un-marks it as missing -// if it didn't become known after MaxMissingTimeBeforeCleanup. note that the message is not deleted -// from the missing message storage within this function but up on arrival. -func (tangle *Tangle) monitorMissingMessage(messageId message.Id) { - tangle.Events.MessageMissing.Trigger(messageId) +// MonitorMissingMessages continuously monitors for missing messages and eventually deletes them if they +// don't become available in a certain time frame. +func (tangle *Tangle) MonitorMissingMessages(shutdownSignal <-chan struct{}) { reCheckInterval := time.NewTicker(MissingCheckInterval) defer reCheckInterval.Stop() - for range reCheckInterval.C { - tangle.missingMessageStorage.Load(messageId[:]).Consume(func(object objectstorage.StorableObject) { - missingMessage := object.(*MissingMessage) - - // check whether message is missing since over our max time delta - if time.Since(missingMessage.MissingSince()) >= MaxMissingTimeBeforeCleanup { - tangle.cleanupWorkerPool.Submit(func() { - tangle.Events.MessageUnsolidifiable.Trigger(messageId) - // delete the future cone of the missing message - tangle.deleteFutureCone(missingMessage.MessageId()) - }) - return + for { + select { + case <-reCheckInterval.C: + var toDelete []message.Id + tangle.missingMessageStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + defer cachedObject.Release() + missingMessage := cachedObject.Get().(*MissingMessage) + + // check whether message is missing since over our max time delta + if time.Since(missingMessage.MissingSince()) >= MaxMissingTimeBeforeCleanup { + toDelete = append(toDelete, missingMessage.MessageId()) + } + return true + }) + for _, msgID := range toDelete { + // delete the future cone of the missing message + tangle.Events.MessageUnsolidifiable.Trigger(msgID) + // TODO: obvious race condition between receiving the message and it getting deleted here + tangle.deleteFutureCone(msgID) } - - // TODO: should a StillMissing event be triggered? - }) + case <-shutdownSignal: + return + } } } diff --git a/packages/shutdown/order.go b/packages/shutdown/order.go index ff483327a38a2fa48dcfa7b1417e07ca4c050c2b..5a4288ee18096343bb2363ff79ed69b467b32fb4 100644 --- a/packages/shutdown/order.go +++ b/packages/shutdown/order.go @@ -4,6 +4,7 @@ const ( PriorityDatabase = iota PriorityFPC PriorityTangle + PriorityMissingMessagesMonitoring PriorityRemoteLog PriorityAnalysis PriorityMetrics diff --git a/plugins/messagelayer/plugin.go b/plugins/messagelayer/plugin.go index 3aaed4c30318351bc48bd1872dea6cc0477f912f..9541eff4a806e2442095e21880b681042bcddb5f 100644 --- a/plugins/messagelayer/plugin.go +++ b/plugins/messagelayer/plugin.go @@ -72,10 +72,15 @@ func configure(*node.Plugin) { } func run(*node.Plugin) { + _ = daemon.BackgroundWorker("Tangle[MissingMessagesMonitor]", func(shutdownSignal <-chan struct{}) { + Tangle.MonitorMissingMessages(shutdownSignal) + }, shutdown.PriorityMissingMessagesMonitoring) + _ = daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) { <-shutdownSignal MessageFactory.Shutdown() MessageParser.Shutdown() Tangle.Shutdown() }, shutdown.PriorityTangle) + }