Unverified commit 4554a1ff authored by Wolfgang Welz, committed by GitHub
Merge pull request #631 from iotaledger/fix/race-sync

Improve syncing
parents 2119f233 840a6c2a
package tangle
import (
"time"
)
const (
// MaxMissingTimeBeforeCleanup defines how long a transaction can be "missing", before we start pruning its future
// cone.
MaxMissingTimeBeforeCleanup = 30 * time.Second
// MissingCheckInterval defines how often we check if missing transactions have been received.
MissingCheckInterval = 5 * time.Second
)
@@ -13,7 +13,7 @@ require (
github.com/gobuffalo/packr/v2 v2.8.0
github.com/golang/protobuf v1.4.2
github.com/gorilla/websocket v1.4.2
- github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337
+ github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0
github.com/magiconair/properties v1.8.1
......
package messagerequester
import (
- "runtime"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
- "github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/events"
)
- var (
- // DefaultRequestWorkerCount defines the Default Request Worker Count of the message requester.
- DefaultRequestWorkerCount = runtime.GOMAXPROCS(0)
- )
// MessageRequester takes care of requesting messages.
type MessageRequester struct {
scheduledRequests map[message.Id]*time.Timer
- requestWorker async.NonBlockingWorkerPool
options *Options
Events Events
- scheduledRequestsMutex sync.RWMutex
+ scheduledRequestsMutex sync.Mutex
}
// New creates a new message requester.
func New(optionalOptions ...Option) *MessageRequester {
- requester := &MessageRequester{
+ return &MessageRequester{
scheduledRequests: make(map[message.Id]*time.Timer),
options: newOptions(optionalOptions),
Events: Events{
@@ -36,46 +28,44 @@ func New(optionalOptions ...Option) *MessageRequester {
}),
},
}
- requester.requestWorker.Tune(requester.options.workerCount)
- return requester
}
- // ScheduleRequest schedules a request for the given message.
- func (requester *MessageRequester) ScheduleRequest(messageId message.Id) {
- var retryRequest func(bool)
- retryRequest = func(initialRequest bool) {
- requester.requestWorker.Submit(func() {
- requester.scheduledRequestsMutex.RLock()
- if _, requestExists := requester.scheduledRequests[messageId]; !initialRequest && !requestExists {
- requester.scheduledRequestsMutex.RUnlock()
- return
- }
- requester.scheduledRequestsMutex.RUnlock()
- requester.Events.SendRequest.Trigger(messageId)
- requester.scheduledRequestsMutex.Lock()
- requester.scheduledRequests[messageId] = time.AfterFunc(requester.options.retryInterval, func() { retryRequest(false) })
- requester.scheduledRequestsMutex.Unlock()
- })
- }
- retryRequest(true)
- }
+ // StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest.
+ func (requester *MessageRequester) StartRequest(id message.Id) {
+ requester.scheduledRequestsMutex.Lock()
+ defer requester.scheduledRequestsMutex.Unlock()
+ // ignore already scheduled requests
+ if _, exists := requester.scheduledRequests[id]; exists {
+ return
+ }
+ // trigger the event and schedule the next request
+ // make this atomic to be sure that a successive call of StartRequest does not trigger again
+ requester.Events.SendRequest.Trigger(id)
+ requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
+ }
// StopRequest stops requests for the given message to further happen.
- func (requester *MessageRequester) StopRequest(messageId message.Id) {
- requester.scheduledRequestsMutex.RLock()
- if timer, timerExists := requester.scheduledRequests[messageId]; timerExists {
- requester.scheduledRequestsMutex.RUnlock()
- timer.Stop()
- requester.scheduledRequestsMutex.Lock()
- delete(requester.scheduledRequests, messageId)
- requester.scheduledRequestsMutex.Unlock()
- return
- }
- requester.scheduledRequestsMutex.RUnlock()
- }
+ func (requester *MessageRequester) StopRequest(id message.Id) {
+ requester.scheduledRequestsMutex.Lock()
+ defer requester.scheduledRequestsMutex.Unlock()
+ if timer, ok := requester.scheduledRequests[id]; ok {
+ timer.Stop()
+ delete(requester.scheduledRequests, id)
+ }
+ }
+ func (requester *MessageRequester) reRequest(id message.Id) {
+ // as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic
+ requester.Events.SendRequest.Trigger(id)
+ requester.scheduledRequestsMutex.Lock()
+ defer requester.scheduledRequestsMutex.Unlock()
+ // reschedule, if the request has not been stopped in the meantime
+ if _, exists := requester.scheduledRequests[id]; exists {
+ requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
+ }
+ }
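For reference, a minimal, self-contained sketch of the same start/stop/re-request pattern (illustrative names, not the goshimmer API): a mutex-guarded map of *time.Timer entries where Start fires the first request immediately under the lock, every retry re-schedules itself, and Stop cancels the timer and removes the entry so the next retry does not re-schedule.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// retrier is a stand-in for MessageRequester; send stands in for the SendRequest event.
type retrier struct {
    mu       sync.Mutex
    timers   map[string]*time.Timer
    interval time.Duration
    send     func(id string)
}

func newRetrier(interval time.Duration, send func(string)) *retrier {
    return &retrier{timers: make(map[string]*time.Timer), interval: interval, send: send}
}

// Start requests id immediately and keeps re-requesting it until Stop is called.
func (r *retrier) Start(id string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if _, exists := r.timers[id]; exists {
        return // already scheduled
    }
    // first request and first timer are set up under the lock,
    // so a concurrent Start for the same id cannot trigger twice
    r.send(id)
    r.timers[id] = time.AfterFunc(r.interval, func() { r.retry(id) })
}

func (r *retrier) retry(id string) {
    r.send(id)
    r.mu.Lock()
    defer r.mu.Unlock()
    // re-schedule only if Stop has not removed the entry in the meantime
    if _, exists := r.timers[id]; exists {
        r.timers[id] = time.AfterFunc(r.interval, func() { r.retry(id) })
    }
}

// Stop cancels the pending timer and forgets the id.
func (r *retrier) Stop(id string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if t, ok := r.timers[id]; ok {
        t.Stop()
        delete(r.timers, id)
    }
}

func main() {
    r := newRetrier(100*time.Millisecond, func(id string) { fmt.Println("requesting", id) })
    r.Start("msg-1")
    time.Sleep(350 * time.Millisecond)
    r.Stop("msg-1")
}
```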
@@ -7,13 +7,11 @@ import (
// Options holds options for a message requester.
type Options struct {
retryInterval time.Duration
- workerCount int
}
func newOptions(optionalOptions []Option) *Options {
result := &Options{
retryInterval: 10 * time.Second,
- workerCount: DefaultRequestWorkerCount,
}
for _, optionalOption := range optionalOptions {
@@ -32,10 +30,3 @@ func RetryInterval(interval time.Duration) Option {
args.retryInterval = interval
}
}
- // WorkerCount creates an option which sets the worker count to the given value.
- func WorkerCount(workerCount int) Option {
- return func(args *Options) {
- args.workerCount = workerCount
- }
- }
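With the worker-count option gone, the requester is configured solely through the RetryInterval functional option. A hedged usage sketch (the import path is assumed by analogy with the messagelayer/message import above; only New and RetryInterval are confirmed by this diff):

```go
package main

import (
    "time"

    // assumed path, mirroring .../messagelayer/message seen in the imports above
    "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagerequester"
)

func main() {
    // override the 10s default retry interval shown in newOptions
    requester := messagerequester.New(messagerequester.RetryInterval(5 * time.Second))
    _ = requester
}
```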
@@ -5,24 +5,15 @@ import (
"runtime"
"time"
- "github.com/iotaledger/hive.go/kvstore"
- "github.com/iotaledger/hive.go/types"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/storageprefix"
"github.com/iotaledger/hive.go/async"
+ "github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/objectstorage"
+ "github.com/iotaledger/hive.go/types"
)
const (
- // MaxMissingTimeBeforeCleanup is the max. amount of time a message can be marked as missing
- // before it is ultimately un-marked as missing.
- MaxMissingTimeBeforeCleanup = 30 * time.Second
- // MissingCheckInterval is the interval on which it is checked whether a missing
- // message is still missing.
- MissingCheckInterval = 5 * time.Second
cacheTime = 20 * time.Second
)
@@ -218,7 +209,10 @@ func (tangle *Tangle) isMessageSolid(msg *message.Message, msgMetadata *MessageM
return true
}
- return tangle.isMessageMarkedAsSolid(msg.TrunkId()) && tangle.isMessageMarkedAsSolid(msg.BranchId())
+ // as missing messages are requested in isMessageMarkedAsSolid, we want to prevent short-circuit evaluation
+ trunkSolid := tangle.isMessageMarkedAsSolid(msg.TrunkId())
+ branchSolid := tangle.isMessageMarkedAsSolid(msg.BranchId())
+ return trunkSolid && branchSolid
}
// builds up a stack from the given message and tries to solidify into the present.
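The rewrite above deliberately avoids Go's short-circuit evaluation of &&: isMessageMarkedAsSolid has the side effect of requesting a missing message, so it must run for both the trunk and the branch even when the first check already returns false. A small self-contained illustration (hypothetical names, not goshimmer code):

```go
package main

import "fmt"

// check stands in for isMessageMarkedAsSolid: it has a side effect we care about.
func check(name string, ok bool) bool {
    fmt.Println("checking", name) // e.g. requesting a missing message
    return ok
}

func main() {
    // short-circuit: "branch" is never evaluated, so its side effect never runs
    _ = check("trunk", false) && check("branch", true)

    // evaluating both into locals first forces both side effects, as in isMessageSolid
    trunkSolid := check("trunk", false)
    branchSolid := check("branch", true)
    _ = trunkSolid && branchSolid
}
```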
@@ -268,46 +262,6 @@ func (tangle *Tangle) checkMessageSolidityAndPropagate(cachedMessage *message.Ca
}
}
- // MonitorMissingMessages continuously monitors for missing messages and eventually deletes them if they
- // don't become available in a certain time frame.
- func (tangle *Tangle) MonitorMissingMessages(shutdownSignal <-chan struct{}) {
- reCheckInterval := time.NewTicker(MissingCheckInterval)
- defer reCheckInterval.Stop()
- for {
- select {
- case <-reCheckInterval.C:
- var toDelete []message.Id
- var toUnmark []message.Id
- tangle.missingMessageStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool {
- defer cachedObject.Release()
- missingMessage := cachedObject.Get().(*MissingMessage)
- if tangle.messageStorage.Contains(missingMessage.messageId.Bytes()) {
- toUnmark = append(toUnmark, missingMessage.MessageId())
- return true
- }
- // check whether message is missing since over our max time delta
- if time.Since(missingMessage.MissingSince()) >= MaxMissingTimeBeforeCleanup {
- toDelete = append(toDelete, missingMessage.MessageId())
- }
- return true
- })
- for _, msgID := range toUnmark {
- tangle.missingMessageStorage.DeleteIfPresent(msgID.Bytes())
- }
- for _, msgID := range toDelete {
- // delete the future cone of the missing message
- tangle.Events.MessageUnsolidifiable.Trigger(msgID)
- // TODO: obvious race condition between receiving the message and it getting deleted here
- tangle.deleteFutureCone(msgID)
- }
- case <-shutdownSignal:
- return
- }
- }
- }
// deletes the given approver association for the given approvee to its approver.
func (tangle *Tangle) deleteApprover(approvedMessageId message.Id, approvingMessage message.Id) {
idToDelete := make([]byte, message.IdLength+message.IdLength)
......
@@ -113,7 +113,7 @@ func configure(*node.Plugin) {
}))
// setup messageRequester
- _tangle.Events.MessageMissing.Attach(events.NewClosure(messageRequester.ScheduleRequest))
+ _tangle.Events.MessageMissing.Attach(events.NewClosure(messageRequester.StartRequest))
_tangle.Events.MissingMessageReceived.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
cachedMessageMetadata.Release()
cachedMessage.Consume(func(msg *message.Message) {
@@ -129,13 +129,6 @@ func configure(*node.Plugin) {
}
func run(*node.Plugin) {
- if err := daemon.BackgroundWorker("Tangle[MissingMessagesMonitor]", func(shutdownSignal <-chan struct{}) {
- _tangle.MonitorMissingMessages(shutdownSignal)
- }, shutdown.PriorityMissingMessagesMonitoring); err != nil {
- log.Panicf("Failed to start as daemon: %s", err)
- }
if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
<-shutdownSignal
messageFactory.Shutdown()
@@ -144,5 +137,4 @@ func run(*node.Plugin) {
}, shutdown.PriorityTangle); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
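Both the removed monitor and the remaining background worker follow the same shutdown-aware loop: select on a ticker (or on work) and on the daemon's shutdown channel. A standalone sketch of that pattern in plain Go, without the hive.go daemon (illustrative names and interval):

```go
package main

import (
    "fmt"
    "time"
)

// monitor runs a periodic check until the shutdown channel is closed,
// mirroring the shutdownSignal parameter used by the background workers above.
func monitor(shutdownSignal <-chan struct{}) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            fmt.Println("periodic check") // e.g. re-checking missing messages
        case <-shutdownSignal:
            return // exit cleanly on shutdown
        }
    }
}

func main() {
    shutdown := make(chan struct{})
    go monitor(shutdown)
    time.Sleep(3 * time.Second)
    close(shutdown) // closing the channel signals shutdown to the worker
}
```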
@@ -10,7 +10,7 @@ require (
github.com/docker/go-units v0.4.0 // indirect
github.com/drand/drand v0.9.1
github.com/iotaledger/goshimmer v0.1.3
- github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337
+ github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028
github.com/mr-tron/base58 v1.2.0
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/stretchr/testify v1.6.1
......
@@ -260,8 +260,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
- github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337 h1:F6PzAkymPcKr1vJVK3/80wiVovjkL47c9FMjUOesXGA=
- github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337/go.mod h1:42UvBc41QBsuM7z1P1fABMonTJb7kGqkzshRebClQvA=
+ github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028 h1:GUKeANC04c4NE94Gs6rkbWfo9bfOkqtLwbnCGFLL+TY=
+ github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028/go.mod h1:42UvBc41QBsuM7z1P1fABMonTJb7kGqkzshRebClQvA=
github.com/iotaledger/iota.go v1.0.0-beta.15/go.mod h1:Rn6v5hLAn8YBaJlRu1ZQdPAgKlshJR1PTeLQaft2778=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
......
@@ -119,7 +119,7 @@ func CheckForMessageIds(t *testing.T, peers []*framework.Peer, ids map[string]Da
// check that the peer sees itself as synchronized
info, err := peer.Info()
require.NoError(t, err)
- require.True(t, info.Synced)
+ assert.Truef(t, info.Synced, "Node %s is not synced", peer)
}
resp, err := peer.FindMessageByID(idsSlice)
......
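The assertion change swaps testify's require.True for assert.Truef: assert records the failure with a formatted message and keeps going, so the loop can report every unsynced peer, whereas require aborts the test on the first failure. A brief hedged sketch of the difference (hypothetical test data, real testify API):

```go
package tests

import (
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestSyncedFlags(t *testing.T) {
    // peer2 is deliberately unsynced here to show the reporting behaviour
    synced := map[string]bool{"peer1": true, "peer2": false}

    // assert: records a descriptive failure for every unsynced peer and continues
    for name, ok := range synced {
        assert.Truef(t, ok, "Node %s is not synced", name)
    }

    // require: stops the test immediately if the condition fails
    require.True(t, synced["peer1"])
}
```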