-
Angelo Capossele authored
*
Add panic if backgroundWorker fails * Use log.Panicf instead of panic — Angelo Capossele authored *
Add panic if backgroundWorker fails * Use log.Panicf instead of panic
plugin.go 3.86 KiB
package gossip
import (
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/selection"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
// PluginName is the name of the gossip plugin. Within this file it is used as
// the plugin name passed to node.NewPlugin, as the logger name and as the name
// of the background worker.
const PluginName = "Gossip"
// Plugin is the plugin instance of the gossip plugin. It is created with
// node.Enabled and uses configure and run as its callbacks.
var Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)

// log is the logger of the gossip plugin. It is nil until configure assigns it.
var log *logger.Logger
// configure creates the plugin logger and then runs the logging, message layer
// and autopeering setup functions, in that order.
func configure(_ *node.Plugin) {
	// create the package logger used by the handlers configured below
	log = logger.NewLogger(PluginName)

	setupSteps := []func(){
		configureLogging,
		configureMessageLayer,
		configureAutopeering,
	}
	for _, step := range setupSteps {
		step()
	}
}
// run registers start as a daemon background worker under PluginName with
// shutdown.PriorityGossip. If the registration returns an error, it calls
// log.Panicf with that error.
func run(_ *node.Plugin) {
	err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip)
	if err == nil {
		return
	}
	log.Panicf("Failed to start as daemon: %s", err)
}
// configureAutopeering connects the autopeering selection with the gossip
// Manager. Selection events trigger dropping or adding of gossip neighbors, and
// failed connections or removed neighbors are reported back to the selection.
func configureAutopeering() {
	// calling Manager() first assures that the Manager is instantiated
	manager := Manager()
	sel := autopeering.Selection()

	// autopeering -> gossip: drop the neighbor in its own goroutine
	onDropped := events.NewClosure(func(ev *selection.DroppedEvent) {
		go func() {
			err := manager.DropNeighbor(ev.DroppedID)
			if err == nil {
				return
			}
			log.Debugw("error dropping neighbor", "id", ev.DroppedID, "err", err)
		}()
	})
	sel.Events().Dropped.Attach(onDropped)

	// autopeering -> gossip: add an inbound neighbor for accepted peerings only
	onIncoming := events.NewClosure(func(ev *selection.PeeringEvent) {
		if ev.Status {
			go func() {
				err := manager.AddInbound(ev.Peer)
				if err == nil {
					return
				}
				log.Debugw("error adding inbound", "id", ev.Peer.ID(), "err", err)
			}()
		}
	})
	sel.Events().IncomingPeering.Attach(onIncoming)

	// autopeering -> gossip: add an outbound neighbor for accepted peerings only
	onOutgoing := events.NewClosure(func(ev *selection.PeeringEvent) {
		if ev.Status {
			go func() {
				err := manager.AddOutbound(ev.Peer)
				if err == nil {
					return
				}
				log.Debugw("error adding outbound", "id", ev.Peer.ID(), "err", err)
			}()
		}
	})
	sel.Events().OutgoingPeering.Attach(onOutgoing)

	// gossip -> autopeering: notify the selection on connection loss
	onConnectionFailed := events.NewClosure(func(failed *peer.Peer, _ error) {
		sel.RemoveNeighbor(failed.ID())
	})
	manager.Events().ConnectionFailed.Attach(onConnectionFailed)

	// gossip -> autopeering: notify the selection when a neighbor was removed
	onNeighborRemoved := events.NewClosure(func(removed *peer.Peer) {
		sel.RemoveNeighbor(removed.ID())
	})
	manager.Events().NeighborRemoved.Attach(onNeighborRemoved)
}
// configureLogging attaches handlers to the gossip Manager events that log
// failed connections, added neighbors and removed neighbors via log.Infof.
func configureLogging() {
	// calling Manager() first assures that the Manager is instantiated
	manager := Manager()

	logConnectionFailed := events.NewClosure(func(p *peer.Peer, err error) {
		addr, id := gossip.GetAddress(p), p.ID()
		log.Infof("Connection to neighbor %s / %s failed: %s", addr, id, err)
	})
	manager.Events().ConnectionFailed.Attach(logConnectionFailed)

	logNeighborAdded := events.NewClosure(func(n *gossip.Neighbor) {
		addr, id := gossip.GetAddress(n.Peer), n.ID()
		log.Infof("Neighbor added: %s / %s", addr, id)
	})
	manager.Events().NeighborAdded.Attach(logNeighborAdded)

	logNeighborRemoved := events.NewClosure(func(p *peer.Peer) {
		addr, id := gossip.GetAddress(p), p.ID()
		log.Infof("Neighbor removed: %s / %s", addr, id)
	})
	manager.Events().NeighborRemoved.Attach(logNeighborRemoved)
}
// configureMessageLayer connects the gossip Manager with the message layer:
// received gossip data is handed to the message parser, solid messages are sent
// out through the Manager, and message requests are forwarded to the Manager.
func configureMessageLayer() {
	// calling Manager() first assures that the Manager is instantiated
	manager := Manager()

	// incoming: pass the event's data and peer to the message parser
	onReceived := events.NewClosure(func(ev *gossip.MessageReceivedEvent) {
		messagelayer.MessageParser.Parse(ev.Data, ev.Peer)
	})
	manager.Events().MessageReceived.Attach(onReceived)

	// outgoing: gossip a message on solidification
	onSolid := events.NewClosure(func(cachedMsg *message.CachedMessage, cachedMeta *tangle.CachedMessageMetadata) {
		// the metadata is not read here, so it is released right away
		cachedMeta.Release()
		cachedMsg.Consume(func(solid *message.Message) {
			payload := solid.Bytes()
			manager.SendMessage(payload)
		})
	})
	messagelayer.Tangle.Events.MessageSolid.Attach(onSolid)

	// requests: forward the id of a missing message to the Manager
	onRequest := events.NewClosure(func(id message.Id) {
		manager.RequestMessage(id[:])
	})
	messagelayer.MessageRequester.Events.SendRequest.Attach(onRequest)
}