Skip to content
Snippets Groups Projects
Unverified commit a1def20e, authored by Jonas Theis, committed by GitHub
Browse files

Fix missing message mystery (#669)

* Missing message investigation

* Delete suddenly appeared messages

* :construction: Add IsDeleted check

* :construction: Undo deletion

* Add metadata check to messageExists

* :bug: Fix bug

* :construction: Move sendRequest and ensure timer is stopped

* :construction: Remove count check

* :construction: Undo request deletion and only use StopRequest

* :construction: Move MissingMessageAppeared to StopRequest

* :construction: Force MissingMessageReceived triggering

* :construction: Enable deadlock mutex for debug

* :construction: refactor reRequest lock-unlock

* :construction: re-enable messageExists check

* :construction: Remove waiting for the timer to be stopped

* :construction: Re-enable count

* :construction: Move IO access out of the lock

* :construction: Remove MissingMessageAppeared

* :poop: Remove messageExistsFunc check

* :poop::poop: Refactor reRequest

* :art: Increase threshold and remove print

* :rotating_light: Fix linter warnings

Co-authored-by: Levente Pap <levente.pap@iota.org>
Co-authored-by: capossele <angelocapossele@gmail.com>
parent b1b78a60
No related branches found
No related tags found
No related merge requests found
......@@ -8,4 +8,6 @@ import (
// Events bundles the events of the message requester.
type Events struct {
// SendRequest is fired when a request for a given message should be sent.
SendRequest *events.Event
// MissingMessageAppeared is triggered when a message is actually present in the node's db although it was still being requested.
MissingMessageAppeared *events.Event
}
......@@ -8,24 +8,34 @@ import (
"github.com/iotaledger/hive.go/events"
)
// messageExistCheckThreshold is the re-request count that has to be exceeded before reRequest
// additionally asks messageExistsFunc whether the requested message is already present.
const messageExistCheckThreshold = 21
// MessageRequester takes care of requesting messages.
type MessageRequester struct {
// scheduledRequests maps the id of each message that is currently being requested to the timer of its next re-request.
scheduledRequests map[message.Id]*time.Timer
// options holds the requester configuration; the retry interval of the timers is read from it.
options *Options
// messageExistsFunc is the callback handed to New; reRequest uses it to check whether a requested message exists.
messageExistsFunc MessageExistsFunc
// Events holds the events triggered by the requester.
Events Events
// scheduledRequestsMutex is locked around accesses to scheduledRequests.
scheduledRequestsMutex sync.RWMutex
}
// MessageExistsFunc is a function that tells if a message exists.
type MessageExistsFunc func(messageId message.Id) bool
// New creates a new message requester.
// messageExists is stored as messageExistsFunc and optionalOptions are handed to newOptions.
// NOTE(review): this listing appears to be a rendered diff, so the previous signature (without
// messageExists) and its replacement show up back to back below.
func New(optionalOptions ...Option) *MessageRequester {
func New(messageExists MessageExistsFunc, optionalOptions ...Option) *MessageRequester {
return &MessageRequester{
scheduledRequests: make(map[message.Id]*time.Timer),
options: newOptions(optionalOptions),
messageExistsFunc: messageExists,
Events: Events{
// handlers of SendRequest are invoked as func(message.Id) with the first event parameter
SendRequest: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(message.Id))(params[0].(message.Id))
}),
// handlers of MissingMessageAppeared are invoked as func(message.Id) with the first event parameter
MissingMessageAppeared: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(message.Id))(params[0].(message.Id))
}),
},
}
}
......@@ -41,7 +51,7 @@ func (requester *MessageRequester) StartRequest(id message.Id) {
}
// schedule the next request and trigger the event
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id, 0) })
requester.scheduledRequestsMutex.Unlock()
requester.Events.SendRequest.Trigger(id)
}
......@@ -57,16 +67,26 @@ func (requester *MessageRequester) StopRequest(id message.Id) {
}
}
// reRequest is the callback of the timers stored in scheduledRequests. It triggers SendRequest for
// the given id and increments count. If the id is still scheduled it either re-schedules itself
// after the retry interval, or, once count exceeds messageExistCheckThreshold and messageExistsFunc
// reports the message as existing, removes the request and triggers MissingMessageAppeared.
// NOTE(review): this listing appears to be a rendered diff; the one-parameter signature and the
// reRequest(id) re-schedule below look like the removed lines, the count-based ones their replacement.
func (requester *MessageRequester) reRequest(id message.Id) {
// as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic
func (requester *MessageRequester) reRequest(id message.Id, count int) {
requester.Events.SendRequest.Trigger(id)
count++
// the existence check is evaluated before the mutex is taken
stopRequest := count > messageExistCheckThreshold && requester.messageExistsFunc(id)
// as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic
requester.scheduledRequestsMutex.Lock()
defer requester.scheduledRequestsMutex.Unlock()
// reschedule, if the request has not been stopped in the meantime
if _, exists := requester.scheduledRequests[id]; exists {
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
if stopRequest {
// if the message was found in the tangle: stop the request and delete it from the missingMessageStorage (via event)
delete(requester.scheduledRequests, id)
// NOTE(review): the event is triggered while scheduledRequestsMutex is still held (deferred unlock);
// a handler that calls back into the requester would block on the mutex - confirm no handler does.
requester.Events.MissingMessageAppeared.Trigger(id)
return
}
// carry the incremented count over to the next re-request
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id, count) })
return
}
}
......
......@@ -105,6 +105,11 @@ func (tangle *Tangle) DeleteMessage(messageId message.Id) {
})
}
// DeleteMissingMessage removes the entry stored under the given message id from the missingMessageStorage.
func (tangle *Tangle) DeleteMissingMessage(messageID message.Id) {
	// the storage is addressed by the raw bytes of the id
	storageKey := messageID[:]
	tangle.missingMessageStorage.Delete(storageKey)
}
// Shutdown marks the tangle as stopped, so it will not accept any new messages (waits for all backgroundTasks to finish).
func (tangle *Tangle) Shutdown() *Tangle {
tangle.storeMessageWorkerPool.ShutdownGracefully()
......@@ -164,6 +169,20 @@ func (tangle *Tangle) DBStats() (solidCount int, messageCount int, avgSolidifica
return
}
// MissingMessages returns the ids of all entries in the missingMessageStorage that are not flagged as deleted.
func (tangle *Tangle) MissingMessages() (ids []message.Id) {
	// collect appends the id of a single stored entry unless the entry is flagged as deleted
	collect := func(object objectstorage.StorableObject) {
		entry := object.(*MissingMessage)
		if entry.IsDeleted() {
			return
		}
		ids = append(ids, entry.messageId)
	}

	tangle.missingMessageStorage.ForEach(func(_ []byte, cachedObject objectstorage.CachedObject) bool {
		cachedObject.Consume(collect)

		// always true - presumably tells ForEach to keep iterating; verify against objectstorage
		return true
	})

	return ids
}
// worker that stores the message and calls the corresponding storage events.
func (tangle *Tangle) storeMessageWorker(msg *message.Message) {
// store message
......@@ -191,6 +210,7 @@ func (tangle *Tangle) storeMessageWorker(msg *message.Message) {
if tangle.missingMessageStorage.DeleteIfPresent(messageId[:]) {
tangle.Events.MissingMessageReceived.Trigger(cachedMessage, cachedMsgMetadata)
}
tangle.Events.MessageAttached.Trigger(cachedMessage, cachedMsgMetadata)
// check message solidity
......
......@@ -85,7 +85,7 @@ func MessageFactory() *messagefactory.MessageFactory {
// MessageRequester gets the messageRequester instance.
// The instance is created inside msgReqOnce.Do (presumably a sync.Once, so only on the first call - confirm).
// NOTE(review): this listing appears to be a rendered diff; the argument-less New() call looks like
// the removed line and New(messageExists) like its replacement.
func MessageRequester() *messagerequester.MessageRequester {
msgReqOnce.Do(func() {
messageRequester = messagerequester.New()
// messageExists is handed over so the requester can check whether a requested message is present
messageRequester = messagerequester.New(messageExists)
})
return messageRequester
}
......@@ -126,6 +126,10 @@ func configure(*node.Plugin) {
cachedMessageMetadata.Release()
cachedMessage.Consume(tipSelector.AddTip)
}))
MessageRequester().Events.MissingMessageAppeared.Attach(events.NewClosure(func(id message.Id) {
_tangle.DeleteMissingMessage(id)
}))
}
func run(*node.Plugin) {
......@@ -137,3 +141,10 @@ func run(*node.Plugin) {
log.Panicf("Failed to start as daemon: %s", err)
}
}
// messageExists reports whether the message with the given id can be loaded from the Tangle.
func messageExists(msgID message.Id) bool {
	cached := Tangle().Message(msgID)
	// release the cached object again once the existence check has been evaluated
	defer cached.Release()

	return cached.Exists()
}
......@@ -35,6 +35,7 @@ func Plugin() *node.Plugin {
// configure creates the plugin logger and registers the GET handlers of the tools endpoints.
func configure(_ *node.Plugin) {
	log = logger.NewLogger(PluginName)

	// endpoints are registered in the listed order
	routes := []struct {
		path    string
		handler func(echo.Context) error
	}{
		{"tools/pastcone", pastCone},
		{"tools/missing", missing},
	}
	for _, route := range routes {
		webapi.Server().GET(route.path, route.handler)
	}
}
func pastCone(c echo.Context) error {
......@@ -114,3 +115,19 @@ type PastConeResponse struct {
PastConeSize int `json:"pastConeSize,omitempty"`
Error string `json:"error,omitempty"`
}
// missing is the handler of the "tools/missing" endpoint. It answers with a MissingResponse that
// contains the string form of every id returned by Tangle().MissingMessages() and their number.
func missing(c echo.Context) error {
	missingIDs := messagelayer.Tangle().MissingMessages()

	// convert every id to its string representation
	var encoded []string
	for i := range missingIDs {
		encoded = append(encoded, missingIDs[i].String())
	}

	return c.JSON(http.StatusOK, &MissingResponse{
		IDs:   encoded,
		Count: len(missingIDs),
	})
}
// MissingResponse is the HTTP response containing all the missing messages and their count.
// Both fields carry omitempty, so an empty id list and a zero count are left out of the JSON.
type MissingResponse struct {
// IDs holds the string representation of each missing message id.
IDs []string `json:"ids,omitempty"`
// Count is the number of missing message ids.
Count int `json:"count,omitempty"`
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment