Skip to content
Snippets Groups Projects
Unverified Commit ba4bd548 authored by Luca Moser's avatar Luca Moser Committed by GitHub
Browse files

Adds tips broadcaster to gossip plugin (#608)

* adds tips broadcaster to gossip plugin

* adds missing for loop

* addresses review comments
parent 16bd8d85
No related branches found
No related tags found
No related merge requests found
......@@ -40,7 +40,10 @@
"bindAddress": "0.0.0.0:10895"
},
"gossip": {
"port": 14666
"port": 14666,
"tipsBroadcaster": {
"interval": "10s"
}
},
"logger": {
"level": "info",
......
package gossip
import (
"time"
flag "github.com/spf13/pflag"
)
const (
// CfgGossipPort defines the config flag of the gossip port.
CfgGossipPort = "gossip.port"
// CfgGossipTipsBroadcastInterval the interval in which the oldest known tip is re-broadcasted.
CfgGossipTipsBroadcastInterval = "gossip.tipsBroadcaster.interval"
)
func init() {
flag.Int(CfgGossipPort, 14666, "tcp port for gossip connection")
flag.Duration(CfgGossipTipsBroadcastInterval, 10*time.Second, "the interval in which the oldest known tip is re-broadcasted")
}
......@@ -42,6 +42,7 @@ func configure(*node.Plugin) {
configureLogging()
configureMessageLayer()
configureAutopeering()
configureTipBroadcaster()
}
func run(*node.Plugin) {
......
package gossip
import (
"container/list"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
)
const (
// the amount of oldest tips in the tip pool to broadcast up on each interval
maxOldestTipsToBroadcastPerInterval = 2
)
var tips = tiplist{dict: make(map[message.Id]*list.Element)}
type tiplist struct {
mu sync.Mutex
dict map[message.Id]*list.Element
list list.List
iterator *list.Element
}
func (s *tiplist) AddTip(id message.Id) {
s.mu.Lock()
defer s.mu.Unlock()
if _, contains := s.dict[id]; contains {
return
}
elem := s.list.PushBack(id)
s.dict[id] = elem
if s.iterator == nil {
s.iterator = elem
}
}
func (s *tiplist) RemoveTip(id message.Id) {
s.mu.Lock()
defer s.mu.Unlock()
elem, ok := s.dict[id]
if ok {
s.list.Remove(elem)
if s.iterator == elem {
s.next(elem)
}
}
}
func (s *tiplist) Next() (id message.Id) {
s.mu.Lock()
defer s.mu.Unlock()
if s.iterator != nil {
id = s.iterator.Value.(message.Id)
s.next(s.iterator)
}
return
}
func (s *tiplist) next(elem *list.Element) {
s.iterator = elem.Next()
if s.iterator == nil {
s.iterator = s.list.Front()
}
}
func configureTipBroadcaster() {
tipSelector := messagelayer.TipSelector()
addedTipsClosure := events.NewClosure(tips.AddTip)
removedTipClosure := events.NewClosure(tips.RemoveTip)
tipSelector.Events.TipAdded.Attach(addedTipsClosure)
tipSelector.Events.TipRemoved.Attach(removedTipClosure)
if err := daemon.BackgroundWorker("Tips-Broadcaster", func(shutdownSignal <-chan struct{}) {
log.Info("broadcaster started")
defer log.Info("broadcaster stopped")
defer tipSelector.Events.TipAdded.Detach(addedTipsClosure)
defer tipSelector.Events.TipRemoved.Detach(removedTipClosure)
ticker := time.NewTicker(config.Node().GetDuration(CfgGossipTipsBroadcastInterval))
defer ticker.Stop()
for {
select {
case <-ticker.C:
broadcastOldestTips()
case <-shutdownSignal:
return
}
}
}, shutdown.PriorityGossip); err != nil {
log.Panicf("Couldn't create demon: %s", err)
}
}
// broadcasts up to maxOldestTipsToBroadcastPerInterval tips from the tip pool
// to all connected neighbors.
func broadcastOldestTips() {
for toBroadcast := maxOldestTipsToBroadcastPerInterval; toBroadcast > 0; toBroadcast-- {
msgID := tips.Next()
if msgID == message.EmptyId {
break
}
log.Debugf("broadcasting tip %s", msgID)
broadcastMessage(msgID)
}
}
// broadcasts the given message to all neighbors if it exists.
func broadcastMessage(msgID message.Id) {
cachedMessage := messagelayer.Tangle().Message(msgID)
defer cachedMessage.Release()
if !cachedMessage.Exists() {
return
}
msg := cachedMessage.Unwrap()
Manager().SendMessage(msg.Bytes())
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment