From 840a6c2ac328d25d38201d41d871bf5c55bca808 Mon Sep 17 00:00:00 2001 From: Wolfgang Welz <welzwo@gmail.com> Date: Thu, 16 Jul 2020 14:53:24 +0200 Subject: [PATCH] simplify MessageRequester and prevent race conditions --- .../messagerequester/messagerequester.go | 70 ++++++++----------- .../messagelayer/messagerequester/options.go | 9 --- plugins/messagelayer/plugin.go | 2 +- 3 files changed, 31 insertions(+), 50 deletions(-) diff --git a/packages/binary/messagelayer/messagerequester/messagerequester.go b/packages/binary/messagelayer/messagerequester/messagerequester.go index 7ca437c6..06845164 100644 --- a/packages/binary/messagelayer/messagerequester/messagerequester.go +++ b/packages/binary/messagelayer/messagerequester/messagerequester.go @@ -1,33 +1,25 @@ package messagerequester import ( - "runtime" "sync" "time" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" - "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/events" ) -var ( - // DefaultRequestWorkerCount defines the Default Request Worker Count of the message requester. - DefaultRequestWorkerCount = runtime.GOMAXPROCS(0) -) - // MessageRequester takes care of requesting messages. type MessageRequester struct { scheduledRequests map[message.Id]*time.Timer - requestWorker async.NonBlockingWorkerPool options *Options Events Events - scheduledRequestsMutex sync.RWMutex + scheduledRequestsMutex sync.Mutex } // New creates a new message requester. func New(optionalOptions ...Option) *MessageRequester { - requester := &MessageRequester{ + return &MessageRequester{ scheduledRequests: make(map[message.Id]*time.Timer), options: newOptions(optionalOptions), Events: Events{ @@ -36,46 +28,44 @@ func New(optionalOptions ...Option) *MessageRequester { }), }, } - - requester.requestWorker.Tune(requester.options.workerCount) - return requester } -// ScheduleRequest schedules a request for the given message. -func (requester *MessageRequester) ScheduleRequest(messageId message.Id) { - var retryRequest func(bool) - retryRequest = func(initialRequest bool) { - requester.requestWorker.Submit(func() { - requester.scheduledRequestsMutex.RLock() - if _, requestExists := requester.scheduledRequests[messageId]; !initialRequest && !requestExists { - requester.scheduledRequestsMutex.RUnlock() - return - } - requester.scheduledRequestsMutex.RUnlock() +// StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest. +func (requester *MessageRequester) StartRequest(id message.Id) { + requester.scheduledRequestsMutex.Lock() + defer requester.scheduledRequestsMutex.Unlock() - requester.Events.SendRequest.Trigger(messageId) - - requester.scheduledRequestsMutex.Lock() - requester.scheduledRequests[messageId] = time.AfterFunc(requester.options.retryInterval, func() { retryRequest(true) }) - requester.scheduledRequestsMutex.Unlock() - }) + // ignore already scheduled requests + if _, exists := requester.scheduledRequests[id]; exists { + return } - retryRequest(true) + // trigger the event and schedule the next request + // make this atomic to be sure that a successive call of StartRequest does not trigger again + requester.Events.SendRequest.Trigger(id) + requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) }) } // StopRequest stops requests for the given message to further happen. -func (requester *MessageRequester) StopRequest(messageId message.Id) { - requester.scheduledRequestsMutex.RLock() - if timer, timerExists := requester.scheduledRequests[messageId]; timerExists { - requester.scheduledRequestsMutex.RUnlock() +func (requester *MessageRequester) StopRequest(id message.Id) { + requester.scheduledRequestsMutex.Lock() + defer requester.scheduledRequestsMutex.Unlock() + if timer, ok := requester.scheduledRequests[id]; ok { timer.Stop() + delete(requester.scheduledRequests, id) + } +} - requester.scheduledRequestsMutex.Lock() - delete(requester.scheduledRequests, messageId) - requester.scheduledRequestsMutex.Unlock() - return +func (requester *MessageRequester) reRequest(id message.Id) { + // as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic + requester.Events.SendRequest.Trigger(id) + + requester.scheduledRequestsMutex.Lock() + defer requester.scheduledRequestsMutex.Unlock() + + // reschedule, if the request has not been stopped in the meantime + if _, exists := requester.scheduledRequests[id]; exists { + requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) }) } - requester.scheduledRequestsMutex.RUnlock() } diff --git a/packages/binary/messagelayer/messagerequester/options.go b/packages/binary/messagelayer/messagerequester/options.go index a3b3a0cb..81611814 100644 --- a/packages/binary/messagelayer/messagerequester/options.go +++ b/packages/binary/messagelayer/messagerequester/options.go @@ -7,13 +7,11 @@ import ( // Options holds options for a message requester. type Options struct { retryInterval time.Duration - workerCount int } func newOptions(optionalOptions []Option) *Options { result := &Options{ retryInterval: 10 * time.Second, - workerCount: DefaultRequestWorkerCount, } for _, optionalOption := range optionalOptions { @@ -32,10 +30,3 @@ func RetryInterval(interval time.Duration) Option { args.retryInterval = interval } } - -// WorkerCount creates an option which sets the worker count to the given value. -func WorkerCount(workerCount int) Option { - return func(args *Options) { - args.workerCount = workerCount - } -} diff --git a/plugins/messagelayer/plugin.go b/plugins/messagelayer/plugin.go index 504f1977..084df96c 100644 --- a/plugins/messagelayer/plugin.go +++ b/plugins/messagelayer/plugin.go @@ -113,7 +113,7 @@ func configure(*node.Plugin) { })) // setup messageRequester - _tangle.Events.MessageMissing.Attach(events.NewClosure(messageRequester.ScheduleRequest)) + _tangle.Events.MessageMissing.Attach(events.NewClosure(messageRequester.StartRequest)) _tangle.Events.MissingMessageReceived.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { cachedMessageMetadata.Release() cachedMessage.Consume(func(msg *message.Message) { -- GitLab