Skip to content
Snippets Groups Projects
Unverified Commit 4a9e67f6 authored by Wolfgang Welz's avatar Wolfgang Welz
Browse files

remove MonitorMissingMessages

unsolidifiable message are never removed
parent 89186c7c
No related branches found
No related tags found
No related merge requests found
......@@ -5,24 +5,15 @@ import (
"runtime"
"time"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/types"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/storageprefix"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/objectstorage"
"github.com/iotaledger/hive.go/types"
)
const (
// MaxMissingTimeBeforeCleanup is the max. amount of time a message can be marked as missing
// before it is ultimately un-marked as missing.
MaxMissingTimeBeforeCleanup = 30 * time.Second
// MissingCheckInterval is the interval on which it is checked whether a missing
// message is still missing.
MissingCheckInterval = 5 * time.Second
cacheTime = 20 * time.Second
)
......@@ -271,46 +262,6 @@ func (tangle *Tangle) checkMessageSolidityAndPropagate(cachedMessage *message.Ca
}
}
// MonitorMissingMessages continuously monitors for missing messages and eventually deletes them if they
// don't become available in a certain time frame.
func (tangle *Tangle) MonitorMissingMessages(shutdownSignal <-chan struct{}) {
reCheckInterval := time.NewTicker(MissingCheckInterval)
defer reCheckInterval.Stop()
for {
select {
case <-reCheckInterval.C:
var toDelete []message.Id
var toUnmark []message.Id
tangle.missingMessageStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool {
defer cachedObject.Release()
missingMessage := cachedObject.Get().(*MissingMessage)
if tangle.messageStorage.Contains(missingMessage.messageId.Bytes()) {
toUnmark = append(toUnmark, missingMessage.MessageId())
return true
}
// check whether message is missing since over our max time delta
if time.Since(missingMessage.MissingSince()) >= MaxMissingTimeBeforeCleanup {
toDelete = append(toDelete, missingMessage.MessageId())
}
return true
})
for _, msgID := range toUnmark {
tangle.missingMessageStorage.DeleteIfPresent(msgID.Bytes())
}
for _, msgID := range toDelete {
// delete the future cone of the missing message
tangle.Events.MessageUnsolidifiable.Trigger(msgID)
// TODO: obvious race condition between receiving the message and it getting deleted here
tangle.deleteFutureCone(msgID)
}
case <-shutdownSignal:
return
}
}
}
// deletes the given approver association for the given approvee to its approver.
func (tangle *Tangle) deleteApprover(approvedMessageId message.Id, approvingMessage message.Id) {
idToDelete := make([]byte, message.IdLength+message.IdLength)
......
......@@ -129,13 +129,6 @@ func configure(*node.Plugin) {
}
func run(*node.Plugin) {
if err := daemon.BackgroundWorker("Tangle[MissingMessagesMonitor]", func(shutdownSignal <-chan struct{}) {
_tangle.MonitorMissingMessages(shutdownSignal)
}, shutdown.PriorityMissingMessagesMonitoring); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal
messageFactory.Shutdown()
......@@ -144,5 +137,4 @@ func run(*node.Plugin) {
}, shutdown.PriorityTangle); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment