diff --git a/dapps/valuetransfers/packages/tangle/constants.go b/dapps/valuetransfers/packages/tangle/constants.go
deleted file mode 100644
index f862156f666f8cfd48d39ff4ac5776bc44bc9faa..0000000000000000000000000000000000000000
--- a/dapps/valuetransfers/packages/tangle/constants.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package tangle
-
-import (
-	"time"
-)
-
-const (
-	// MaxMissingTimeBeforeCleanup defines how long a transaction can be "missing", before we start pruning its future
-	// cone.
-	MaxMissingTimeBeforeCleanup = 30 * time.Second
-
-	// MissingCheckInterval defines how often we check if missing transactions have been received.
-	MissingCheckInterval = 5 * time.Second
-)
diff --git a/go.mod b/go.mod
index d2cccad7ef466f54e3fa85135425b4fd05fc2c79..7b0b7242e0827688a697c9e034e809735d15de9b 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
 	github.com/gobuffalo/packr/v2 v2.8.0
 	github.com/golang/protobuf v1.4.2
 	github.com/gorilla/websocket v1.4.2
-	github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337
+	github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028
 	github.com/labstack/echo v3.3.10+incompatible
 	github.com/labstack/gommon v0.3.0
 	github.com/magiconair/properties v1.8.1
diff --git a/go.sum b/go.sum
index 4ad17fa5119419a6284af20f0750ce3a35bcf541..31be3a85d69dfb48f0f58631326b08507e2c2eb4 100644
--- a/go.sum
+++ b/go.sum
@@ -229,8 +229,8 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
-github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337 h1:F6PzAkymPcKr1vJVK3/80wiVovjkL47c9FMjUOesXGA=
-github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337/go.mod h1:42UvBc41QBsuM7z1P1fABMonTJb7kGqkzshRebClQvA=
+github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028 h1:GUKeANC04c4NE94Gs6rkbWfo9bfOkqtLwbnCGFLL+TY=
+github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028/go.mod h1:42UvBc41QBsuM7z1P1fABMonTJb7kGqkzshRebClQvA=
 github.com/iotaledger/iota.go v1.0.0-beta.15 h1:HI8PqerEnO1CCIqmXHJ6zh1IaSFXU+S0qlUAEKshho8=
 github.com/iotaledger/iota.go v1.0.0-beta.15/go.mod h1:Rn6v5hLAn8YBaJlRu1ZQdPAgKlshJR1PTeLQaft2778=
 github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
diff --git a/packages/binary/messagelayer/messagerequester/messagerequester.go b/packages/binary/messagelayer/messagerequester/messagerequester.go
index 5018520849e4a3a1b7beb0ff15626101e569059a..068451643a9ea3d239e52006e6bf63f9a9bd4733 100644
--- a/packages/binary/messagelayer/messagerequester/messagerequester.go
+++ b/packages/binary/messagelayer/messagerequester/messagerequester.go
@@ -1,33 +1,25 @@
 package messagerequester
 
 import (
-	"runtime"
 	"sync"
 	"time"
 
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
-	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/events"
 )
 
-var (
-	// DefaultRequestWorkerCount defines the Default Request Worker Count of the message requester.
-	DefaultRequestWorkerCount = runtime.GOMAXPROCS(0)
-)
-
 // MessageRequester takes care of requesting messages.
 type MessageRequester struct {
 	scheduledRequests map[message.Id]*time.Timer
-	requestWorker     async.NonBlockingWorkerPool
 	options           *Options
 	Events            Events
 
-	scheduledRequestsMutex sync.RWMutex
+	scheduledRequestsMutex sync.Mutex
 }
 
 // New creates a new message requester.
 func New(optionalOptions ...Option) *MessageRequester {
-	requester := &MessageRequester{
+	return &MessageRequester{
 		scheduledRequests: make(map[message.Id]*time.Timer),
 		options:           newOptions(optionalOptions),
 		Events: Events{
@@ -36,46 +28,44 @@ func New(optionalOptions ...Option) *MessageRequester {
 			}),
 		},
 	}
-
-	requester.requestWorker.Tune(requester.options.workerCount)
-	return requester
 }
 
-// ScheduleRequest schedules a request for the given message.
-func (requester *MessageRequester) ScheduleRequest(messageId message.Id) {
-	var retryRequest func(bool)
-	retryRequest = func(initialRequest bool) {
-		requester.requestWorker.Submit(func() {
-			requester.scheduledRequestsMutex.RLock()
-			if _, requestExists := requester.scheduledRequests[messageId]; !initialRequest && !requestExists {
-				requester.scheduledRequestsMutex.RUnlock()
-				return
-			}
-			requester.scheduledRequestsMutex.RUnlock()
+// StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest.
+func (requester *MessageRequester) StartRequest(id message.Id) {
+	requester.scheduledRequestsMutex.Lock()
+	defer requester.scheduledRequestsMutex.Unlock()
 
-			requester.Events.SendRequest.Trigger(messageId)
-
-			requester.scheduledRequestsMutex.Lock()
-			requester.scheduledRequests[messageId] = time.AfterFunc(requester.options.retryInterval, func() { retryRequest(false) })
-			requester.scheduledRequestsMutex.Unlock()
-		})
+	// ignore already scheduled requests
+	if _, exists := requester.scheduledRequests[id]; exists {
+		return
 	}
-	retryRequest(true)
+
+	// trigger the event and schedule the next request
+	// make this atomic to be sure that a successive call of StartRequest does not trigger again
+	requester.Events.SendRequest.Trigger(id)
+	requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
 }
 
 // StopRequest stops requests for the given message to further happen.
-func (requester *MessageRequester) StopRequest(messageId message.Id) {
-	requester.scheduledRequestsMutex.RLock()
-	if timer, timerExists := requester.scheduledRequests[messageId]; timerExists {
-		requester.scheduledRequestsMutex.RUnlock()
+func (requester *MessageRequester) StopRequest(id message.Id) {
+	requester.scheduledRequestsMutex.Lock()
+	defer requester.scheduledRequestsMutex.Unlock()
 
+	if timer, ok := requester.scheduledRequests[id]; ok {
 		timer.Stop()
+		delete(requester.scheduledRequests, id)
+	}
+}
 
-		requester.scheduledRequestsMutex.Lock()
-		delete(requester.scheduledRequests, messageId)
-		requester.scheduledRequestsMutex.Unlock()
-		return
+func (requester *MessageRequester) reRequest(id message.Id) {
+	// as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic
+	requester.Events.SendRequest.Trigger(id)
+
+	requester.scheduledRequestsMutex.Lock()
+	defer requester.scheduledRequestsMutex.Unlock()
+
+	// reschedule, if the request has not been stopped in the meantime
+	if _, exists := requester.scheduledRequests[id]; exists {
+		requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, func() { requester.reRequest(id) })
 	}
-	requester.scheduledRequestsMutex.RUnlock()
 }
diff --git a/packages/binary/messagelayer/messagerequester/options.go b/packages/binary/messagelayer/messagerequester/options.go
index a3b3a0cb6c79a17e36e1726423124ede19e9c025..816118147af126a781a8e696c8835e455858af5f 100644
--- a/packages/binary/messagelayer/messagerequester/options.go
+++ b/packages/binary/messagelayer/messagerequester/options.go
@@ -7,13 +7,11 @@ import (
 // Options holds options for a message requester.
 type Options struct {
 	retryInterval time.Duration
-	workerCount   int
 }
 
 func newOptions(optionalOptions []Option) *Options {
 	result := &Options{
 		retryInterval: 10 * time.Second,
-		workerCount:   DefaultRequestWorkerCount,
 	}
 
 	for _, optionalOption := range optionalOptions {
@@ -32,10 +30,3 @@ func RetryInterval(interval time.Duration) Option {
 		args.retryInterval = interval
 	}
 }
-
-// WorkerCount creates an option which sets the worker count to the given value.
-func WorkerCount(workerCount int) Option {
-	return func(args *Options) {
-		args.workerCount = workerCount
-	}
-}
diff --git a/packages/binary/messagelayer/tangle/tangle.go b/packages/binary/messagelayer/tangle/tangle.go
index 240935094518e00300173b0535636634ecb3cb82..d750ecbd59f1fd63af673b32456681aaef36e2b9 100644
--- a/packages/binary/messagelayer/tangle/tangle.go
+++ b/packages/binary/messagelayer/tangle/tangle.go
@@ -5,24 +5,15 @@ import (
 	"runtime"
 	"time"
 
-	"github.com/iotaledger/hive.go/kvstore"
-	"github.com/iotaledger/hive.go/types"
-
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
 	"github.com/iotaledger/goshimmer/packages/binary/storageprefix"
-	"github.com/iotaledger/hive.go/async"
+	"github.com/iotaledger/hive.go/kvstore"
 	"github.com/iotaledger/hive.go/objectstorage"
+	"github.com/iotaledger/hive.go/types"
 )
 
 const (
-	// MaxMissingTimeBeforeCleanup is the max. amount of time a message can be marked as missing
-	// before it is ultimately un-marked as missing.
-	MaxMissingTimeBeforeCleanup = 30 * time.Second
-
-	// MissingCheckInterval is the interval on which it is checked whether a missing
-	// message is still missing.
-	MissingCheckInterval = 5 * time.Second
-
 	cacheTime = 20 * time.Second
 )
@@ -218,7 +209,10 @@ func (tangle *Tangle) isMessageSolid(msg *message.Message, msgMetadata *MessageM
 		return true
 	}
 
-	return tangle.isMessageMarkedAsSolid(msg.TrunkId()) && tangle.isMessageMarkedAsSolid(msg.BranchId())
+	// as missing messages are requested in isMessageMarkedAsSolid, we want to prevent short-circuit evaluation
+	trunkSolid := tangle.isMessageMarkedAsSolid(msg.TrunkId())
+	branchSolid := tangle.isMessageMarkedAsSolid(msg.BranchId())
+	return trunkSolid && branchSolid
 }
 
 // builds up a stack from the given message and tries to solidify into the present.
@@ -268,46 +262,6 @@ func (tangle *Tangle) checkMessageSolidityAndPropagate(cachedMessage *message.Ca
 	}
 }
 
-// MonitorMissingMessages continuously monitors for missing messages and eventually deletes them if they
-// don't become available in a certain time frame.
-func (tangle *Tangle) MonitorMissingMessages(shutdownSignal <-chan struct{}) {
-	reCheckInterval := time.NewTicker(MissingCheckInterval)
-	defer reCheckInterval.Stop()
-	for {
-		select {
-		case <-reCheckInterval.C:
-			var toDelete []message.Id
-			var toUnmark []message.Id
-			tangle.missingMessageStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool {
-				defer cachedObject.Release()
-				missingMessage := cachedObject.Get().(*MissingMessage)
-
-				if tangle.messageStorage.Contains(missingMessage.messageId.Bytes()) {
-					toUnmark = append(toUnmark, missingMessage.MessageId())
-					return true
-				}
-
-				// check whether message is missing since over our max time delta
-				if time.Since(missingMessage.MissingSince()) >= MaxMissingTimeBeforeCleanup {
-					toDelete = append(toDelete, missingMessage.MessageId())
-				}
-				return true
-			})
-			for _, msgID := range toUnmark {
-				tangle.missingMessageStorage.DeleteIfPresent(msgID.Bytes())
-			}
-			for _, msgID := range toDelete {
-				// delete the future cone of the missing message
-				tangle.Events.MessageUnsolidifiable.Trigger(msgID)
-				// TODO: obvious race condition between receiving the message and it getting deleted here
-				tangle.deleteFutureCone(msgID)
-			}
-		case <-shutdownSignal:
-			return
-		}
-	}
-}
-
 // deletes the given approver association for the given approvee to its approver.
 func (tangle *Tangle) deleteApprover(approvedMessageId message.Id, approvingMessage message.Id) {
 	idToDelete := make([]byte, message.IdLength+message.IdLength)
diff --git a/plugins/messagelayer/plugin.go b/plugins/messagelayer/plugin.go
index 2286db7e2b0b0c4da55d953d8bc9fe719174f65a..084df96ce53f45d156a0122cb868eac6ea1cc871 100644
--- a/plugins/messagelayer/plugin.go
+++ b/plugins/messagelayer/plugin.go
@@ -113,7 +113,7 @@ func configure(*node.Plugin) {
 	}))
 
 	// setup messageRequester
-	_tangle.Events.MessageMissing.Attach(events.NewClosure(messageRequester.ScheduleRequest))
+	_tangle.Events.MessageMissing.Attach(events.NewClosure(messageRequester.StartRequest))
 	_tangle.Events.MissingMessageReceived.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
 		cachedMessageMetadata.Release()
 		cachedMessage.Consume(func(msg *message.Message) {
@@ -129,13 +129,6 @@ func configure(*node.Plugin) {
 }
 
 func run(*node.Plugin) {
-
-	if err := daemon.BackgroundWorker("Tangle[MissingMessagesMonitor]", func(shutdownSignal <-chan struct{}) {
-		_tangle.MonitorMissingMessages(shutdownSignal)
-	}, shutdown.PriorityMissingMessagesMonitoring); err != nil {
-		log.Panicf("Failed to start as daemon: %s", err)
-	}
-
 	if err := daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
 		<-shutdownSignal
 		messageFactory.Shutdown()
@@ -144,5 +137,4 @@ func run(*node.Plugin) {
 	}, shutdown.PriorityTangle); err != nil {
 		log.Panicf("Failed to start as daemon: %s", err)
 	}
-
 }
diff --git a/tools/integration-tests/tester/go.mod b/tools/integration-tests/tester/go.mod
index bf7684da5c7af5a85162cf78f74871b69282e102..1feee38f3163d1a80f682f097a1b85a1863f45cf 100644
--- a/tools/integration-tests/tester/go.mod
+++ b/tools/integration-tests/tester/go.mod
@@ -10,7 +10,7 @@ require (
 	github.com/docker/go-units v0.4.0 // indirect
 	github.com/drand/drand v0.9.1
 	github.com/iotaledger/goshimmer v0.1.3
-	github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337
+	github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028
 	github.com/mr-tron/base58 v1.2.0
 	github.com/opencontainers/go-digest v1.0.0 // indirect
 	github.com/stretchr/testify v1.6.1
diff --git a/tools/integration-tests/tester/go.sum b/tools/integration-tests/tester/go.sum
index 6dbd85bccedbfb13897fc3b27bd2d0da6c5a25c5..3a4532b5f7279961bc484ea5b45deece13f8c06c 100644
--- a/tools/integration-tests/tester/go.sum
+++ b/tools/integration-tests/tester/go.sum
@@ -260,8 +260,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
 github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
 github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
-github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337 h1:F6PzAkymPcKr1vJVK3/80wiVovjkL47c9FMjUOesXGA=
-github.com/iotaledger/hive.go v0.0.0-20200625105326-310ea88f1337/go.mod h1:42UvBc41QBsuM7z1P1fABMonTJb7kGqkzshRebClQvA=
+github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028 h1:GUKeANC04c4NE94Gs6rkbWfo9bfOkqtLwbnCGFLL+TY=
+github.com/iotaledger/hive.go v0.0.0-20200713104016-55eaecb9d028/go.mod h1:42UvBc41QBsuM7z1P1fABMonTJb7kGqkzshRebClQvA=
 github.com/iotaledger/iota.go v1.0.0-beta.15/go.mod h1:Rn6v5hLAn8YBaJlRu1ZQdPAgKlshJR1PTeLQaft2778=
 github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
 github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
diff --git a/tools/integration-tests/tester/tests/testutil.go b/tools/integration-tests/tester/tests/testutil.go
index c337b7daac7eef6061cf9f208a00fd3c923a84a6..49ab041eee8bae05bfc6e73256aef7cc5bd0a6b3 100644
--- a/tools/integration-tests/tester/tests/testutil.go
+++ b/tools/integration-tests/tester/tests/testutil.go
@@ -119,7 +119,7 @@ func CheckForMessageIds(t *testing.T, peers []*framework.Peer, ids map[string]Da
 		// check that the peer sees itself as synchronized
 		info, err := peer.Info()
 		require.NoError(t, err)
-		require.True(t, info.Synced)
+		assert.Truef(t, info.Synced, "Node %s is not synced", peer)
 	}
 
 	resp, err := peer.FindMessageByID(idsSlice)
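Illustration (not part of the patch): the change above replaces the worker-pool based ScheduleRequest with a timer-driven StartRequest/StopRequest pair in MessageRequester. The following standalone Go sketch shows the same pattern in isolation, a mutex-protected map of time.AfterFunc timers that re-fires an action until it is stopped. All names here (Retrier, StartRetry, StopRetry) are hypothetical and do not exist in the repository; this is only a minimal model of the technique.

package main

import (
	"fmt"
	"sync"
	"time"
)

// Retrier is a stripped-down analogue of the reworked MessageRequester:
// StartRetry fires the action once and schedules a re-fire via time.AfterFunc,
// StopRetry cancels the pending timer and removes the entry from the map.
type Retrier struct {
	mu       sync.Mutex
	pending  map[string]*time.Timer
	interval time.Duration
	action   func(id string)
}

func NewRetrier(interval time.Duration, action func(id string)) *Retrier {
	return &Retrier{
		pending:  make(map[string]*time.Timer),
		interval: interval,
		action:   action,
	}
}

func (r *Retrier) StartRetry(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// ignore ids that already have a scheduled retry
	if _, exists := r.pending[id]; exists {
		return
	}

	// fire immediately and schedule the next attempt
	r.action(id)
	r.pending[id] = time.AfterFunc(r.interval, func() { r.retry(id) })
}

func (r *Retrier) StopRetry(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if timer, ok := r.pending[id]; ok {
		timer.Stop()
		delete(r.pending, id)
	}
}

func (r *Retrier) retry(id string) {
	r.action(id)

	r.mu.Lock()
	defer r.mu.Unlock()

	// reschedule only if the retry has not been stopped in the meantime
	if _, exists := r.pending[id]; exists {
		r.pending[id] = time.AfterFunc(r.interval, func() { r.retry(id) })
	}
}

func main() {
	r := NewRetrier(100*time.Millisecond, func(id string) { fmt.Println("requesting", id) })
	r.StartRetry("message-1")
	time.Sleep(350 * time.Millisecond)
	r.StopRetry("message-1")
	time.Sleep(200 * time.Millisecond)
}

As in the patched StartRequest, the initial trigger and the first time.AfterFunc scheduling happen under the same lock, so a concurrent second StartRetry for the same id cannot fire the action twice; StopRetry removing the map entry is what prevents retry from rescheduling itself.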