Skip to content
Snippets Groups Projects
Unverified Commit f7e23d7d authored by Wolfgang Welz's avatar Wolfgang Welz Committed by GitHub
Browse files

Feat: Only gossip recent messages (#627)

* do not gossip old messages

* improve tip broadcaster

* remove log messages

* make info consistent

* delete id
parent d6e2383b
Branches
Tags
No related merge requests found
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
}, },
"gossip": { "gossip": {
"port": 14666, "port": 14666,
"ageThreshold": "5s",
"tipsBroadcaster": { "tipsBroadcaster": {
"interval": "10s" "interval": "10s"
} }
......
...@@ -70,6 +70,11 @@ func MessageMetadataFromStorageKey(key []byte) (result objectstorage.StorableObj ...@@ -70,6 +70,11 @@ func MessageMetadataFromStorageKey(key []byte) (result objectstorage.StorableObj
return return
} }
// ReceivedTime returns the time when the message was received.
func (messageMetadata *MessageMetadata) ReceivedTime() time.Time {
return messageMetadata.receivedTime
}
func (messageMetadata *MessageMetadata) IsSolid() (result bool) { func (messageMetadata *MessageMetadata) IsSolid() (result bool) {
messageMetadata.solidMutex.RLock() messageMetadata.solidMutex.RLock()
result = messageMetadata.solid result = messageMetadata.solid
...@@ -119,7 +124,7 @@ func (messageMetadata *MessageMetadata) ObjectStorageKey() []byte { ...@@ -119,7 +124,7 @@ func (messageMetadata *MessageMetadata) ObjectStorageKey() []byte {
func (messageMetadata *MessageMetadata) ObjectStorageValue() []byte { func (messageMetadata *MessageMetadata) ObjectStorageValue() []byte {
return marshalutil.New(). return marshalutil.New().
WriteTime(messageMetadata.receivedTime). WriteTime(messageMetadata.ReceivedTime()).
WriteTime(messageMetadata.SolidificationTime()). WriteTime(messageMetadata.SolidificationTime()).
WriteBool(messageMetadata.IsSolid()). WriteBool(messageMetadata.IsSolid()).
Bytes() Bytes()
......
...@@ -81,7 +81,7 @@ func start(shutdownSignal <-chan struct{}) { ...@@ -81,7 +81,7 @@ func start(shutdownSignal <-chan struct{}) {
// trigger start of the autopeering selection // trigger start of the autopeering selection
go func() { autopeering.StartSelection() }() go func() { autopeering.StartSelection() }()
log.Infof("%s started, bind-address=%s", PluginName, localAddr.String()) log.Infof("%s started: age-threshold=%v bind-address=%s", PluginName, ageThreshold, localAddr.String())
<-shutdownSignal <-shutdownSignal
log.Info("Stopping " + PluginName + " ...") log.Info("Stopping " + PluginName + " ...")
...@@ -90,12 +90,13 @@ func start(shutdownSignal <-chan struct{}) { ...@@ -90,12 +90,13 @@ func start(shutdownSignal <-chan struct{}) {
autopeering.Selection().Close() autopeering.Selection().Close()
} }
// loads the given message from the message layer or an error if not found. // loads the given message from the message layer and returns it or an error if not found.
func loadMessage(messageID message.Id) (bytes []byte, err error) { func loadMessage(msgID message.Id) ([]byte, error) {
if !messagelayer.Tangle().Message(messageID).Consume(func(message *message.Message) { cachedMessage := messagelayer.Tangle().Message(msgID)
bytes = message.Bytes() defer cachedMessage.Release()
}) { if !cachedMessage.Exists() {
err = ErrMessageNotFound return nil, ErrMessageNotFound
} }
return msg := cachedMessage.Unwrap()
return msg.Bytes(), nil
} }
...@@ -9,12 +9,14 @@ import ( ...@@ -9,12 +9,14 @@ import (
const ( const (
// CfgGossipPort defines the config flag of the gossip port. // CfgGossipPort defines the config flag of the gossip port.
CfgGossipPort = "gossip.port" CfgGossipPort = "gossip.port"
// CfgGossipAgeThreshold defines the maximum age (time since reception) of a message to be gossiped.
// CfgGossipTipsBroadcastInterval the interval in which the oldest known tip is re-broadcasted. CfgGossipAgeThreshold = "gossip.ageThreshold"
// CfgGossipTipsBroadcastInterval the interval in which the oldest known tip is re-broadcast.
CfgGossipTipsBroadcastInterval = "gossip.tipsBroadcaster.interval" CfgGossipTipsBroadcastInterval = "gossip.tipsBroadcaster.interval"
) )
func init() { func init() {
flag.Int(CfgGossipPort, 14666, "tcp port for gossip connection") flag.Int(CfgGossipPort, 14666, "tcp port for gossip connection")
flag.Duration(CfgGossipTipsBroadcastInterval, 10*time.Second, "the interval in which the oldest known tip is re-broadcasted") flag.Duration(CfgGossipAgeThreshold, 5*time.Second, "message age threshold for gossip")
flag.Duration(CfgGossipTipsBroadcastInterval, 10*time.Second, "the interval in which the oldest known tip is re-broadcast")
} }
...@@ -2,12 +2,14 @@ package gossip ...@@ -2,12 +2,14 @@ package gossip
import ( import (
"sync" "sync"
"time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/gossip" "github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/messagelayer" "github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/autopeering/peer" "github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/selection" "github.com/iotaledger/hive.go/autopeering/selection"
...@@ -25,7 +27,9 @@ var ( ...@@ -25,7 +27,9 @@ var (
plugin *node.Plugin plugin *node.Plugin
once sync.Once once sync.Once
log *logger.Logger log *logger.Logger
ageThreshold time.Duration
tipsBroadcasterInterval time.Duration
) )
// Plugin gets the plugin instance. // Plugin gets the plugin instance.
...@@ -38,17 +42,21 @@ func Plugin() *node.Plugin { ...@@ -38,17 +42,21 @@ func Plugin() *node.Plugin {
func configure(*node.Plugin) { func configure(*node.Plugin) {
log = logger.NewLogger(PluginName) log = logger.NewLogger(PluginName)
ageThreshold = config.Node().GetDuration(CfgGossipAgeThreshold)
tipsBroadcasterInterval = config.Node().GetDuration(CfgGossipTipsBroadcastInterval)
configureLogging() configureLogging()
configureMessageLayer() configureMessageLayer()
configureAutopeering() configureAutopeering()
configureTipBroadcaster()
} }
func run(*node.Plugin) { func run(*node.Plugin) {
if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil { if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil {
log.Panicf("Failed to start as daemon: %s", err) log.Panicf("Failed to start as daemon: %s", err)
} }
if err := daemon.BackgroundWorker(tipsBroadcasterName, startTipBroadcaster, shutdown.PriorityGossip); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
} }
func configureAutopeering() { func configureAutopeering() {
...@@ -121,14 +129,21 @@ func configureMessageLayer() { ...@@ -121,14 +129,21 @@ func configureMessageLayer() {
// configure flow of outgoing messages (gossip on solidification) // configure flow of outgoing messages (gossip on solidification)
messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) { messagelayer.Tangle().Events.MessageSolid.Attach(events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
cachedMessageMetadata.Release() defer cachedMessage.Release()
cachedMessage.Consume(func(msg *message.Message) { defer cachedMessageMetadata.Release()
mgr.SendMessage(msg.Bytes())
}) // only broadcast new message shortly after they have been received
metadata := cachedMessageMetadata.Unwrap()
if time.Since(metadata.ReceivedTime()) > ageThreshold {
return
}
msg := cachedMessage.Unwrap()
mgr.SendMessage(msg.Bytes())
})) }))
// request missing messages // request missing messages
messagelayer.MessageRequester().Events.SendRequest.Attach(events.NewClosure(func(messageId message.Id) { messagelayer.MessageRequester().Events.SendRequest.Attach(events.NewClosure(func(msgID message.Id) {
mgr.RequestMessage(messageId[:]) mgr.RequestMessage(msgID[:])
})) }))
} }
...@@ -3,19 +3,16 @@ package gossip ...@@ -3,19 +3,16 @@ package gossip
import ( import (
"container/list" "container/list"
"sync" "sync"
"time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/messagelayer" "github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/timeutil"
) )
const ( const (
// the amount of oldest tips in the tip pool to broadcast up on each interval // the name of the tips broadcaster worker
maxOldestTipsToBroadcastPerInterval = 2 tipsBroadcasterName = PluginName + "[TipsBroadcaster]"
) )
var tips = tiplist{dict: make(map[message.Id]*list.Element)} var tips = tiplist{dict: make(map[message.Id]*list.Element)}
...@@ -35,7 +32,6 @@ func (s *tiplist) AddTip(id message.Id) { ...@@ -35,7 +32,6 @@ func (s *tiplist) AddTip(id message.Id) {
if _, contains := s.dict[id]; contains { if _, contains := s.dict[id]; contains {
return return
} }
elem := s.list.PushBack(id) elem := s.list.PushBack(id)
s.dict[id] = elem s.dict[id] = elem
if s.iterator == nil { if s.iterator == nil {
...@@ -47,24 +43,27 @@ func (s *tiplist) RemoveTip(id message.Id) { ...@@ -47,24 +43,27 @@ func (s *tiplist) RemoveTip(id message.Id) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
elem, ok := s.dict[id] elem, contains := s.dict[id]
if ok { if !contains {
s.list.Remove(elem) return
if s.iterator == elem { }
s.next(elem) delete(s.dict, id)
} s.list.Remove(elem)
if s.iterator == elem {
s.next(elem)
} }
} }
func (s *tiplist) Next() (id message.Id) { func (s *tiplist) Next() message.Id {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if s.iterator != nil { if s.iterator == nil {
id = s.iterator.Value.(message.Id) return message.EmptyId
s.next(s.iterator)
} }
return id := s.iterator.Value.(message.Id)
s.next(s.iterator)
return id
} }
func (s *tiplist) next(elem *list.Element) { func (s *tiplist) next(elem *list.Element) {
...@@ -74,53 +73,39 @@ func (s *tiplist) next(elem *list.Element) { ...@@ -74,53 +73,39 @@ func (s *tiplist) next(elem *list.Element) {
} }
} }
func configureTipBroadcaster() { func startTipBroadcaster(shutdownSignal <-chan struct{}) {
defer log.Infof("Stopping %s ... done", tipsBroadcasterName)
removeClosure := events.NewClosure(tips.RemoveTip)
addClosure := events.NewClosure(tips.AddTip)
// attach the tip list to the TipSelector
tipSelector := messagelayer.TipSelector() tipSelector := messagelayer.TipSelector()
addedTipsClosure := events.NewClosure(tips.AddTip) tipSelector.Events.TipRemoved.Attach(removeClosure)
removedTipClosure := events.NewClosure(tips.RemoveTip) defer tipSelector.Events.TipRemoved.Detach(removeClosure)
tipSelector.Events.TipAdded.Attach(addedTipsClosure) tipSelector.Events.TipAdded.Attach(addClosure)
tipSelector.Events.TipRemoved.Attach(removedTipClosure) defer tipSelector.Events.TipAdded.Detach(addClosure)
if err := daemon.BackgroundWorker("Tips-Broadcaster", func(shutdownSignal <-chan struct{}) { log.Infof("%s started: interval=%v", tipsBroadcasterName, tipsBroadcasterInterval)
log.Info("broadcaster started") timeutil.Ticker(broadcastNextOldestTip, tipsBroadcasterInterval, shutdownSignal)
defer log.Info("broadcaster stopped") log.Infof("Stopping %s ...", tipsBroadcasterName)
defer tipSelector.Events.TipAdded.Detach(addedTipsClosure)
defer tipSelector.Events.TipRemoved.Detach(removedTipClosure)
ticker := time.NewTicker(config.Node().GetDuration(CfgGossipTipsBroadcastInterval))
defer ticker.Stop()
for {
select {
case <-ticker.C:
broadcastOldestTips()
case <-shutdownSignal:
return
}
}
}, shutdown.PriorityGossip); err != nil {
log.Panicf("Couldn't create demon: %s", err)
}
} }
// broadcasts up to maxOldestTipsToBroadcastPerInterval tips from the tip pool // broadcasts the next oldest tip from the tip pool to all connected neighbors.
// to all connected neighbors. func broadcastNextOldestTip() {
func broadcastOldestTips() { msgID := tips.Next()
for toBroadcast := maxOldestTipsToBroadcastPerInterval; toBroadcast > 0; toBroadcast-- { if msgID == message.EmptyId {
msgID := tips.Next() return
if msgID == message.EmptyId {
break
}
log.Debugf("broadcasting tip %s", msgID)
broadcastMessage(msgID)
} }
broadcastMessage(msgID)
} }
// broadcasts the given message to all neighbors if it exists. // broadcasts the given message to all neighbors if it exists.
func broadcastMessage(msgID message.Id) { func broadcastMessage(msgID message.Id) {
cachedMessage := messagelayer.Tangle().Message(msgID) msgBytes, err := loadMessage(msgID)
defer cachedMessage.Release() if err != nil {
if !cachedMessage.Exists() {
return return
} }
msg := cachedMessage.Unwrap() log.Debugw("broadcast tip", "id", msgID)
Manager().SendMessage(msg.Bytes()) Manager().SendMessage(msgBytes)
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment