// Source: plugin.go

package gossip

import (
	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
	"github.com/iotaledger/goshimmer/packages/gossip"
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/plugins/autopeering"
	"github.com/iotaledger/goshimmer/plugins/messagelayer"
	"github.com/iotaledger/hive.go/autopeering/peer"
	"github.com/iotaledger/hive.go/autopeering/selection"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/logger"
	"github.com/iotaledger/hive.go/node"
)

// PluginName is the name of the gossip plugin.
// Within this file it is passed to node.NewPlugin, used as the name of the
// plugin logger, and used as the name of the daemon background worker.
const PluginName = "Gossip"

// Plugin is the plugin instance of the gossip plugin. It is enabled by
// default and uses configure and run as its callbacks.
var Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)

// log is the plugin-wide logger; it is nil until configure assigns it.
var log *logger.Logger

// configure creates the plugin logger and then runs the individual setup
// steps (logging, message layer, autopeering) in that fixed order.
func configure(_ *node.Plugin) {
	log = logger.NewLogger(PluginName)

	// The logger must exist before any step runs, since the attached
	// handlers write to it.
	steps := []func(){
		configureLogging,
		configureMessageLayer,
		configureAutopeering,
	}
	for _, step := range steps {
		step()
	}
}

// run registers start as a daemon background worker named PluginName with
// the shutdown.PriorityGossip priority. A registration error is reported
// through log.Panicf.
func run(_ *node.Plugin) {
	err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip)
	if err == nil {
		return
	}
	log.Panicf("Failed to start as daemon: %s", err)
}

// configureAutopeering couples the gossip manager with the autopeering
// selection: peering events add or drop gossip neighbors, and failed or
// removed gossip connections are reported back to the selection.
func configureAutopeering() {
	// make sure the Manager is instantiated before any handler is attached
	manager := Manager()
	sel := autopeering.Selection()

	// Each selection handler hands the manager call to its own goroutine, so
	// the event handler itself returns immediately. Errors are only logged at
	// debug level.
	onDropped := func(ev *selection.DroppedEvent) {
		go func() {
			err := manager.DropNeighbor(ev.DroppedID)
			if err == nil {
				return
			}
			log.Debugw("error dropping neighbor", "id", ev.DroppedID, "err", err)
		}()
	}
	onIncoming := func(ev *selection.PeeringEvent) {
		// rejected peerings are ignored
		if ev.Status {
			go func() {
				err := manager.AddInbound(ev.Peer)
				if err == nil {
					return
				}
				log.Debugw("error adding inbound", "id", ev.Peer.ID(), "err", err)
			}()
		}
	}
	onOutgoing := func(ev *selection.PeeringEvent) {
		// rejected peerings are ignored
		if ev.Status {
			go func() {
				err := manager.AddOutbound(ev.Peer)
				if err == nil {
					return
				}
				log.Debugw("error adding outbound", "id", ev.Peer.ID(), "err", err)
			}()
		}
	}

	// link to the autopeering events
	sel.Events().Dropped.Attach(events.NewClosure(onDropped))
	sel.Events().IncomingPeering.Attach(events.NewClosure(onIncoming))
	sel.Events().OutgoingPeering.Attach(events.NewClosure(onOutgoing))

	// Tell the autopeering selection whenever gossip loses a peer, either
	// because the connection failed or because the neighbor was removed.
	onConnectionFailed := func(p *peer.Peer, _ error) {
		sel.RemoveNeighbor(p.ID())
	}
	onNeighborRemoved := func(p *peer.Peer) {
		sel.RemoveNeighbor(p.ID())
	}
	manager.Events().ConnectionFailed.Attach(events.NewClosure(onConnectionFailed))
	manager.Events().NeighborRemoved.Attach(events.NewClosure(onNeighborRemoved))
}

// configureLogging attaches info-level log handlers to the gossip manager's
// ConnectionFailed, NeighborAdded and NeighborRemoved events.
func configureLogging() {
	// make sure the Manager is instantiated
	manager := Manager()

	// Each handler logs the peer's gossip address followed by its ID.
	logConnectionFailed := func(p *peer.Peer, err error) {
		log.Infof("Connection to neighbor %s / %s failed: %s", gossip.GetAddress(p), p.ID(), err)
	}
	logNeighborAdded := func(n *gossip.Neighbor) {
		log.Infof("Neighbor added: %s / %s", gossip.GetAddress(n.Peer), n.ID())
	}
	logNeighborRemoved := func(p *peer.Peer) {
		log.Infof("Neighbor removed: %s / %s", gossip.GetAddress(p), p.ID())
	}

	manager.Events().ConnectionFailed.Attach(events.NewClosure(logConnectionFailed))
	manager.Events().NeighborAdded.Attach(events.NewClosure(logNeighborAdded))
	manager.Events().NeighborRemoved.Attach(events.NewClosure(logNeighborRemoved))
}

// configureMessageLayer connects the gossip manager with the message layer:
// received gossip data goes to the message parser, solid messages are sent
// out via the manager, and message requests are forwarded to the manager.
func configureMessageLayer() {
	// make sure the Manager is instantiated
	manager := Manager()

	// incoming: pass the raw data and the sending peer to the parser
	onReceived := func(event *gossip.MessageReceivedEvent) {
		messagelayer.MessageParser.Parse(event.Data, event.Peer)
	}

	// outgoing: gossip a message once it has become solid
	onSolid := func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
		// the metadata is not needed here, so it is released right away
		cachedMessageMetadata.Release()
		cachedMessage.Consume(func(msg *message.Message) {
			payload := msg.Bytes()
			manager.SendMessage(payload)
		})
	}

	// requests: ask for a missing message by its ID bytes
	onSendRequest := func(messageId message.Id) {
		manager.RequestMessage(messageId[:])
	}

	manager.Events().MessageReceived.Attach(events.NewClosure(onReceived))
	messagelayer.Tangle.Events.MessageSolid.Attach(events.NewClosure(onSolid))
	messagelayer.MessageRequester.Events.SendRequest.Attach(events.NewClosure(onSendRequest))
}