Skip to content
Snippets Groups Projects
Unverified Commit 8e22cef5 authored by Wolfgang Welz's avatar Wolfgang Welz Committed by GitHub
Browse files

Refactor: autopeering events and singletons (#394)


* adapt to hive.go autopeering events changes

* add exposed function to start autopeering neighbor selection

* start autopeering neighbor selection when gossip is started

* update go modules

* Apply suggestions from code review

Co-Authored-By: default avatarAngelo Capossele <angelocapossele@gmail.com>

* fix build errors

Co-authored-by: default avatarAngelo Capossele <angelocapossele@gmail.com>
parent 0f64f600
No related branches found
No related tags found
No related merge requests found
......@@ -58,7 +58,7 @@ func Voter() vote.DRNGRoundBasedVoter {
// create a function which gets OpinionGivers
opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) {
opinionGivers := make([]vote.OpinionGiver, 0)
for _, p := range autopeering.Discovery.GetVerifiedPeers() {
for _, p := range autopeering.Discovery().GetVerifiedPeers() {
fpcService := p.Services().Get(service.FPCKey)
if fpcService == nil {
continue
......
......@@ -7,13 +7,10 @@ require (
github.com/dgraph-io/badger/v2 v2.0.2
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/drand/drand v0.5.4
github.com/gobuffalo/logger v1.0.1
github.com/gobuffalo/packr/v2 v2.7.1
github.com/golang/protobuf v1.3.4
github.com/googollee/go-engine.io v1.4.3-0.20190924125625-798118fc0dd2
github.com/googollee/go-socket.io v1.4.3-0.20191204093753-683f8725b6d0
github.com/gorilla/websocket v1.4.1
github.com/iotaledger/hive.go v0.0.0-20200428004254-2aad9483e71c
github.com/iotaledger/hive.go v0.0.0-20200430073924-0e16f8c3a522
github.com/iotaledger/iota.go v1.0.0-beta.14
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0
......
This diff is collapsed.
......@@ -59,9 +59,9 @@ func getEventDispatchers(conn *network.ManagedConnection) *EventDispatchers {
}
log.Debugw(
"Heartbeat",
"nodeId", hex.EncodeToString(packet.OwnID),
"outboundIds", out.String(),
"inboundIds", in.String(),
"nodeID", hex.EncodeToString(packet.OwnID),
"outboundIDs", out.String(),
"inboundIDs", in.String(),
)
// Marshal() copies the content of packet, it doesn't modify it.
......@@ -88,29 +88,26 @@ func reportHeartbeat(dispatchers *EventDispatchers) {
nodeID = local.GetInstance().ID().Bytes()
}
var outboundIds [][]byte
var inboundIds [][]byte
var outboundIDs [][]byte
var inboundIDs [][]byte
// When gossip (and autopeering selection) is enabled, we have neighbors to report
if autopeering.Selection != nil {
// Get outboundIds (chosen neighbors)
outgoingNeighbors := autopeering.Selection.GetOutgoingNeighbors()
outboundIds = make([][]byte, len(outgoingNeighbors))
// Get outboundIDs (chosen neighbors)
outgoingNeighbors := autopeering.Selection().GetOutgoingNeighbors()
outboundIDs = make([][]byte, len(outgoingNeighbors))
for i, neighbor := range outgoingNeighbors {
// Doesn't copy the ID, take care not to modify underlying bytearray!
outboundIds[i] = neighbor.ID().Bytes()
outboundIDs[i] = neighbor.ID().Bytes()
}
// Get inboundIds (accepted neighbors)
incomingNeighbors := autopeering.Selection.GetIncomingNeighbors()
inboundIds = make([][]byte, len(incomingNeighbors))
// Get inboundIDs (accepted neighbors)
incomingNeighbors := autopeering.Selection().GetIncomingNeighbors()
inboundIDs = make([][]byte, len(incomingNeighbors))
for i, neighbor := range incomingNeighbors {
// Doesn't copy the ID, take care not to modify underlying bytearray!
inboundIds[i] = neighbor.ID().Bytes()
}
inboundIDs[i] = neighbor.ID().Bytes()
}
packet := &heartbeat.Packet{OwnID: nodeID, OutboundIDs: outboundIds, InboundIDs: inboundIds}
packet := &heartbeat.Packet{OwnID: nodeID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs}
dispatchers.Heartbeat(packet)
}
......@@ -8,11 +8,11 @@ import (
"net"
"strconv"
"strings"
"sync"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/hive.go/autopeering/discover"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/peer/service"
......@@ -20,7 +20,7 @@ import (
"github.com/iotaledger/hive.go/autopeering/server"
"github.com/iotaledger/hive.go/crypto/ed25519"
"github.com/iotaledger/hive.go/identity"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/logger"
)
// autopeering constants
......@@ -30,55 +30,85 @@ const (
)
var (
// Discovery is the peer discovery protocol.
Discovery *discover.Protocol
// Selection is the peer selection protocol.
Selection *selection.Protocol
// ErrParsingMasterNode is returned for an invalid master node
// ErrParsingMasterNode is returned for an invalid master node.
ErrParsingMasterNode = errors.New("cannot parse master node")
// NetworkID specifies the autopeering network identifier
NetworkID = hash32([]byte(banner.AppVersion + NetworkVersion))
)
func hash32(b []byte) uint32 {
hash := fnv.New32()
_, err := hash.Write(b)
if err != nil {
panic(err)
var (
// the peer discovery protocol
peerDisc *discover.Protocol
peerDiscOnce sync.Once
// the peer selection protocol
peerSel *selection.Protocol
peerSelOnce sync.Once
// block until the peering server has been started
srvBarrier = struct {
once sync.Once
c chan *server.Server
}{c: make(chan *server.Server, 1)}
)
// Discovery returns the peer discovery instance.
func Discovery() *discover.Protocol {
peerDiscOnce.Do(createPeerDisc)
return peerDisc
}
return hash.Sum32()
// Selection returns the neighbor selection instance.
func Selection() *selection.Protocol {
peerSelOnce.Do(createPeerSel)
return peerSel
}
// GetBindAddress returns the string form of the autopeering bind address.
func GetBindAddress() string {
// BindAddress returns the string form of the autopeering bind address.
func BindAddress() string {
peering := local.GetInstance().Services().Get(service.PeeringKey)
host := config.Node.GetString(local.CfgBind)
port := strconv.Itoa(peering.Port())
return net.JoinHostPort(host, port)
}
func configureAP() {
// StartSelection starts the neighbor selection process.
// It blocks until the peer discovery has been started. Multiple calls of StartSelection are ignored.
func StartSelection() {
srvBarrier.once.Do(func() {
srv := <-srvBarrier.c
close(srvBarrier.c)
Selection().Start(srv)
})
}
func createPeerDisc() {
// assure that the logger is available
log := logger.NewLogger(PluginName).Named("disc")
masterPeers, err := parseEntryNodes()
if err != nil {
log.Errorf("Invalid entry nodes; ignoring: %v", err)
}
log.Debugf("Master peers: %v", masterPeers)
Discovery = discover.New(local.GetInstance(), ProtocolVersion, NetworkID,
discover.Logger(log.Named("disc")),
peerDisc = discover.New(local.GetInstance(), ProtocolVersion, NetworkID,
discover.Logger(log),
discover.MasterPeers(masterPeers),
)
}
// enable peer selection only when gossip is enabled
if !node.IsSkipped(gossip.Plugin) {
Selection = selection.New(local.GetInstance(), Discovery,
selection.Logger(log.Named("sel")),
func createPeerSel() {
// assure that the logger is available
log := logger.NewLogger(PluginName).Named("sel")
peerSel = selection.New(local.GetInstance(), Discovery(),
selection.Logger(log),
selection.NeighborValidator(selection.ValidatorFunc(isValidNeighbor)),
)
}
}
// isValidNeighbor checks whether a peer is a valid neighbor.
func isValidNeighbor(p *peer.Peer) bool {
......@@ -101,7 +131,7 @@ func start(shutdownSignal <-chan struct{}) {
peering := lPeer.Services().Get(service.PeeringKey)
// resolve the bind address
localAddr, err := net.ResolveUDPAddr(peering.Network(), GetBindAddress())
localAddr, err := net.ResolveUDPAddr(peering.Network(), BindAddress())
if err != nil {
log.Fatalf("Error resolving %s: %v", local.CfgBind, err)
}
......@@ -112,24 +142,13 @@ func start(shutdownSignal <-chan struct{}) {
}
defer conn.Close()
handlers := []server.Handler{Discovery}
if Selection != nil {
handlers = append(handlers, Selection)
}
// start a server doing discovery and peering
srv := server.Serve(lPeer, conn, log.Named("srv"), handlers...)
// start a server doing peerDisc and peering
srv := server.Serve(lPeer, conn, log.Named("srv"), Discovery(), Selection())
defer srv.Close()
// start the discovery on that connection
Discovery.Start(srv)
defer Discovery.Close()
if Selection != nil {
// start the peering on that connection
Selection.Start(srv)
defer Selection.Close()
}
// start the peer discovery on that connection
Discovery().Start(srv)
srvBarrier.c <- srv
log.Infof("%s started: ID=%s Address=%s/%s", PluginName, lPeer.ID(), localAddr.String(), localAddr.Network())
......@@ -137,8 +156,20 @@ func start(shutdownSignal <-chan struct{}) {
log.Infof("Stopping %s ...", PluginName)
count := lPeer.Database().PersistSeeds()
log.Infof("%d peers persisted as seeds", count)
Discovery().Close()
Selection().Close()
// persists potential peering seeds for the next start
log.Infof("%d peers persisted as seeds", lPeer.Database().PersistSeeds())
}
func hash32(b []byte) uint32 {
hash := fnv.New32()
_, err := hash.Write(b)
if err != nil {
panic(err)
}
return hash.Sum32()
}
func parseEntryNodes() (result []*peer.Peer, err error) {
......
......@@ -4,9 +4,7 @@ import (
"time"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/hive.go/autopeering/discover"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/selection"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
......@@ -20,13 +18,14 @@ const PluginName = "Autopeering"
var (
// Plugin is the plugin instance of the autopeering plugin.
Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
log *logger.Logger
)
func configure(*node.Plugin) {
log = logger.NewLogger(PluginName)
configureEvents()
configureAP()
}
func run(*node.Plugin) {
......@@ -36,35 +35,33 @@ func run(*node.Plugin) {
}
func configureEvents() {
// notify the selection when a connection is closed or failed.
gossip.Manager().Events().ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, _ error) {
Selection.RemoveNeighbor(p.ID())
}))
gossip.Manager().Events().NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) {
Selection.RemoveNeighbor(p.ID())
}))
// assure that the autopeering is instantiated
peerDisc := Discovery()
peerSel := Selection()
discover.Events.PeerDiscovered.Attach(events.NewClosure(func(ev *discover.DiscoveredEvent) {
// log the peer discovery events
peerDisc.Events().PeerDiscovered.Attach(events.NewClosure(func(ev *discover.DiscoveredEvent) {
log.Infof("Discovered: %s / %s", ev.Peer.Address(), ev.Peer.ID())
}))
discover.Events.PeerDeleted.Attach(events.NewClosure(func(ev *discover.DeletedEvent) {
peerDisc.Events().PeerDeleted.Attach(events.NewClosure(func(ev *discover.DeletedEvent) {
log.Infof("Removed offline: %s / %s", ev.Peer.Address(), ev.Peer.ID())
}))
selection.Events.SaltUpdated.Attach(events.NewClosure(func(ev *selection.SaltUpdatedEvent) {
// log the peer selection events
peerSel.Events().SaltUpdated.Attach(events.NewClosure(func(ev *selection.SaltUpdatedEvent) {
log.Infof("Salt updated; expires=%s", ev.Public.GetExpiration().Format(time.RFC822))
}))
selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
peerSel.Events().OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
if ev.Status {
log.Infof("Peering chosen: %s / %s", ev.Peer.Address(), ev.Peer.ID())
}
}))
selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
peerSel.Events().IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
if ev.Status {
log.Infof("Peering accepted: %s / %s", ev.Peer.Address(), ev.Peer.ID())
}
}))
selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
peerSel.Events().Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
log.Infof("Peering dropped: %s", ev.DroppedID)
}))
}
......@@ -193,7 +193,7 @@ func neighborMetrics() []neighbormetric {
for _, neighbor := range neighbors {
// unfortunately the neighbor manager doesn't keep track of the origin of the connection
origin := "Inbound"
for _, peer := range autopeering.Selection.GetOutgoingNeighbors() {
for _, peer := range autopeering.Selection().GetOutgoingNeighbors() {
if neighbor.Peer == peer {
origin = "Outbound"
break
......
......@@ -45,7 +45,7 @@ func Voter() vote.DRNGRoundBasedVoter {
// create a function which gets OpinionGivers
opinionGiverFunc := func() (givers []vote.OpinionGiver, err error) {
opinionGivers := make([]vote.OpinionGiver, 0)
for _, p := range autopeering.Discovery.GetVerifiedPeers() {
for _, p := range autopeering.Discovery().GetVerifiedPeers() {
fpcService := p.Services().Get(service.FPCKey)
if fpcService == nil {
continue
......
......@@ -9,6 +9,7 @@ import (
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
......@@ -18,7 +19,6 @@ import (
)
var (
log *logger.Logger
mgr *gossip.Manager
mgrOnce sync.Once
)
......@@ -30,8 +30,8 @@ func Manager() *gossip.Manager {
}
func createManager() {
log = logger.NewLogger(PluginName)
lPeer := local.GetInstance()
// assure that the logger is available
log := logger.NewLogger(PluginName)
// announce the gossip service
gossipPort := config.Node.GetInt(CfgGossipPort)
......@@ -39,6 +39,7 @@ func createManager() {
log.Fatalf("Invalid port number (%s): %d", CfgGossipPort, gossipPort)
}
lPeer := local.GetInstance()
if err := lPeer.UpdateService(service.GossipKey, "tcp", gossipPort); err != nil {
log.Fatalf("could not update services: %s", err)
}
......@@ -72,10 +73,16 @@ func start(shutdownSignal <-chan struct{}) {
mgr.Start(srv)
defer mgr.Close()
// trigger start of the autopeering selection
go func() { autopeering.StartSelection() }()
log.Infof("%s started: Address=%s/%s", PluginName, localAddr.String(), localAddr.Network())
<-shutdownSignal
log.Info("Stopping " + PluginName + " ...")
// assure that the autopeering selection is always stopped before the gossip manager
autopeering.Selection().Close()
}
// loads the given message from the message layer or an error if not found.
......
......@@ -5,33 +5,54 @@ import (
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/selection"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
// PluginName is the name of the gossip plugin.
const PluginName = "Gossip"
var (
// Plugin is the plugin instance of the gossip plugin.
var Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
Plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
log *logger.Logger
)
func configure(*node.Plugin) {
log = logger.NewLogger(PluginName)
configureLogging()
configureMessageLayer()
configureAutopeering()
}
func run(*node.Plugin) {
if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil {
log.Errorf("Failed to start as daemon: %s", err)
}
}
func configureAutopeering() {
// assure that the Manager is instantiated
mgr := Manager()
// link to the auto peering
selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
// link to the autopeering events
peerSel := autopeering.Selection()
peerSel.Events().Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
go func() {
if err := mgr.DropNeighbor(ev.DroppedID); err != nil {
log.Debugw("error dropping neighbor", "id", ev.DroppedID, "err", err)
}
}()
}))
selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
peerSel.Events().IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
if !ev.Status {
return // ignore rejected peering
}
......@@ -41,7 +62,7 @@ func configure(*node.Plugin) {
}
}()
}))
selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
peerSel.Events().OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
if !ev.Status {
return // ignore rejected peering
}
......@@ -52,7 +73,20 @@ func configure(*node.Plugin) {
}()
}))
// log neighbor changes
// notify the autopeering on connection loss
mgr.Events().ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, _ error) {
peerSel.RemoveNeighbor(p.ID())
}))
mgr.Events().NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) {
peerSel.RemoveNeighbor(p.ID())
}))
}
func configureLogging() {
// assure that the Manager is instantiated
mgr := Manager()
// log the gossip events
mgr.Events().ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer, err error) {
log.Infof("Connection to neighbor %s / %s failed: %s", gossip.GetAddress(p), p.ID(), err)
}))
......@@ -62,6 +96,11 @@ func configure(*node.Plugin) {
mgr.Events().NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) {
log.Infof("Neighbor removed: %s / %s", gossip.GetAddress(p), p.ID())
}))
}
func configureMessageLayer() {
// assure that the Manager is instantiated
mgr := Manager()
// configure flow of incoming messages
mgr.Events().MessageReceived.Attach(events.NewClosure(func(event *gossip.MessageReceivedEvent) {
......@@ -81,9 +120,3 @@ func configure(*node.Plugin) {
mgr.RequestMessage(messageId[:])
}))
}
func run(*node.Plugin) {
if err := daemon.BackgroundWorker(PluginName, start, shutdown.PriorityGossip); err != nil {
log.Errorf("Failed to start as daemon: %s", err)
}
}
......@@ -37,7 +37,7 @@ func checkAutopeeringConnection() {
peering := local.GetInstance().Services().Get(service.PeeringKey)
// resolve the bind address
localAddr, err := net.ResolveUDPAddr(peering.Network(), autopeering.GetBindAddress())
localAddr, err := net.ResolveUDPAddr(peering.Network(), autopeering.BindAddress())
if err != nil {
log.Fatalf("Error resolving %s: %v", local.CfgBind, err)
}
......@@ -56,7 +56,7 @@ func checkAutopeeringConnection() {
disc.Start(srv)
defer disc.Close()
for _, master := range autopeering.Discovery.GetMasterPeers() {
for _, master := range autopeering.Discovery().GetMasterPeers() {
err = disc.Ping(master)
if err == nil {
log.Infof("Pong received from %s", master.IP())
......
......@@ -31,16 +31,8 @@ func getNeighbors(c echo.Context) error {
accepted := []Neighbor{}
knownPeers := []Neighbor{}
if autopeering.Selection == nil {
return c.JSON(http.StatusNotImplemented, Response{Error: "Neighbor Selection is not enabled"})
}
if autopeering.Discovery == nil {
return c.JSON(http.StatusNotImplemented, Response{Error: "Neighbor Discovery is not enabled"})
}
if c.QueryParam("known") == "1" {
for _, peer := range autopeering.Discovery.GetVerifiedPeers() {
for _, peer := range autopeering.Discovery().GetVerifiedPeers() {
n := Neighbor{
ID: peer.ID().String(),
PublicKey: base64.StdEncoding.EncodeToString(peer.PublicKey().Bytes()),
......@@ -50,7 +42,7 @@ func getNeighbors(c echo.Context) error {
}
}
for _, peer := range autopeering.Selection.GetOutgoingNeighbors() {
for _, peer := range autopeering.Selection().GetOutgoingNeighbors() {
n := Neighbor{
ID: peer.ID().String(),
PublicKey: base64.StdEncoding.EncodeToString(peer.PublicKey().Bytes()),
......@@ -58,7 +50,7 @@ func getNeighbors(c echo.Context) error {
n.Services = getServices(peer)
chosen = append(chosen, n)
}
for _, peer := range autopeering.Selection.GetIncomingNeighbors() {
for _, peer := range autopeering.Selection().GetIncomingNeighbors() {
n := Neighbor{
ID: peer.ID().String(),
PublicKey: base64.StdEncoding.EncodeToString(peer.PublicKey().Bytes()),
......
......@@ -9,7 +9,7 @@ require (
github.com/docker/go-connections v0.4.0
github.com/docker/go-units v0.4.0 // indirect
github.com/iotaledger/goshimmer v0.1.3
github.com/iotaledger/hive.go v0.0.0-20200428004254-2aad9483e71c
github.com/iotaledger/hive.go v0.0.0-20200430073924-0e16f8c3a522
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/stretchr/testify v1.5.1
)
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment