Newer
Older
Hans Moog
committed
import (
"sync"
"time"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
Hans Moog
committed
)
// MessageRequester takes care of requesting messages.
type MessageRequester struct {
scheduledRequests map[message.Id]*time.Timer
Hans Moog
committed
requestWorker async.NonBlockingWorkerPool
options *Options
Events Events
scheduledRequestsMutex sync.RWMutex
}
// New creates a new message requester.
func New(optionalOptions ...Option) *MessageRequester {
requester := &MessageRequester{
scheduledRequests: make(map[message.Id]*time.Timer),
Hans Moog
committed
options: newOptions(optionalOptions),
Events: Events{
SendRequest: events.NewEvent(func(handler interface{}, params ...interface{}) {
handler.(func(message.Id))(params[0].(message.Id))
Hans Moog
committed
}),
},
}
requester.requestWorker.Tune(requester.options.workerCount)
return requester
}
// ScheduleRequest schedules a request for the given message.
func (requester *MessageRequester) ScheduleRequest(messageId message.Id) {
Hans Moog
committed
var retryRequest func(bool)
retryRequest = func(initialRequest bool) {
requester.requestWorker.Submit(func() {
requester.scheduledRequestsMutex.RLock()
if _, requestExists := requester.scheduledRequests[messageId]; !initialRequest && !requestExists {
Hans Moog
committed
requester.scheduledRequestsMutex.RUnlock()
return
}
requester.scheduledRequestsMutex.RUnlock()
requester.Events.SendRequest.Trigger(messageId)
Hans Moog
committed
requester.scheduledRequestsMutex.Lock()
requester.scheduledRequests[messageId] = time.AfterFunc(requester.options.retryInterval, func() { retryRequest(false) })
Hans Moog
committed
requester.scheduledRequestsMutex.Unlock()
})
}
retryRequest(true)
}
// StopRequest stops requests for the given message to further happen.
func (requester *MessageRequester) StopRequest(messageId message.Id) {
Hans Moog
committed
requester.scheduledRequestsMutex.RLock()
if timer, timerExists := requester.scheduledRequests[messageId]; timerExists {
Hans Moog
committed
requester.scheduledRequestsMutex.RUnlock()
timer.Stop()
requester.scheduledRequestsMutex.Lock()
delete(requester.scheduledRequests, messageId)
Hans Moog
committed
requester.scheduledRequestsMutex.Unlock()
Hans Moog
committed
}
requester.scheduledRequestsMutex.RUnlock()