Skip to content
Snippets Groups Projects
Unverified Commit db5b9068 authored by Luca Moser's avatar Luca Moser Committed by GitHub
Browse files

spawn a background worker to handle missing messages (#421)

parent 7841117f
No related branches found
No related tags found
No related merge requests found
...@@ -34,7 +34,7 @@ type Tangle struct { ...@@ -34,7 +34,7 @@ type Tangle struct {
storeMessageWorkerPool async.WorkerPool storeMessageWorkerPool async.WorkerPool
solidifierWorkerPool async.WorkerPool solidifierWorkerPool async.WorkerPool
cleanupWorkerPool async.WorkerPool shutdown chan struct{}
} }
func messageFactory(key []byte) (objectstorage.StorableObject, int, error) { func messageFactory(key []byte) (objectstorage.StorableObject, int, error) {
...@@ -54,6 +54,7 @@ func New(badgerInstance *badger.DB) (result *Tangle) { ...@@ -54,6 +54,7 @@ func New(badgerInstance *badger.DB) (result *Tangle) {
osFactory := objectstorage.NewFactory(badgerInstance, storageprefix.MessageLayer) osFactory := objectstorage.NewFactory(badgerInstance, storageprefix.MessageLayer)
result = &Tangle{ result = &Tangle{
shutdown: make(chan struct{}),
messageStorage: osFactory.New(PrefixMessage, messageFactory, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), messageStorage: osFactory.New(PrefixMessage, messageFactory, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)),
messageMetadataStorage: osFactory.New(PrefixMessageMetadata, MessageMetadataFromStorageKey, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), messageMetadataStorage: osFactory.New(PrefixMessageMetadata, MessageMetadataFromStorageKey, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)),
approverStorage: osFactory.New(PrefixApprovers, approverFactory, objectstorage.CacheTime(10*time.Second), objectstorage.PartitionKey(message.IdLength, message.IdLength), objectstorage.LeakDetectionEnabled(false)), approverStorage: osFactory.New(PrefixApprovers, approverFactory, objectstorage.CacheTime(10*time.Second), objectstorage.PartitionKey(message.IdLength, message.IdLength), objectstorage.LeakDetectionEnabled(false)),
...@@ -114,12 +115,12 @@ func (tangle *Tangle) DeleteMessage(messageId message.Id) { ...@@ -114,12 +115,12 @@ func (tangle *Tangle) DeleteMessage(messageId message.Id) {
func (tangle *Tangle) Shutdown() *Tangle { func (tangle *Tangle) Shutdown() *Tangle {
tangle.storeMessageWorkerPool.ShutdownGracefully() tangle.storeMessageWorkerPool.ShutdownGracefully()
tangle.solidifierWorkerPool.ShutdownGracefully() tangle.solidifierWorkerPool.ShutdownGracefully()
tangle.cleanupWorkerPool.ShutdownGracefully()
tangle.messageStorage.Shutdown() tangle.messageStorage.Shutdown()
tangle.messageMetadataStorage.Shutdown() tangle.messageMetadataStorage.Shutdown()
tangle.approverStorage.Shutdown() tangle.approverStorage.Shutdown()
tangle.missingMessageStorage.Shutdown() tangle.missingMessageStorage.Shutdown()
close(tangle.shutdown)
return tangle return tangle
} }
...@@ -190,8 +191,7 @@ func (tangle *Tangle) isMessageMarkedAsSolid(messageId message.Id) bool { ...@@ -190,8 +191,7 @@ func (tangle *Tangle) isMessageMarkedAsSolid(messageId message.Id) bool {
missingMessage := NewMissingMessage(messageId) missingMessage := NewMissingMessage(messageId)
if cachedMissingMessage, stored := tangle.missingMessageStorage.StoreIfAbsent(missingMessage); stored { if cachedMissingMessage, stored := tangle.missingMessageStorage.StoreIfAbsent(missingMessage); stored {
cachedMissingMessage.Consume(func(object objectstorage.StorableObject) { cachedMissingMessage.Consume(func(object objectstorage.StorableObject) {
// TODO: perhaps make it a separate worker for better efficiency tangle.Events.MessageMissing.Trigger(messageId)
go tangle.monitorMissingMessage(object.(*MissingMessage).MessageId())
}) })
} }
return false return false
...@@ -265,29 +265,34 @@ func (tangle *Tangle) checkMessageSolidityAndPropagate(cachedMessage *message.Ca ...@@ -265,29 +265,34 @@ func (tangle *Tangle) checkMessageSolidityAndPropagate(cachedMessage *message.Ca
} }
} }
// periodically checks whether the given message is missing and un-marks it as missing // MonitorMissingMessages continuously monitors for missing messages and eventually deletes them if they
// if it didn't become known after MaxMissingTimeBeforeCleanup. note that the message is not deleted // don't become available in a certain time frame.
// from the missing message storage within this function but up on arrival. func (tangle *Tangle) MonitorMissingMessages(shutdownSignal <-chan struct{}) {
func (tangle *Tangle) monitorMissingMessage(messageId message.Id) {
tangle.Events.MessageMissing.Trigger(messageId)
reCheckInterval := time.NewTicker(MissingCheckInterval) reCheckInterval := time.NewTicker(MissingCheckInterval)
defer reCheckInterval.Stop() defer reCheckInterval.Stop()
for range reCheckInterval.C { for {
tangle.missingMessageStorage.Load(messageId[:]).Consume(func(object objectstorage.StorableObject) { select {
missingMessage := object.(*MissingMessage) case <-reCheckInterval.C:
var toDelete []message.Id
// check whether message is missing since over our max time delta tangle.missingMessageStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool {
if time.Since(missingMessage.MissingSince()) >= MaxMissingTimeBeforeCleanup { defer cachedObject.Release()
tangle.cleanupWorkerPool.Submit(func() { missingMessage := cachedObject.Get().(*MissingMessage)
tangle.Events.MessageUnsolidifiable.Trigger(messageId)
// delete the future cone of the missing message // check whether message is missing since over our max time delta
tangle.deleteFutureCone(missingMessage.MessageId()) if time.Since(missingMessage.MissingSince()) >= MaxMissingTimeBeforeCleanup {
}) toDelete = append(toDelete, missingMessage.MessageId())
return }
return true
})
for _, msgID := range toDelete {
// delete the future cone of the missing message
tangle.Events.MessageUnsolidifiable.Trigger(msgID)
// TODO: obvious race condition between receiving the message and it getting deleted here
tangle.deleteFutureCone(msgID)
} }
case <-shutdownSignal:
// TODO: should a StillMissing event be triggered? return
}) }
} }
} }
......
...@@ -4,6 +4,7 @@ const ( ...@@ -4,6 +4,7 @@ const (
PriorityDatabase = iota PriorityDatabase = iota
PriorityFPC PriorityFPC
PriorityTangle PriorityTangle
PriorityMissingMessagesMonitoring
PriorityRemoteLog PriorityRemoteLog
PriorityAnalysis PriorityAnalysis
PriorityMetrics PriorityMetrics
......
...@@ -72,10 +72,15 @@ func configure(*node.Plugin) { ...@@ -72,10 +72,15 @@ func configure(*node.Plugin) {
} }
func run(*node.Plugin) { func run(*node.Plugin) {
_ = daemon.BackgroundWorker("Tangle[MissingMessagesMonitor]", func(shutdownSignal <-chan struct{}) {
Tangle.MonitorMissingMessages(shutdownSignal)
}, shutdown.PriorityMissingMessagesMonitoring)
_ = daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) { _ = daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal <-shutdownSignal
MessageFactory.Shutdown() MessageFactory.Shutdown()
MessageParser.Shutdown() MessageParser.Shutdown()
Tangle.Shutdown() Tangle.Shutdown()
}, shutdown.PriorityTangle) }, shutdown.PriorityTangle)
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment