diff --git a/config.default.json b/config.default.json index 5f172cac790624ecfbedbd7aeb46c5d84ddcde82..924792efdddbda2959662dc680a7798257cd55a7 100644 --- a/config.default.json +++ b/config.default.json @@ -40,7 +40,10 @@ "bindAddress": "0.0.0.0:10895" }, "gossip": { - "port": 14666 + "port": 14666, + "tipsBroadcaster": { + "interval": "10s" + } }, "logger": { "level": "info", diff --git a/plugins/gossip/parameters.go b/plugins/gossip/parameters.go index ade8e8a22e94808dd3c130be13c20fb509e107e4..44ac5efa94642e769de21b9d17cd9c1ce52adf94 100644 --- a/plugins/gossip/parameters.go +++ b/plugins/gossip/parameters.go @@ -1,14 +1,20 @@ package gossip import ( + "time" + flag "github.com/spf13/pflag" ) const ( // CfgGossipPort defines the config flag of the gossip port. CfgGossipPort = "gossip.port" + + // CfgGossipTipsBroadcastInterval the interval in which the oldest known tip is re-broadcasted. + CfgGossipTipsBroadcastInterval = "gossip.tipsBroadcaster.interval" ) func init() { flag.Int(CfgGossipPort, 14666, "tcp port for gossip connection") + flag.Duration(CfgGossipTipsBroadcastInterval, 10*time.Second, "the interval in which the oldest known tip is re-broadcasted") } diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index d32ad25341a99bdb1c72d93a9f4046771ece4259..83f36f08c0aed6a6122d4176ba3faa608728597e 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -42,6 +42,7 @@ func configure(*node.Plugin) { configureLogging() configureMessageLayer() configureAutopeering() + configureTipBroadcaster() } func run(*node.Plugin) { diff --git a/plugins/gossip/tips_broadcaster.go b/plugins/gossip/tips_broadcaster.go new file mode 100644 index 0000000000000000000000000000000000000000..0ea6341f5e542e8d4f1f9600fbcf37292b675ac0 --- /dev/null +++ b/plugins/gossip/tips_broadcaster.go @@ -0,0 +1,126 @@ +package gossip + +import ( + "container/list" + "sync" + "time" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/events" +) + +const ( + // the amount of oldest tips in the tip pool to broadcast up on each interval + maxOldestTipsToBroadcastPerInterval = 2 +) + +var tips = tiplist{dict: make(map[message.Id]*list.Element)} + +type tiplist struct { + mu sync.Mutex + + dict map[message.Id]*list.Element + list list.List + iterator *list.Element +} + +func (s *tiplist) AddTip(id message.Id) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, contains := s.dict[id]; contains { + return + } + + elem := s.list.PushBack(id) + s.dict[id] = elem + if s.iterator == nil { + s.iterator = elem + } +} + +func (s *tiplist) RemoveTip(id message.Id) { + s.mu.Lock() + defer s.mu.Unlock() + + elem, ok := s.dict[id] + if ok { + s.list.Remove(elem) + if s.iterator == elem { + s.next(elem) + } + } +} + +func (s *tiplist) Next() (id message.Id) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.iterator != nil { + id = s.iterator.Value.(message.Id) + s.next(s.iterator) + } + return +} + +func (s *tiplist) next(elem *list.Element) { + s.iterator = elem.Next() + if s.iterator == nil { + s.iterator = s.list.Front() + } +} + +func configureTipBroadcaster() { + tipSelector := messagelayer.TipSelector() + addedTipsClosure := events.NewClosure(tips.AddTip) + removedTipClosure := events.NewClosure(tips.RemoveTip) + tipSelector.Events.TipAdded.Attach(addedTipsClosure) + tipSelector.Events.TipRemoved.Attach(removedTipClosure) + + if err := daemon.BackgroundWorker("Tips-Broadcaster", func(shutdownSignal <-chan struct{}) { + log.Info("broadcaster started") + defer log.Info("broadcaster stopped") + defer tipSelector.Events.TipAdded.Detach(addedTipsClosure) + defer tipSelector.Events.TipRemoved.Detach(removedTipClosure) + ticker := time.NewTicker(config.Node().GetDuration(CfgGossipTipsBroadcastInterval)) + defer ticker.Stop() + for { + select { + case <-ticker.C: + broadcastOldestTips() + case <-shutdownSignal: + return + } + } + }, shutdown.PriorityGossip); err != nil { + log.Panicf("Couldn't create demon: %s", err) + } +} + +// broadcasts up to maxOldestTipsToBroadcastPerInterval tips from the tip pool +// to all connected neighbors. +func broadcastOldestTips() { + for toBroadcast := maxOldestTipsToBroadcastPerInterval; toBroadcast > 0; toBroadcast-- { + msgID := tips.Next() + if msgID == message.EmptyId { + break + } + log.Debugf("broadcasting tip %s", msgID) + broadcastMessage(msgID) + } +} + +// broadcasts the given message to all neighbors if it exists. +func broadcastMessage(msgID message.Id) { + cachedMessage := messagelayer.Tangle().Message(msgID) + defer cachedMessage.Release() + if !cachedMessage.Exists() { + return + } + msg := cachedMessage.Unwrap() + Manager().SendMessage(msg.Bytes()) +}