From 60efb2aab0b05d5f2cd2a4efe4a2185efb989f5b Mon Sep 17 00:00:00 2001 From: Wolfgang Welz <welzwo@gmail.com> Date: Fri, 3 Jan 2020 19:00:38 +0100 Subject: [PATCH] fix: Run autopeering plugin as a daemon --- packages/gossip/transport/transport.go | 4 +- plugins/autopeering/autopeering.go | 203 ++++++++++++++----------- plugins/autopeering/entrynodes.go | 35 ----- plugins/autopeering/plugin.go | 21 +-- 4 files changed, 123 insertions(+), 140 deletions(-) delete mode 100644 plugins/autopeering/entrynodes.go diff --git a/packages/gossip/transport/transport.go b/packages/gossip/transport/transport.go index 97f98f00..39d6c32b 100644 --- a/packages/gossip/transport/transport.go +++ b/packages/gossip/transport/transport.go @@ -89,12 +89,12 @@ func Listen(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) { if err != nil { return nil, err } - // if the ip is an external ip, set it to zero + // if the ip is an external ip, set it to unspecified if tcpAddr.IP.IsGlobalUnicast() { if tcpAddr.IP.To4() != nil { tcpAddr.IP = net.IPv4zero } else { - tcpAddr.IP = net.IPv6zero + tcpAddr.IP = net.IPv6unspecified } } diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/autopeering.go index 314459ed..ea761dbf 100644 --- a/plugins/autopeering/autopeering.go +++ b/plugins/autopeering/autopeering.go @@ -7,6 +7,7 @@ import ( "net" "net/http" "strconv" + "strings" "github.com/iotaledger/autopeering-sim/discover" "github.com/iotaledger/autopeering-sim/logger" @@ -17,16 +18,16 @@ import ( "github.com/iotaledger/autopeering-sim/transport" "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/gossip" + "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/parameter" - "go.uber.org/zap" + "github.com/pkg/errors" ) var ( - debugLevel = "info" - close = make(chan struct{}, 1) - srv *server.Server - Discovery *discover.Protocol - Selection *selection.Protocol + // Discovery is the peer discovery protocol. + Discovery *discover.Protocol + // Selection is the peer selection protocol. + Selection *selection.Protocol ) const defaultZLC = `{ @@ -50,67 +51,53 @@ const defaultZLC = `{ } }` -var ( - db peer.DB - trans *transport.TransportConn - zLogger *zap.SugaredLogger - handlers []server.Handler - conn *net.UDPConn -) +var zLogger = logger.NewLogger(defaultZLC, "info") -func configureAP() { - host := parameter.NodeConfig.GetString(CFG_ADDRESS) - localhost := host - apPort := strconv.Itoa(parameter.NodeConfig.GetInt(CFG_PORT)) - gossipPort := strconv.Itoa(parameter.NodeConfig.GetInt(gossip.GOSSIP_PORT)) - - if host == "0.0.0.0" { - host = getMyIP() - } - - listenAddr := host + ":" + apPort - gossipAddr := host + ":" + gossipPort - - zLogger = logger.NewLogger(defaultZLC, debugLevel) - - addr, err := net.ResolveUDPAddr("udp", localhost+":"+apPort) - if err != nil { - log.Fatalf("ResolveUDPAddr: %v", err) - } - conn, err = net.ListenUDP("udp", addr) - if err != nil { - log.Fatalf("ListenUDP: %v", err) +func configureLocal() { + ip := net.ParseIP(parameter.NodeConfig.GetString(CFG_ADDRESS)) + if ip == nil { + log.Fatalf("Invalid IP address: %s", parameter.NodeConfig.GetString(CFG_ADDRESS)) } - - var masterPeers []*peer.Peer - peers, err := parseEntryNodes() - if err != nil { - log.Fatalf("Ignoring entry nodes: %v\n", err) - } else if peers != nil { - masterPeers = peers + if ip.IsUnspecified() { + myIp, err := getMyIP() + if err != nil { + log.Fatalf("Could not query public IP: %v", err) + } + ip = myIp } - // use the UDP connection for transport - trans = transport.Conn(conn, func(network, address string) (net.Addr, error) { return net.ResolveUDPAddr(network, address) }) + apPort := strconv.Itoa(parameter.NodeConfig.GetInt(CFG_PORT)) + gossipPort := strconv.Itoa(parameter.NodeConfig.GetInt(gossip.GOSSIP_PORT)) // create a new local node - db = peer.NewPersistentDB(zLogger.Named("db")) + db := peer.NewPersistentDB(zLogger.Named("db")) - local.INSTANCE, err = peer.NewLocal("udp", listenAddr, db) + var err error + local.INSTANCE, err = peer.NewLocal(NETWORK, net.JoinHostPort(ip.String(), apPort), db) if err != nil { - log.Fatalf("ListenUDP: %v", err) + log.Fatalf("NewLocal: %v", err) } // add a service for the gossip if parameter.NodeConfig.GetBool(CFG_SELECTION) { - local.INSTANCE.UpdateService(service.GossipKey, "tcp", gossipAddr) + err = local.INSTANCE.UpdateService(service.GossipKey, "tcp", net.JoinHostPort(ip.String(), gossipPort)) + if err != nil { + log.Fatalf("UpdateService: %v", err) + } + } +} + +func configureAP() { + masterPeers, err := parseEntryNodes() + if err != nil { + log.Errorf("Invalid entry nodes; ignoring: %v", err) } + log.Debugf("Master peers: %v", masterPeers) Discovery = discover.New(local.INSTANCE, discover.Config{ Log: zLogger.Named("disc"), MasterPeers: masterPeers, }) - handlers = append([]server.Handler{}, Discovery) if parameter.NodeConfig.GetBool(CFG_SELECTION) { Selection = selection.New(local.INSTANCE, Discovery, selection.Config{ @@ -120,73 +107,103 @@ func configureAP() { RequiredService: []service.Key{service.GossipKey}, }, }) - handlers = append(handlers, Selection) } } func start() { + defer log.Info("Stopping Auto Peering server ... done") + + addr := local.INSTANCE.Services().Get(service.PeeringKey) + udpAddr, err := net.ResolveUDPAddr(addr.Network(), addr.String()) + if err != nil { + log.Fatalf("ResolveUDPAddr: %v", err) + } + + // if the ip is an external ip, set it to unspecified + if udpAddr.IP.IsGlobalUnicast() { + if udpAddr.IP.To4() != nil { + udpAddr.IP = net.IPv4zero + } else { + udpAddr.IP = net.IPv6unspecified + } + } + + conn, err := net.ListenUDP(addr.Network(), udpAddr) + if err != nil { + log.Fatalf("ListenUDP: %v", err) + } + + // use the UDP connection for transport + trans := transport.Conn(conn, func(network, address string) (net.Addr, error) { return net.ResolveUDPAddr(network, address) }) + defer trans.Close() + + handlers := []server.Handler{Discovery} + if Selection != nil { + handlers = append(handlers, Selection) + } + // start a server doing discovery and peering - srv = server.Listen(local.INSTANCE, trans, zLogger.Named("srv"), handlers...) + srv := server.Listen(local.INSTANCE, trans, zLogger.Named("srv"), handlers...) + defer srv.Close() // start the discovery on that connection Discovery.Start(srv) + defer Discovery.Close() - // start the peering on that connection - if parameter.NodeConfig.GetBool(CFG_SELECTION) { + if Selection != nil { + // start the peering on that connection Selection.Start(srv) defer Selection.Close() } - id := base64.StdEncoding.EncodeToString(local.INSTANCE.PublicKey()) - zLogger.Info("Discovery protocol started: ID=" + id + ", address=" + srv.LocalAddr()) + log.Infof("Auto Peering server started: ID=%x, address=%s", local.INSTANCE.ID(), srv.LocalAddr()) - defer func() { - _ = zLogger.Sync() // ignore the returned error - trans.Close() - db.Close() - conn.Close() - srv.Close() - Discovery.Close() - }() + <-daemon.ShutdownSignal + log.Info("Stopping Auto Peering server ...") +} - // Only for debug - // go func() { - // for t := range time.NewTicker(2 * time.Second).C { - // _ = t - // printReport(zLogger) - // } - // }() +func parseEntryNodes() (result []*peer.Peer, err error) { + for _, entryNodeDefinition := range parameter.NodeConfig.GetStringSlice(CFG_ENTRY_NODES) { + if entryNodeDefinition == "" { + continue + } + + parts := strings.Split(entryNodeDefinition, "@") + if len(parts) != 2 { + return nil, fmt.Errorf("parseMaster") + } + pubKey, err := base64.StdEncoding.DecodeString(parts[0]) + if err != nil { + return nil, errors.Wrap(err, "parseMaster") + } + + services := service.New() + services.Update(service.PeeringKey, "udp", parts[1]) + + result = append(result, peer.NewPeer(pubKey, services)) + } - <-close + return result, nil } -func getMyIP() string { +func getMyIP() (net.IP, error) { url := "https://api.ipify.org?format=text" resp, err := http.Get(url) if err != nil { - return "" + return nil, err } defer resp.Body.Close() - ip, err := ioutil.ReadAll(resp.Body) + + body, err := ioutil.ReadAll(resp.Body) if err != nil { - return "" + return nil, err } - return fmt.Sprintf("%s", ip) -} -// used only for debugging puropose -// func printReport(log *zap.SugaredLogger) { -// if Discovery == nil || Selection == nil { -// return -// } -// knownPeers := Discovery.GetVerifiedPeers() -// incoming := []*peer.Peer{} -// outgoing := []*peer.Peer{} -// if Selection != nil { -// incoming = Selection.GetIncomingNeighbors() -// outgoing = Selection.GetOutgoingNeighbors() -// } -// log.Info("Known peers:", len(knownPeers)) -// log.Info("Chosen:", len(outgoing)) -// log.Info("Accepted:", len(incoming)) -// } + // the body only consists of the ip address + ip := net.ParseIP(string(body)) + if ip == nil { + return nil, fmt.Errorf("not an IP: %s", body) + } + + return ip, nil +} diff --git a/plugins/autopeering/entrynodes.go b/plugins/autopeering/entrynodes.go deleted file mode 100644 index 1b4384d3..00000000 --- a/plugins/autopeering/entrynodes.go +++ /dev/null @@ -1,35 +0,0 @@ -package autopeering - -import ( - "encoding/base64" - "strings" - - "github.com/iotaledger/autopeering-sim/peer" - "github.com/iotaledger/autopeering-sim/peer/service" - "github.com/iotaledger/goshimmer/packages/errors" - "github.com/iotaledger/hive.go/parameter" -) - -func parseEntryNodes() (result []*peer.Peer, err error) { - for _, entryNodeDefinition := range parameter.NodeConfig.GetStringSlice(CFG_ENTRY_NODES) { - if entryNodeDefinition == "" { - continue - } - - parts := strings.Split(entryNodeDefinition, "@") - if len(parts) != 2 { - return nil, errors.New("parseMaster") - } - pubKey, err := base64.StdEncoding.DecodeString(parts[0]) - if err != nil { - return nil, errors.Wrap(err, "parseMaster") - } - - services := service.New() - services.Update(service.PeeringKey, "udp", parts[1]) - - result = append(result, peer.NewPeer(pubKey, services)) - } - - return result, nil -} diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index 9168f070..904a19ff 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -9,23 +9,24 @@ import ( "github.com/iotaledger/hive.go/node" ) -var PLUGIN = node.NewPlugin("Auto Peering", node.Enabled, configure, run) -var log = logger.NewLogger("Autopeering") +// NETWORK defines the network type used for the autopeering. +const NETWORK = "udp" -func configure(plugin *node.Plugin) { - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - close <- struct{}{} - })) +var PLUGIN = node.NewPlugin("Autopeering", node.Enabled, configure, run) + +var log = logger.NewLogger("Autopeering") +func configure(*node.Plugin) { + configureEvents() + configureLocal() configureAP() - configureLogging(plugin) } -func run(plugin *node.Plugin) { - go start() +func run(*node.Plugin) { + daemon.BackgroundWorker("Autopeering", start) } -func configureLogging(plugin *node.Plugin) { +func configureEvents() { gossip.Events.NeighborDropped.Attach(events.NewClosure(func(ev *gossip.NeighborDroppedEvent) { log.Info("neighbor dropped: " + ev.Peer.Address() + " / " + ev.Peer.ID().String()) if Selection != nil { -- GitLab