From ba4bd54814bef916a666d3261771d0b02e4d331a Mon Sep 17 00:00:00 2001 From: Luca Moser <moser.luca@gmail.com> Date: Wed, 1 Jul 2020 15:28:14 +0200 Subject: [PATCH] Adds tips broadcaster to gossip plugin (#608) * adds tips broadcaster to gossip plugin * adds missing for loop * addresses review comments --- config.default.json | 5 +- plugins/gossip/parameters.go | 6 ++ plugins/gossip/plugin.go | 1 + plugins/gossip/tips_broadcaster.go | 126 +++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 plugins/gossip/tips_broadcaster.go diff --git a/config.default.json b/config.default.json index 5f172cac..924792ef 100644 --- a/config.default.json +++ b/config.default.json @@ -40,7 +40,10 @@ "bindAddress": "0.0.0.0:10895" }, "gossip": { - "port": 14666 + "port": 14666, + "tipsBroadcaster": { + "interval": "10s" + } }, "logger": { "level": "info", diff --git a/plugins/gossip/parameters.go b/plugins/gossip/parameters.go index ade8e8a2..44ac5efa 100644 --- a/plugins/gossip/parameters.go +++ b/plugins/gossip/parameters.go @@ -1,14 +1,20 @@ package gossip import ( + "time" + flag "github.com/spf13/pflag" ) const ( // CfgGossipPort defines the config flag of the gossip port. CfgGossipPort = "gossip.port" + + // CfgGossipTipsBroadcastInterval the interval in which the oldest known tip is re-broadcasted. + CfgGossipTipsBroadcastInterval = "gossip.tipsBroadcaster.interval" ) func init() { flag.Int(CfgGossipPort, 14666, "tcp port for gossip connection") + flag.Duration(CfgGossipTipsBroadcastInterval, 10*time.Second, "the interval in which the oldest known tip is re-broadcasted") } diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index d32ad253..83f36f08 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -42,6 +42,7 @@ func configure(*node.Plugin) { configureLogging() configureMessageLayer() configureAutopeering() + configureTipBroadcaster() } func run(*node.Plugin) { diff --git a/plugins/gossip/tips_broadcaster.go b/plugins/gossip/tips_broadcaster.go new file mode 100644 index 00000000..0ea6341f --- /dev/null +++ b/plugins/gossip/tips_broadcaster.go @@ -0,0 +1,126 @@ +package gossip + +import ( + "container/list" + "sync" + "time" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/events" +) + +const ( + // the amount of oldest tips in the tip pool to broadcast up on each interval + maxOldestTipsToBroadcastPerInterval = 2 +) + +var tips = tiplist{dict: make(map[message.Id]*list.Element)} + +type tiplist struct { + mu sync.Mutex + + dict map[message.Id]*list.Element + list list.List + iterator *list.Element +} + +func (s *tiplist) AddTip(id message.Id) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, contains := s.dict[id]; contains { + return + } + + elem := s.list.PushBack(id) + s.dict[id] = elem + if s.iterator == nil { + s.iterator = elem + } +} + +func (s *tiplist) RemoveTip(id message.Id) { + s.mu.Lock() + defer s.mu.Unlock() + + elem, ok := s.dict[id] + if ok { + s.list.Remove(elem) + if s.iterator == elem { + s.next(elem) + } + } +} + +func (s *tiplist) Next() (id message.Id) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.iterator != nil { + id = s.iterator.Value.(message.Id) + s.next(s.iterator) + } + return +} + +func (s *tiplist) next(elem *list.Element) { + s.iterator = elem.Next() + if s.iterator == nil { + s.iterator = s.list.Front() + } +} + +func configureTipBroadcaster() { + tipSelector := messagelayer.TipSelector() + addedTipsClosure := events.NewClosure(tips.AddTip) + removedTipClosure := events.NewClosure(tips.RemoveTip) + tipSelector.Events.TipAdded.Attach(addedTipsClosure) + tipSelector.Events.TipRemoved.Attach(removedTipClosure) + + if err := daemon.BackgroundWorker("Tips-Broadcaster", func(shutdownSignal <-chan struct{}) { + log.Info("broadcaster started") + defer log.Info("broadcaster stopped") + defer tipSelector.Events.TipAdded.Detach(addedTipsClosure) + defer tipSelector.Events.TipRemoved.Detach(removedTipClosure) + ticker := time.NewTicker(config.Node().GetDuration(CfgGossipTipsBroadcastInterval)) + defer ticker.Stop() + for { + select { + case <-ticker.C: + broadcastOldestTips() + case <-shutdownSignal: + return + } + } + }, shutdown.PriorityGossip); err != nil { + log.Panicf("Couldn't create demon: %s", err) + } +} + +// broadcasts up to maxOldestTipsToBroadcastPerInterval tips from the tip pool +// to all connected neighbors. +func broadcastOldestTips() { + for toBroadcast := maxOldestTipsToBroadcastPerInterval; toBroadcast > 0; toBroadcast-- { + msgID := tips.Next() + if msgID == message.EmptyId { + break + } + log.Debugf("broadcasting tip %s", msgID) + broadcastMessage(msgID) + } +} + +// broadcasts the given message to all neighbors if it exists. +func broadcastMessage(msgID message.Id) { + cachedMessage := messagelayer.Tangle().Message(msgID) + defer cachedMessage.Release() + if !cachedMessage.Exists() { + return + } + msg := cachedMessage.Unwrap() + Manager().SendMessage(msg.Bytes()) +} -- GitLab