diff --git a/packages/binary/messagelayer/messagerequester/events.go b/packages/binary/messagelayer/messagerequester/events.go index 77e9ed89c73805d7686a9431ac7a78e813b5fa2e..d5c780558eef04c622631176e8bfaaf9192ebc94 100644 --- a/packages/binary/messagelayer/messagerequester/events.go +++ b/packages/binary/messagelayer/messagerequester/events.go @@ -8,4 +8,6 @@ import ( type Events struct { // Fired when a request for a given message should be sent. SendRequest *events.Event + // MissingMessageAppeared is triggered when a message is actually present in the node's db although it was still being requested. + MissingMessageAppeared *events.Event } diff --git a/packages/binary/messagelayer/messagerequester/messagerequester.go b/packages/binary/messagelayer/messagerequester/messagerequester.go index c202696e6be3c3242e92fb618f4c1067d65ab477..0f161a5ae84690d6e3015fe3ed664f60d9a85a99 100644 --- a/packages/binary/messagelayer/messagerequester/messagerequester.go +++ b/packages/binary/messagelayer/messagerequester/messagerequester.go @@ -8,24 +8,34 @@ import ( "github.com/iotaledger/hive.go/events" ) +const messageExistCheckThreshold = 21 + // MessageRequester takes care of requesting messages. type MessageRequester struct { scheduledRequests map[message.Id]*time.Timer options *Options + messageExistsFunc MessageExistsFunc Events Events scheduledRequestsMutex sync.RWMutex } +// MessageExistsFunc is a function that tells if a message exists. +type MessageExistsFunc func(messageId message.Id) bool + // New creates a new message requester. -func New(optionalOptions ...Option) *MessageRequester { +func New(messageExists MessageExistsFunc, optionalOptions ...Option) *MessageRequester { return &MessageRequester{ scheduledRequests: make(map[message.Id]*time.Timer), options: newOptions(optionalOptions), + messageExistsFunc: messageExists, Events: Events{ SendRequest: events.NewEvent(func(handler interface{}, params ...interface{}) { handler.(func(message.Id))(params[0].(message.Id)) }), + MissingMessageAppeared: events.NewEvent(func(handler interface{}, params ...interface{}) { + handler.(func(message.Id))(params[0].(message.Id)) + }), }, } } @@ -41,7 +51,7 @@ func (requester *MessageRequester) StartRequest(id message.Id) { } // schedule the next request and trigger the event - requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) }) + requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id, 0) }) requester.scheduledRequestsMutex.Unlock() requester.Events.SendRequest.Trigger(id) } @@ -57,16 +67,26 @@ func (requester *MessageRequester) StopRequest(id message.Id) { } } -func (requester *MessageRequester) reRequest(id message.Id) { - // as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic +func (requester *MessageRequester) reRequest(id message.Id, count int) { requester.Events.SendRequest.Trigger(id) + count++ + stopRequest := count > messageExistCheckThreshold && requester.messageExistsFunc(id) + + // as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic requester.scheduledRequestsMutex.Lock() defer requester.scheduledRequestsMutex.Unlock() // reschedule, if the request has not been stopped in the meantime if _, exists := requester.scheduledRequests[id]; exists { - requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) }) + if stopRequest { + // if found message tangle: stop request and delete from missingMessageStorage (via event) + delete(requester.scheduledRequests, id) + requester.Events.MissingMessageAppeared.Trigger(id) + return + } + requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id, count) }) + return } } diff --git a/packages/binary/messagelayer/tangle/tangle.go b/packages/binary/messagelayer/tangle/tangle.go index 3b27dfec94501e63303ec17519908878d1d87fce..d59075c549995a0394d6f2c32cd5757d27082af5 100644 --- a/packages/binary/messagelayer/tangle/tangle.go +++ b/packages/binary/messagelayer/tangle/tangle.go @@ -105,6 +105,11 @@ func (tangle *Tangle) DeleteMessage(messageId message.Id) { }) } +// DeleteMissingMessage deletes a message from the missingMessageStorage. +func (tangle *Tangle) DeleteMissingMessage(messageID message.Id) { + tangle.missingMessageStorage.Delete(messageID[:]) +} + // Shutdown marks the tangle as stopped, so it will not accept any new messages (waits for all backgroundTasks to finish). func (tangle *Tangle) Shutdown() *Tangle { tangle.storeMessageWorkerPool.ShutdownGracefully() @@ -164,6 +169,20 @@ func (tangle *Tangle) DBStats() (solidCount int, messageCount int, avgSolidifica return } +// MissingMessages return the ids of messages in missingMessageStorage +func (tangle *Tangle) MissingMessages() (ids []message.Id) { + tangle.missingMessageStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + cachedObject.Consume(func(object objectstorage.StorableObject) { + missingMsg := object.(*MissingMessage) + if !missingMsg.IsDeleted() { + ids = append(ids, missingMsg.messageId) + } + }) + return true + }) + return +} + // worker that stores the message and calls the corresponding storage events. func (tangle *Tangle) storeMessageWorker(msg *message.Message) { // store message @@ -191,6 +210,7 @@ func (tangle *Tangle) storeMessageWorker(msg *message.Message) { if tangle.missingMessageStorage.DeleteIfPresent(messageId[:]) { tangle.Events.MissingMessageReceived.Trigger(cachedMessage, cachedMsgMetadata) } + tangle.Events.MessageAttached.Trigger(cachedMessage, cachedMsgMetadata) // check message solidity diff --git a/plugins/messagelayer/plugin.go b/plugins/messagelayer/plugin.go index 23967033e30475adf6cfb8bb3c09e80f3a97a0f4..7deccd078be0e7f28d1c175db71a424e5d768518 100644 --- a/plugins/messagelayer/plugin.go +++ b/plugins/messagelayer/plugin.go @@ -85,7 +85,7 @@ func MessageFactory() *messagefactory.MessageFactory { // MessageRequester gets the messageRequester instance. func MessageRequester() *messagerequester.MessageRequester { msgReqOnce.Do(func() { - messageRequester = messagerequester.New() + messageRequester = messagerequester.New(messageExists) }) return messageRequester } @@ -126,6 +126,10 @@ func configure(*node.Plugin) { cachedMessageMetadata.Release() cachedMessage.Consume(tipSelector.AddTip) })) + + MessageRequester().Events.MissingMessageAppeared.Attach(events.NewClosure(func(id message.Id) { + _tangle.DeleteMissingMessage(id) + })) } func run(*node.Plugin) { @@ -137,3 +141,10 @@ func run(*node.Plugin) { log.Panicf("Failed to start as daemon: %s", err) } } + +// messageExists tells if a given message is present in the node +func messageExists(msgID message.Id) bool { + cachedMessage := Tangle().Message(msgID) + defer cachedMessage.Release() + return cachedMessage.Exists() +} diff --git a/plugins/webapi/tools/plugin.go b/plugins/webapi/tools/plugin.go index 379bf6713c9ce72ef3ca7cb85a03247a78cec9fb..01be7ca29db944227e748b9d07d5878ac08fc36d 100644 --- a/plugins/webapi/tools/plugin.go +++ b/plugins/webapi/tools/plugin.go @@ -35,6 +35,7 @@ func Plugin() *node.Plugin { func configure(_ *node.Plugin) { log = logger.NewLogger(PluginName) webapi.Server().GET("tools/pastcone", pastCone) + webapi.Server().GET("tools/missing", missing) } func pastCone(c echo.Context) error { @@ -114,3 +115,19 @@ type PastConeResponse struct { PastConeSize int `json:"pastConeSize,omitempty"` Error string `json:"error,omitempty"` } + +func missing(c echo.Context) error { + res := &MissingResponse{} + missingIDs := messagelayer.Tangle().MissingMessages() + for _, msg := range missingIDs { + res.IDs = append(res.IDs, msg.String()) + } + res.Count = len(missingIDs) + return c.JSON(http.StatusOK, res) +} + +// MissingResponse is the HTTP response containing all the missing messages and their count. +type MissingResponse struct { + IDs []string `json:"ids,omitempty"` + Count int `json:"count,omitempty"` +}