From f7e23d7d735d12c1e62c08268a56fdf819985b90 Mon Sep 17 00:00:00 2001 From: Wolfgang Welz <welzwo@gmail.com> Date: Thu, 9 Jul 2020 16:17:40 +0200 Subject: [PATCH] Feat: Only gossip recent messages (#627) * do not gossip old messages * improve tip broadcaster * remove log messages * make info consistent * delete id --- config.default.json | 1 + .../messagelayer/tangle/messagemetadata.go | 7 +- plugins/gossip/gossip.go | 17 ++-- plugins/gossip/parameters.go | 8 +- plugins/gossip/plugin.go | 31 ++++-- plugins/gossip/tips_broadcaster.go | 99 ++++++++----------- 6 files changed, 86 insertions(+), 77 deletions(-) diff --git a/config.default.json b/config.default.json index 924792ef..aa11caf7 100644 --- a/config.default.json +++ b/config.default.json @@ -41,6 +41,7 @@ }, "gossip": { "port": 14666, + "ageThreshold": "5s", "tipsBroadcaster": { "interval": "10s" } diff --git a/packages/binary/messagelayer/tangle/messagemetadata.go b/packages/binary/messagelayer/tangle/messagemetadata.go index e1abe276..9285ef78 100644 --- a/packages/binary/messagelayer/tangle/messagemetadata.go +++ b/packages/binary/messagelayer/tangle/messagemetadata.go @@ -70,6 +70,11 @@ func MessageMetadataFromStorageKey(key []byte) (result objectstorage.StorableObj return } +// ReceivedTime returns the time when the message was received. +func (messageMetadata *MessageMetadata) ReceivedTime() time.Time { + return messageMetadata.receivedTime +} + func (messageMetadata *MessageMetadata) IsSolid() (result bool) { messageMetadata.solidMutex.RLock() result = messageMetadata.solid @@ -119,7 +124,7 @@ func (messageMetadata *MessageMetadata) ObjectStorageKey() []byte { func (messageMetadata *MessageMetadata) ObjectStorageValue() []byte { return marshalutil.New(). - WriteTime(messageMetadata.receivedTime). + WriteTime(messageMetadata.ReceivedTime()). WriteTime(messageMetadata.SolidificationTime()). WriteBool(messageMetadata.IsSolid()). Bytes() diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index d08d8e61..8db0d74f 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -81,7 +81,7 @@ func start(shutdownSignal <-chan struct{}) { // trigger start of the autopeering selection go func() { autopeering.StartSelection() }() - log.Infof("%s started, bind-address=%s", PluginName, localAddr.String()) + log.Infof("%s started: age-threshold=%v bind-address=%s", PluginName, ageThreshold, localAddr.String()) <-shutdownSignal log.Info("Stopping " + PluginName + " ...") @@ -90,12 +90,13 @@ func start(shutdownSignal <-chan struct{}) { autopeering.Selection().Close() } -// loads the given message from the message layer or an error if not found. -func loadMessage(messageID message.Id) (bytes []byte, err error) { - if !messagelayer.Tangle().Message(messageID).Consume(func(message *message.Message) { - bytes = message.Bytes() - }) { - err = ErrMessageNotFound +// loads the given message from the message layer and returns it or an error if not found. +func loadMessage(msgID message.Id) ([]byte, error) { + cachedMessage := messagelayer.Tangle().Message(msgID) + defer cachedMessage.Release() + if !cachedMessage.Exists() { + return nil, ErrMessageNotFound } - return + msg := cachedMessage.Unwrap() + return msg.Bytes(), nil } diff --git a/plugins/gossip/parameters.go b/plugins/gossip/parameters.go index 44ac5efa..c3c96d5e 100644 --- a/plugins/gossip/parameters.go +++ b/plugins/gossip/parameters.go @@ -9,12 +9,14 @@ import ( const ( // CfgGossipPort defines the config flag of the gossip port. CfgGossipPort = "gossip.port" - - // CfgGossipTipsBroadcastInterval the interval in which the oldest known tip is re-broadcasted. + // CfgGossipAgeThreshold defines the maximum age (time since reception) of a message to be gossiped. + CfgGossipAgeThreshold = "gossip.ageThreshold" + // CfgGossipTipsBroadcastInterval the interval in which the oldest known tip is re-broadcast. CfgGossipTipsBroadcastInterval = "gossip.tipsBroadcaster.interval" ) func init() { flag.Int(CfgGossipPort, 14666, "tcp port for gossip connection") - flag.Duration(CfgGossipTipsBroadcastInterval, 10*time.Second, "the interval in which the oldest known tip is re-broadcasted") + flag.Duration(CfgGossipAgeThreshold, 5*time.Second, "message age threshold for gossip") + flag.Duration(CfgGossipTipsBroadcastInterval, 10*time.Second, "the interval in which the oldest known tip is re-broadcast") } diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 83f36f08..b1b55933 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -2,12 +2,14 @@ package gossip import ( "sync" + "time" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" "github.com/iotaledger/goshimmer/packages/gossip" "github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/plugins/autopeering" + "github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/messagelayer" "github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/selection" @@ -25,7 +27,9 @@ var ( plugin *node.Plugin once sync.Once - log *logger.Logger + log *logger.Logger + ageThreshold time.Duration + tipsBroadcasterInterval time.Duration ) // Plugin gets the plugin instance. @@ -38,17 +42,21 @@ func Plugin() *node.Plugin { func configure(*node.Plugin) { log = logger.NewLogger(PluginName) + ageThreshold = config.Node().GetDuration(CfgGossipAgeThreshold) + tipsBroadcasterInterval = config.Node().GetDuration(CfgGossipTipsBroadcastInterval) configureLogging() configureMessageLayer() configureAutopeering() - configureTipBroadcaster() } func run(*node.Plugin) { if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil { log.Panicf("Failed to start as daemon: %s", err) } + if err := daemon.BackgroundWorker(tipsBroadcasterName, startTipBroadcaster, shutdown.PriorityGossip); err != nil { + log.Panicf("Failed to start as daemon: %s", err) + } } func configureAutopeering() { @@ -121,14 +129,21 @@ func configureMessageLayer() { // configure flow of outgoing messages (gossip on solidification) messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { - cachedMessageMetadata.Release() - cachedMessage.Consume(func(msg *message.Message) { - mgr.SendMessage(msg.Bytes()) - }) + defer cachedMessage.Release() + defer cachedMessageMetadata.Release() + + // only broadcast new message shortly after they have been received + metadata := cachedMessageMetadata.Unwrap() + if time.Since(metadata.ReceivedTime()) > ageThreshold { + return + } + + msg := cachedMessage.Unwrap() + mgr.SendMessage(msg.Bytes()) })) // request missing messages - messagelayer.MessageRequester().Events.SendRequest.Attach(events.NewClosure(func(messageId message.Id) { - mgr.RequestMessage(messageId[:]) + messagelayer.MessageRequester().Events.SendRequest.Attach(events.NewClosure(func(msgID message.Id) { + mgr.RequestMessage(msgID[:]) })) } diff --git a/plugins/gossip/tips_broadcaster.go b/plugins/gossip/tips_broadcaster.go index 0ea6341f..caff47df 100644 --- a/plugins/gossip/tips_broadcaster.go +++ b/plugins/gossip/tips_broadcaster.go @@ -3,19 +3,16 @@ package gossip import ( "container/list" "sync" - "time" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" - "github.com/iotaledger/goshimmer/packages/shutdown" - "github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/messagelayer" - "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/timeutil" ) const ( - // the amount of oldest tips in the tip pool to broadcast up on each interval - maxOldestTipsToBroadcastPerInterval = 2 + // the name of the tips broadcaster worker + tipsBroadcasterName = PluginName + "[TipsBroadcaster]" ) var tips = tiplist{dict: make(map[message.Id]*list.Element)} @@ -35,7 +32,6 @@ func (s *tiplist) AddTip(id message.Id) { if _, contains := s.dict[id]; contains { return } - elem := s.list.PushBack(id) s.dict[id] = elem if s.iterator == nil { @@ -47,24 +43,27 @@ func (s *tiplist) RemoveTip(id message.Id) { s.mu.Lock() defer s.mu.Unlock() - elem, ok := s.dict[id] - if ok { - s.list.Remove(elem) - if s.iterator == elem { - s.next(elem) - } + elem, contains := s.dict[id] + if !contains { + return + } + delete(s.dict, id) + s.list.Remove(elem) + if s.iterator == elem { + s.next(elem) } } -func (s *tiplist) Next() (id message.Id) { +func (s *tiplist) Next() message.Id { s.mu.Lock() defer s.mu.Unlock() - if s.iterator != nil { - id = s.iterator.Value.(message.Id) - s.next(s.iterator) + if s.iterator == nil { + return message.EmptyId } - return + id := s.iterator.Value.(message.Id) + s.next(s.iterator) + return id } func (s *tiplist) next(elem *list.Element) { @@ -74,53 +73,39 @@ func (s *tiplist) next(elem *list.Element) { } } -func configureTipBroadcaster() { +func startTipBroadcaster(shutdownSignal <-chan struct{}) { + defer log.Infof("Stopping %s ... done", tipsBroadcasterName) + + removeClosure := events.NewClosure(tips.RemoveTip) + addClosure := events.NewClosure(tips.AddTip) + + // attach the tip list to the TipSelector tipSelector := messagelayer.TipSelector() - addedTipsClosure := events.NewClosure(tips.AddTip) - removedTipClosure := events.NewClosure(tips.RemoveTip) - tipSelector.Events.TipAdded.Attach(addedTipsClosure) - tipSelector.Events.TipRemoved.Attach(removedTipClosure) - - if err := daemon.BackgroundWorker("Tips-Broadcaster", func(shutdownSignal <-chan struct{}) { - log.Info("broadcaster started") - defer log.Info("broadcaster stopped") - defer tipSelector.Events.TipAdded.Detach(addedTipsClosure) - defer tipSelector.Events.TipRemoved.Detach(removedTipClosure) - ticker := time.NewTicker(config.Node().GetDuration(CfgGossipTipsBroadcastInterval)) - defer ticker.Stop() - for { - select { - case <-ticker.C: - broadcastOldestTips() - case <-shutdownSignal: - return - } - } - }, shutdown.PriorityGossip); err != nil { - log.Panicf("Couldn't create demon: %s", err) - } + tipSelector.Events.TipRemoved.Attach(removeClosure) + defer tipSelector.Events.TipRemoved.Detach(removeClosure) + tipSelector.Events.TipAdded.Attach(addClosure) + defer tipSelector.Events.TipAdded.Detach(addClosure) + + log.Infof("%s started: interval=%v", tipsBroadcasterName, tipsBroadcasterInterval) + timeutil.Ticker(broadcastNextOldestTip, tipsBroadcasterInterval, shutdownSignal) + log.Infof("Stopping %s ...", tipsBroadcasterName) } -// broadcasts up to maxOldestTipsToBroadcastPerInterval tips from the tip pool -// to all connected neighbors. -func broadcastOldestTips() { - for toBroadcast := maxOldestTipsToBroadcastPerInterval; toBroadcast > 0; toBroadcast-- { - msgID := tips.Next() - if msgID == message.EmptyId { - break - } - log.Debugf("broadcasting tip %s", msgID) - broadcastMessage(msgID) +// broadcasts the next oldest tip from the tip pool to all connected neighbors. +func broadcastNextOldestTip() { + msgID := tips.Next() + if msgID == message.EmptyId { + return } + broadcastMessage(msgID) } // broadcasts the given message to all neighbors if it exists. func broadcastMessage(msgID message.Id) { - cachedMessage := messagelayer.Tangle().Message(msgID) - defer cachedMessage.Release() - if !cachedMessage.Exists() { + msgBytes, err := loadMessage(msgID) + if err != nil { return } - msg := cachedMessage.Unwrap() - Manager().SendMessage(msg.Bytes()) + log.Debugw("broadcast tip", "id", msgID) + Manager().SendMessage(msgBytes) } -- GitLab