Skip to content
Snippets Groups Projects
Commit 60efb2aa authored by Wolfgang Welz's avatar Wolfgang Welz
Browse files

fix: Run autopeering plugin as a daemon

parent 3b83a268
Branches
Tags
No related merge requests found
...@@ -89,12 +89,12 @@ func Listen(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) { ...@@ -89,12 +89,12 @@ func Listen(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
// if the ip is an external ip, set it to zero // if the ip is an external ip, set it to unspecified
if tcpAddr.IP.IsGlobalUnicast() { if tcpAddr.IP.IsGlobalUnicast() {
if tcpAddr.IP.To4() != nil { if tcpAddr.IP.To4() != nil {
tcpAddr.IP = net.IPv4zero tcpAddr.IP = net.IPv4zero
} else { } else {
tcpAddr.IP = net.IPv6zero tcpAddr.IP = net.IPv6unspecified
} }
} }
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
"strings"
"github.com/iotaledger/autopeering-sim/discover" "github.com/iotaledger/autopeering-sim/discover"
"github.com/iotaledger/autopeering-sim/logger" "github.com/iotaledger/autopeering-sim/logger"
...@@ -17,16 +18,16 @@ import ( ...@@ -17,16 +18,16 @@ import (
"github.com/iotaledger/autopeering-sim/transport" "github.com/iotaledger/autopeering-sim/transport"
"github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/parameter" "github.com/iotaledger/hive.go/parameter"
"go.uber.org/zap" "github.com/pkg/errors"
) )
var ( var (
debugLevel = "info" // Discovery is the peer discovery protocol.
close = make(chan struct{}, 1) Discovery *discover.Protocol
srv *server.Server // Selection is the peer selection protocol.
Discovery *discover.Protocol Selection *selection.Protocol
Selection *selection.Protocol
) )
const defaultZLC = `{ const defaultZLC = `{
...@@ -50,67 +51,53 @@ const defaultZLC = `{ ...@@ -50,67 +51,53 @@ const defaultZLC = `{
} }
}` }`
var ( var zLogger = logger.NewLogger(defaultZLC, "info")
db peer.DB
trans *transport.TransportConn
zLogger *zap.SugaredLogger
handlers []server.Handler
conn *net.UDPConn
)
func configureAP() { func configureLocal() {
host := parameter.NodeConfig.GetString(CFG_ADDRESS) ip := net.ParseIP(parameter.NodeConfig.GetString(CFG_ADDRESS))
localhost := host if ip == nil {
apPort := strconv.Itoa(parameter.NodeConfig.GetInt(CFG_PORT)) log.Fatalf("Invalid IP address: %s", parameter.NodeConfig.GetString(CFG_ADDRESS))
gossipPort := strconv.Itoa(parameter.NodeConfig.GetInt(gossip.GOSSIP_PORT))
if host == "0.0.0.0" {
host = getMyIP()
}
listenAddr := host + ":" + apPort
gossipAddr := host + ":" + gossipPort
zLogger = logger.NewLogger(defaultZLC, debugLevel)
addr, err := net.ResolveUDPAddr("udp", localhost+":"+apPort)
if err != nil {
log.Fatalf("ResolveUDPAddr: %v", err)
}
conn, err = net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("ListenUDP: %v", err)
} }
if ip.IsUnspecified() {
var masterPeers []*peer.Peer myIp, err := getMyIP()
peers, err := parseEntryNodes() if err != nil {
if err != nil { log.Fatalf("Could not query public IP: %v", err)
log.Fatalf("Ignoring entry nodes: %v\n", err) }
} else if peers != nil { ip = myIp
masterPeers = peers
} }
// use the UDP connection for transport apPort := strconv.Itoa(parameter.NodeConfig.GetInt(CFG_PORT))
trans = transport.Conn(conn, func(network, address string) (net.Addr, error) { return net.ResolveUDPAddr(network, address) }) gossipPort := strconv.Itoa(parameter.NodeConfig.GetInt(gossip.GOSSIP_PORT))
// create a new local node // create a new local node
db = peer.NewPersistentDB(zLogger.Named("db")) db := peer.NewPersistentDB(zLogger.Named("db"))
local.INSTANCE, err = peer.NewLocal("udp", listenAddr, db) var err error
local.INSTANCE, err = peer.NewLocal(NETWORK, net.JoinHostPort(ip.String(), apPort), db)
if err != nil { if err != nil {
log.Fatalf("ListenUDP: %v", err) log.Fatalf("NewLocal: %v", err)
} }
// add a service for the gossip // add a service for the gossip
if parameter.NodeConfig.GetBool(CFG_SELECTION) { if parameter.NodeConfig.GetBool(CFG_SELECTION) {
local.INSTANCE.UpdateService(service.GossipKey, "tcp", gossipAddr) err = local.INSTANCE.UpdateService(service.GossipKey, "tcp", net.JoinHostPort(ip.String(), gossipPort))
if err != nil {
log.Fatalf("UpdateService: %v", err)
}
}
}
func configureAP() {
masterPeers, err := parseEntryNodes()
if err != nil {
log.Errorf("Invalid entry nodes; ignoring: %v", err)
} }
log.Debugf("Master peers: %v", masterPeers)
Discovery = discover.New(local.INSTANCE, discover.Config{ Discovery = discover.New(local.INSTANCE, discover.Config{
Log: zLogger.Named("disc"), Log: zLogger.Named("disc"),
MasterPeers: masterPeers, MasterPeers: masterPeers,
}) })
handlers = append([]server.Handler{}, Discovery)
if parameter.NodeConfig.GetBool(CFG_SELECTION) { if parameter.NodeConfig.GetBool(CFG_SELECTION) {
Selection = selection.New(local.INSTANCE, Discovery, selection.Config{ Selection = selection.New(local.INSTANCE, Discovery, selection.Config{
...@@ -120,73 +107,103 @@ func configureAP() { ...@@ -120,73 +107,103 @@ func configureAP() {
RequiredService: []service.Key{service.GossipKey}, RequiredService: []service.Key{service.GossipKey},
}, },
}) })
handlers = append(handlers, Selection)
} }
} }
func start() { func start() {
defer log.Info("Stopping Auto Peering server ... done")
addr := local.INSTANCE.Services().Get(service.PeeringKey)
udpAddr, err := net.ResolveUDPAddr(addr.Network(), addr.String())
if err != nil {
log.Fatalf("ResolveUDPAddr: %v", err)
}
// if the ip is an external ip, set it to unspecified
if udpAddr.IP.IsGlobalUnicast() {
if udpAddr.IP.To4() != nil {
udpAddr.IP = net.IPv4zero
} else {
udpAddr.IP = net.IPv6unspecified
}
}
conn, err := net.ListenUDP(addr.Network(), udpAddr)
if err != nil {
log.Fatalf("ListenUDP: %v", err)
}
// use the UDP connection for transport
trans := transport.Conn(conn, func(network, address string) (net.Addr, error) { return net.ResolveUDPAddr(network, address) })
defer trans.Close()
handlers := []server.Handler{Discovery}
if Selection != nil {
handlers = append(handlers, Selection)
}
// start a server doing discovery and peering // start a server doing discovery and peering
srv = server.Listen(local.INSTANCE, trans, zLogger.Named("srv"), handlers...) srv := server.Listen(local.INSTANCE, trans, zLogger.Named("srv"), handlers...)
defer srv.Close()
// start the discovery on that connection // start the discovery on that connection
Discovery.Start(srv) Discovery.Start(srv)
defer Discovery.Close()
// start the peering on that connection if Selection != nil {
if parameter.NodeConfig.GetBool(CFG_SELECTION) { // start the peering on that connection
Selection.Start(srv) Selection.Start(srv)
defer Selection.Close() defer Selection.Close()
} }
id := base64.StdEncoding.EncodeToString(local.INSTANCE.PublicKey()) log.Infof("Auto Peering server started: ID=%x, address=%s", local.INSTANCE.ID(), srv.LocalAddr())
zLogger.Info("Discovery protocol started: ID=" + id + ", address=" + srv.LocalAddr())
defer func() { <-daemon.ShutdownSignal
_ = zLogger.Sync() // ignore the returned error log.Info("Stopping Auto Peering server ...")
trans.Close() }
db.Close()
conn.Close()
srv.Close()
Discovery.Close()
}()
// Only for debug func parseEntryNodes() (result []*peer.Peer, err error) {
// go func() { for _, entryNodeDefinition := range parameter.NodeConfig.GetStringSlice(CFG_ENTRY_NODES) {
// for t := range time.NewTicker(2 * time.Second).C { if entryNodeDefinition == "" {
// _ = t continue
// printReport(zLogger) }
// }
// }() parts := strings.Split(entryNodeDefinition, "@")
if len(parts) != 2 {
return nil, fmt.Errorf("parseMaster")
}
pubKey, err := base64.StdEncoding.DecodeString(parts[0])
if err != nil {
return nil, errors.Wrap(err, "parseMaster")
}
services := service.New()
services.Update(service.PeeringKey, "udp", parts[1])
result = append(result, peer.NewPeer(pubKey, services))
}
<-close return result, nil
} }
func getMyIP() string { func getMyIP() (net.IP, error) {
url := "https://api.ipify.org?format=text" url := "https://api.ipify.org?format=text"
resp, err := http.Get(url) resp, err := http.Get(url)
if err != nil { if err != nil {
return "" return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
ip, err := ioutil.ReadAll(resp.Body)
body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
return "" return nil, err
} }
return fmt.Sprintf("%s", ip)
}
// used only for debugging puropose // the body only consists of the ip address
// func printReport(log *zap.SugaredLogger) { ip := net.ParseIP(string(body))
// if Discovery == nil || Selection == nil { if ip == nil {
// return return nil, fmt.Errorf("not an IP: %s", body)
// } }
// knownPeers := Discovery.GetVerifiedPeers()
// incoming := []*peer.Peer{} return ip, nil
// outgoing := []*peer.Peer{} }
// if Selection != nil {
// incoming = Selection.GetIncomingNeighbors()
// outgoing = Selection.GetOutgoingNeighbors()
// }
// log.Info("Known peers:", len(knownPeers))
// log.Info("Chosen:", len(outgoing))
// log.Info("Accepted:", len(incoming))
// }
package autopeering
import (
"encoding/base64"
"strings"
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/autopeering-sim/peer/service"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/hive.go/parameter"
)
func parseEntryNodes() (result []*peer.Peer, err error) {
for _, entryNodeDefinition := range parameter.NodeConfig.GetStringSlice(CFG_ENTRY_NODES) {
if entryNodeDefinition == "" {
continue
}
parts := strings.Split(entryNodeDefinition, "@")
if len(parts) != 2 {
return nil, errors.New("parseMaster")
}
pubKey, err := base64.StdEncoding.DecodeString(parts[0])
if err != nil {
return nil, errors.Wrap(err, "parseMaster")
}
services := service.New()
services.Update(service.PeeringKey, "udp", parts[1])
result = append(result, peer.NewPeer(pubKey, services))
}
return result, nil
}
...@@ -9,23 +9,24 @@ import ( ...@@ -9,23 +9,24 @@ import (
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
) )
var PLUGIN = node.NewPlugin("Auto Peering", node.Enabled, configure, run) // NETWORK defines the network type used for the autopeering.
var log = logger.NewLogger("Autopeering") const NETWORK = "udp"
func configure(plugin *node.Plugin) { var PLUGIN = node.NewPlugin("Autopeering", node.Enabled, configure, run)
daemon.Events.Shutdown.Attach(events.NewClosure(func() {
close <- struct{}{} var log = logger.NewLogger("Autopeering")
}))
func configure(*node.Plugin) {
configureEvents()
configureLocal()
configureAP() configureAP()
configureLogging(plugin)
} }
func run(plugin *node.Plugin) { func run(*node.Plugin) {
go start() daemon.BackgroundWorker("Autopeering", start)
} }
func configureLogging(plugin *node.Plugin) { func configureEvents() {
gossip.Events.NeighborDropped.Attach(events.NewClosure(func(ev *gossip.NeighborDroppedEvent) { gossip.Events.NeighborDropped.Attach(events.NewClosure(func(ev *gossip.NeighborDroppedEvent) {
log.Info("neighbor dropped: " + ev.Peer.Address() + " / " + ev.Peer.ID().String()) log.Info("neighbor dropped: " + ev.Peer.Address() + " / " + ev.Peer.ID().String())
if Selection != nil { if Selection != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment