Newer
Older
Hans Moog
committed
import (
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
Hans Moog
committed
"github.com/iotaledger/hive.go/events"
Hans Moog
committed
// MessageRequester takes care of requesting messages.
type MessageRequester struct {
Hans Moog
committed
options *Options
Hans Moog
committed
Events Events
Hans Moog
committed
}
// MessageExistsFunc is a function that tells if a message exists.

// createReRequest builds the callback handed to a retry timer: when invoked it
// re-requests msgID on the given requester, passing along the number of
// re-requests already performed.
func createReRequest(requester *MessageRequester, msgID message.ID, count int) func() {
	retry := func() {
		requester.reRequest(msgID, count)
	}

	return retry
}
// New creates a new message requester.
//
// The visible part of the constructor builds the options from optionalOptions,
// creates the SendRequest and MissingMessageAppeared events and schedules a
// first re-request (with a retry count of 0) for every ID in missingMessages,
// each firing after options.retryInterval.
//
// NOTE(review): parts of this body (the opening of the requester literal,
// further field initialisers and the bodies of the event callers) are not
// visible here; how messageExists is stored must be confirmed against the
// complete file.
func New(messageExists MessageExistsFunc, missingMessages []message.ID, optionalOptions ...Option) *MessageRequester {
Hans Moog
committed
options: newOptions(optionalOptions),
Hans Moog
committed
Events: Events{
SendRequest: events.NewEvent(func(handler interface{}, params ...interface{}) {
Hans Moog
committed
}),
MissingMessageAppeared: events.NewEvent(func(handler interface{}, params ...interface{}) {
Hans Moog
committed
},
}
// add requests for all missing messages
// (the lock is held for the whole loop; reRequest takes the same lock before
// touching the map, so a timer firing early has to wait until setup is done)
requester.scheduledRequestsMutex.Lock()
defer requester.scheduledRequestsMutex.Unlock()
for _, id := range missingMessages {
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, createReRequest(requester, id, 0))
}
return requester
Hans Moog
committed
}
// StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest.
func (requester *MessageRequester) StartRequest(id message.ID) {
requester.scheduledRequestsMutex.Lock()
Hans Moog
committed
// ignore already scheduled requests
if _, exists := requester.scheduledRequests[id]; exists {
requester.scheduledRequestsMutex.Unlock()
Hans Moog
committed
}
// schedule the next request and trigger the event
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, createReRequest(requester, id, 0))
requester.scheduledRequestsMutex.Unlock()
requester.Events.SendRequest.Trigger(id)
Hans Moog
committed
}
// StopRequest stops requests for the given message to further happen.
func (requester *MessageRequester) StopRequest(id message.ID) {
requester.scheduledRequestsMutex.Lock()
defer requester.scheduledRequestsMutex.Unlock()
Hans Moog
committed
if timer, ok := requester.scheduledRequests[id]; ok {
Hans Moog
committed
timer.Stop()
delete(requester.scheduledRequests, id)
}
}
Hans Moog
committed
func (requester *MessageRequester) reRequest(id message.ID, count int) {
requester.Events.SendRequest.Trigger(id)
count++
stopRequest := count > messageExistCheckThreshold && requester.messageExistsFunc(id)
// as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic
requester.scheduledRequestsMutex.Lock()
defer requester.scheduledRequestsMutex.Unlock()
// reschedule, if the request has not been stopped in the meantime
if _, exists := requester.scheduledRequests[id]; exists {
if stopRequest {
// if found message tangle: stop request and delete from missingMessageStorage (via event)
delete(requester.scheduledRequests, id)
requester.Events.MissingMessageAppeared.Trigger(id)
return
}
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, createReRequest(requester, id, count))
Hans Moog
committed
}
}
// RequestQueueSize reports how many message requests are currently scheduled.
func (requester *MessageRequester) RequestQueueSize() int {
	requester.scheduledRequestsMutex.RLock()
	pending := len(requester.scheduledRequests)
	requester.scheduledRequestsMutex.RUnlock()

	return pending
}