Skip to content
Snippets Groups Projects
Unverified Commit 840a6c2a authored by Wolfgang Welz's avatar Wolfgang Welz
Browse files

simplify MessageRequester and prevent race conditions

parent 951cb11b
No related branches found
No related tags found
No related merge requests found
package messagerequester package messagerequester
import ( import (
"runtime"
"sync" "sync"
"time" "time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
) )
var (
// DefaultRequestWorkerCount defines the Default Request Worker Count of the message requester.
DefaultRequestWorkerCount = runtime.GOMAXPROCS(0)
)
// MessageRequester takes care of requesting messages. // MessageRequester takes care of requesting messages.
type MessageRequester struct { type MessageRequester struct {
scheduledRequests map[message.Id]*time.Timer scheduledRequests map[message.Id]*time.Timer
requestWorker async.NonBlockingWorkerPool
options *Options options *Options
Events Events Events Events
scheduledRequestsMutex sync.RWMutex scheduledRequestsMutex sync.Mutex
} }
// New creates a new message requester. // New creates a new message requester.
func New(optionalOptions ...Option) *MessageRequester { func New(optionalOptions ...Option) *MessageRequester {
requester := &MessageRequester{ return &MessageRequester{
scheduledRequests: make(map[message.Id]*time.Timer), scheduledRequests: make(map[message.Id]*time.Timer),
options: newOptions(optionalOptions), options: newOptions(optionalOptions),
Events: Events{ Events: Events{
...@@ -36,46 +28,44 @@ func New(optionalOptions ...Option) *MessageRequester { ...@@ -36,46 +28,44 @@ func New(optionalOptions ...Option) *MessageRequester {
}), }),
}, },
} }
requester.requestWorker.Tune(requester.options.workerCount)
return requester
} }
// ScheduleRequest schedules a request for the given message. // StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest.
func (requester *MessageRequester) ScheduleRequest(messageId message.Id) { func (requester *MessageRequester) StartRequest(id message.Id) {
var retryRequest func(bool) requester.scheduledRequestsMutex.Lock()
retryRequest = func(initialRequest bool) { defer requester.scheduledRequestsMutex.Unlock()
requester.requestWorker.Submit(func() {
requester.scheduledRequestsMutex.RLock()
if _, requestExists := requester.scheduledRequests[messageId]; !initialRequest && !requestExists {
requester.scheduledRequestsMutex.RUnlock()
return
}
requester.scheduledRequestsMutex.RUnlock()
requester.Events.SendRequest.Trigger(messageId) // ignore already scheduled requests
if _, exists := requester.scheduledRequests[id]; exists {
requester.scheduledRequestsMutex.Lock() return
requester.scheduledRequests[messageId] = time.AfterFunc(requester.options.retryInterval, func() { retryRequest(true) })
requester.scheduledRequestsMutex.Unlock()
})
} }
retryRequest(true) // trigger the event and schedule the next request
// make this atomic to be sure that a successive call of StartRequest does not trigger again
requester.Events.SendRequest.Trigger(id)
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
} }
// StopRequest stops requests for the given message to further happen. // StopRequest stops requests for the given message to further happen.
func (requester *MessageRequester) StopRequest(messageId message.Id) { func (requester *MessageRequester) StopRequest(id message.Id) {
requester.scheduledRequestsMutex.RLock() requester.scheduledRequestsMutex.Lock()
if timer, timerExists := requester.scheduledRequests[messageId]; timerExists { defer requester.scheduledRequestsMutex.Unlock()
requester.scheduledRequestsMutex.RUnlock()
if timer, ok := requester.scheduledRequests[id]; ok {
timer.Stop() timer.Stop()
delete(requester.scheduledRequests, id)
}
}
requester.scheduledRequestsMutex.Lock() func (requester *MessageRequester) reRequest(id message.Id) {
delete(requester.scheduledRequests, messageId) // as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic
requester.scheduledRequestsMutex.Unlock() requester.Events.SendRequest.Trigger(id)
return
requester.scheduledRequestsMutex.Lock()
defer requester.scheduledRequestsMutex.Unlock()
// reschedule, if the request has not been stopped in the meantime
if _, exists := requester.scheduledRequests[id]; exists {
requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
} }
requester.scheduledRequestsMutex.RUnlock()
} }
...@@ -7,13 +7,11 @@ import ( ...@@ -7,13 +7,11 @@ import (
// Options holds options for a message requester. // Options holds options for a message requester.
type Options struct { type Options struct {
retryInterval time.Duration retryInterval time.Duration
workerCount int
} }
func newOptions(optionalOptions []Option) *Options { func newOptions(optionalOptions []Option) *Options {
result := &Options{ result := &Options{
retryInterval: 10 * time.Second, retryInterval: 10 * time.Second,
workerCount: DefaultRequestWorkerCount,
} }
for _, optionalOption := range optionalOptions { for _, optionalOption := range optionalOptions {
...@@ -32,10 +30,3 @@ func RetryInterval(interval time.Duration) Option { ...@@ -32,10 +30,3 @@ func RetryInterval(interval time.Duration) Option {
args.retryInterval = interval args.retryInterval = interval
} }
} }
// WorkerCount creates an option which sets the worker count to the given value.
func WorkerCount(workerCount int) Option {
return func(args *Options) {
args.workerCount = workerCount
}
}
...@@ -113,7 +113,7 @@ func configure(*node.Plugin) { ...@@ -113,7 +113,7 @@ func configure(*node.Plugin) {
})) }))
// setup messageRequester // setup messageRequester
_tangle.Events.MessageMissing.Attach(events.NewClosure(messageRequester.ScheduleRequest)) _tangle.Events.MessageMissing.Attach(events.NewClosure(messageRequester.StartRequest))
_tangle.Events.MissingMessageReceived.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { _tangle.Events.MissingMessageReceived.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
cachedMessageMetadata.Release() cachedMessageMetadata.Release()
cachedMessage.Consume(func(msg *message.Message) { cachedMessage.Consume(func(msg *message.Message) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment