diff --git a/go.mod b/go.mod index d0e1df79114c59810d2c315e1ffc1275bfec78a0..7a2417445e102b8befa95b29d42d328c92c0497b 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/googollee/go-engine.io v1.4.3-0.20190924125625-798118fc0dd2 github.com/googollee/go-socket.io v1.4.3-0.20191204093753-683f8725b6d0 github.com/gorilla/websocket v1.4.1 - github.com/iotaledger/hive.go v0.0.0-20200120174440-057de3927083 + github.com/iotaledger/hive.go v0.0.0-20200121213505-28904d5f037c github.com/iotaledger/iota.go v1.0.0-beta.14 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/gommon v0.3.0 // indirect diff --git a/go.sum b/go.sum index 3664b27905c74ce03f34db0ca1e2e8ad939f2f08..47bfbc4884977d5de44ead02f5e46e48cb57ceaa 100644 --- a/go.sum +++ b/go.sum @@ -118,8 +118,8 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/iotaledger/hive.go v0.0.0-20200120174440-057de3927083 h1:dQx6NHouYh3KBStOuYrgMm6wYNnf4L+grKWQV4iBxNI= -github.com/iotaledger/hive.go v0.0.0-20200120174440-057de3927083/go.mod h1:S+v90R3u4Rqe4VoOg4DhiZrAKlKZhz2UFKuK/Neqa2o= +github.com/iotaledger/hive.go v0.0.0-20200121213505-28904d5f037c h1:3T0MLyZIL74kYLqVrmv1xQlwE2ktS1IO8kjM+NyXEMU= +github.com/iotaledger/hive.go v0.0.0-20200121213505-28904d5f037c/go.mod h1:S+v90R3u4Rqe4VoOg4DhiZrAKlKZhz2UFKuK/Neqa2o= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A= github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= diff --git a/packages/autopeering/discover/manager.go b/packages/autopeering/discover/manager.go index 974b976a6fcf6b2749cfbbbf63804e8c78d2a869..e451a3cf6cee248811613f26b517b222e139c690 100644 --- a/packages/autopeering/discover/manager.go +++ b/packages/autopeering/discover/manager.go @@ -7,7 +7,6 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/server" - "github.com/iotaledger/hive.go/backoff" "github.com/iotaledger/hive.go/logger" ) @@ -18,22 +17,16 @@ const ( MaxPeersInResponse = 6 // MaxServices is the maximum number of services a peer can support. MaxServices = 5 - // NetworkMaxRetries is the maximum number of times a failing network send is retried. - NetworkMaxRetries = 2 // VersionNum specifies the expected version number for this Protocol. VersionNum = 0 ) -// policy for retrying failed network calls -var networkRetryPolicy = backoff.ExponentialBackOff(500*time.Millisecond, 1.5).With( - backoff.Jitter(0.5), backoff.MaxRetries(NetworkMaxRetries)) - type network interface { local() *peer.Local Ping(*peer.Peer) error - discoveryRequest(*peer.Peer) ([]*peer.Peer, error) + DiscoveryRequest(*peer.Peer) ([]*peer.Peer, error) } type manager struct { @@ -144,16 +137,8 @@ func (m *manager) doReverify(done chan<- struct{}) { "addr", p.Address(), ) - err := backoff.Retry(networkRetryPolicy, func() error { - err := m.net.Ping(unwrapPeer(p)) - if err != nil && err != server.ErrTimeout { - return backoff.Permanent(err) - } - return err - }) - // could not verify the peer - if err != nil { + if m.net.Ping(unwrapPeer(p)) != nil { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/packages/autopeering/discover/manager_test.go b/packages/autopeering/discover/manager_test.go index 07a35889bcf61e46123a2156a8578dd5cb09715d..afe6a3899c5e9aeb3de0b5c19e922f4b6f232ffd 100644 --- a/packages/autopeering/discover/manager_test.go +++ b/packages/autopeering/discover/manager_test.go @@ -27,7 +27,7 @@ func (m *NetworkMock) Ping(p *peer.Peer) error { return args.Error(0) } -func (m *NetworkMock) discoveryRequest(p *peer.Peer) ([]*peer.Peer, error) { +func (m *NetworkMock) DiscoveryRequest(p *peer.Peer) ([]*peer.Peer, error) { args := m.Called(p) return args.Get(0).([]*peer.Peer), args.Error(1) } @@ -69,10 +69,10 @@ func TestMgrVerifyDiscoveredPeer(t *testing.T) { p := newDummyPeer("p") - // expect ping of peer p + // expect Ping of peer p m.On("Ping", p).Return(nil).Once() - // ignore discoveryRequest calls - m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() + // ignore DiscoveryRequest calls + m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() // let the manager initialize time.Sleep(graceTime) @@ -89,10 +89,10 @@ func TestMgrReverifyPeer(t *testing.T) { p := newDummyPeer("p") - // expect ping of peer p + // expect Ping of peer p m.On("Ping", p).Return(nil).Once() - // ignore discoveryRequest calls - m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() + // ignore DiscoveryRequest calls + m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() // let the manager initialize time.Sleep(graceTime) @@ -110,9 +110,9 @@ func TestMgrRequestDiscoveredPeer(t *testing.T) { p1 := newDummyPeer("verified") p2 := newDummyPeer("discovered") - // expect discoveryRequest on the discovered peer - m.On("discoveryRequest", p1).Return([]*peer.Peer{p2}, nil).Once() - // ignore any ping + // expect DiscoveryRequest on the discovered peer + m.On("DiscoveryRequest", p1).Return([]*peer.Peer{p2}, nil).Once() + // ignore any Ping m.On("Ping", mock.Anything).Return(nil).Maybe() mgr.addVerifiedPeer(p1) @@ -128,10 +128,10 @@ func TestMgrAddManyVerifiedPeers(t *testing.T) { p := newDummyPeer("p") - // expect ping of peer p + // expect Ping of peer p m.On("Ping", p).Return(nil).Once() - // ignore discoveryRequest calls - m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() + // ignore DiscoveryRequest calls + m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() // let the manager initialize time.Sleep(graceTime) @@ -156,10 +156,10 @@ func TestMgrDeleteUnreachablePeer(t *testing.T) { p := newDummyPeer("p") - // expect ping of peer p, but return error - m.On("Ping", p).Return(server.ErrTimeout).Times(NetworkMaxRetries + 1) - // ignore discoveryRequest calls - m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() + // expect Ping of peer p, but return error + m.On("Ping", p).Return(server.ErrTimeout).Times(1) + // ignore DiscoveryRequest calls + m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() // let the manager initialize time.Sleep(graceTime) diff --git a/packages/autopeering/discover/protocol.go b/packages/autopeering/discover/protocol.go index 3cbe49361e744677789f59af69919cf8944ee543..8fbda7d71422dbb5fc6472d9128846a9a6dd14ff 100644 --- a/packages/autopeering/discover/protocol.go +++ b/packages/autopeering/discover/protocol.go @@ -2,6 +2,7 @@ package discover import ( "bytes" + "errors" "fmt" "sync" "time" @@ -12,9 +13,19 @@ import ( peerpb "github.com/iotaledger/goshimmer/packages/autopeering/peer/proto" "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" "github.com/iotaledger/goshimmer/packages/autopeering/server" + "github.com/iotaledger/hive.go/backoff" "github.com/iotaledger/hive.go/logger" ) +const ( + maxRetries = 2 + logSends = true +) + +// policy for retrying failed network calls +var retryPolicy = backoff.ExponentialBackOff(500*time.Millisecond, 1.5).With( + backoff.Jitter(0.5), backoff.MaxRetries(maxRetries)) + // The Protocol handles the peer discovery. // It responds to incoming messages and sends own requests when needed. type Protocol struct { @@ -39,19 +50,12 @@ func New(local *peer.Local, cfg Config) *Protocol { return p } -// local returns the associated local peer of the neighbor selection. -func (p *Protocol) local() *peer.Local { - return p.loc -} - // Start starts the actual peer discovery over the provided Sender. func (p *Protocol) Start(s server.Sender) { p.Protocol.Sender = s p.mgr.start() - p.log.Debugw("discover started", - "addr", s.LocalAddr(), - ) + p.log.Debug("discover started") } // Close finalizes the protocol. @@ -66,14 +70,17 @@ func (p *Protocol) IsVerified(id peer.ID, addr string) bool { return time.Since(p.loc.Database().LastPong(id, addr)) < PingExpiration } -// EnsureVerified checks if the given peer has recently sent a ping; -// if not, we send a ping to trigger a verification. -func (p *Protocol) EnsureVerified(peer *peer.Peer) { +// EnsureVerified checks if the given peer has recently sent a Ping; +// if not, we send a Ping to trigger a verification. +func (p *Protocol) EnsureVerified(peer *peer.Peer) error { if !p.hasVerified(peer.ID(), peer.Address()) { - <-p.sendPing(peer.Address(), peer.ID()) - // Wait for them to ping back and process our pong + if err := p.Ping(peer); err != nil { + return err + } + // Wait for them to Ping back and process our pong time.Sleep(server.ResponseTimeout) } + return nil } // GetVerifiedPeer returns the verified peer with the given ID, or nil if no such peer exists. @@ -106,7 +113,7 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. if err := proto.Unmarshal(data[1:], m); err != nil { return true, fmt.Errorf("invalid message: %w", err) } - if p.validatePing(s, fromAddr, m) { + if p.validatePing(fromAddr, m) { p.handlePing(s, fromAddr, fromID, fromKey, data) } @@ -126,7 +133,7 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. if err := proto.Unmarshal(data[1:], m); err != nil { return true, fmt.Errorf("invalid message: %w", err) } - if p.validateDiscoveryRequest(s, fromAddr, fromID, m) { + if p.validateDiscoveryRequest(fromAddr, fromID, m) { p.handleDiscoveryRequest(s, fromAddr, data) } @@ -146,17 +153,33 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. return true, nil } +// local returns the associated local peer of the neighbor selection. +func (p *Protocol) local() *peer.Local { + return p.loc +} + +// publicAddr returns the public address of the peering service in string representation. +func (p *Protocol) publicAddr() string { + return p.loc.Services().Get(service.PeeringKey).String() +} + // ------ message senders ------ -// ping sends a ping to the specified peer and blocks until a reply is received or timeout. +// Ping sends a Ping to the specified peer and blocks until a reply is received or timeout. func (p *Protocol) Ping(to *peer.Peer) error { - return <-p.sendPing(to.Address(), to.ID()) + return backoff.Retry(retryPolicy, func() error { + err := <-p.sendPing(to.Address(), to.ID()) + if err != nil && !errors.Is(err, server.ErrTimeout) { + return backoff.Permanent(err) + } + return err + }) } -// sendPing sends a ping to the specified address and expects a matching reply. +// sendPing sends a Ping to the specified address and expects a matching reply. // This method is non-blocking, but it returns a channel that can be used to query potential errors. func (p *Protocol) sendPing(toAddr string, toID peer.ID) <-chan error { - ping := newPing(p.LocalAddr(), toAddr) + ping := newPing(p.publicAddr(), toAddr) data := marshal(ping) // compute the message hash @@ -165,48 +188,48 @@ func (p *Protocol) sendPing(toAddr string, toID peer.ID) <-chan error { return bytes.Equal(m.(*pb.Pong).GetPingHash(), hash) } - // expect a pong referencing the ping we are about to send - p.log.Debugw("send message", "type", ping.Name(), "addr", toAddr) - errc := p.Protocol.SendExpectingReply(toAddr, toID, data, pb.MPong, hashEqual) - - return errc + p.logSend(toAddr, ping) + return p.Protocol.SendExpectingReply(toAddr, toID, data, pb.MPong, hashEqual) } -// discoveryRequest request known peers from the given target. This method blocks +// DiscoveryRequest request known peers from the given target. This method blocks // until a response is received and the provided peers are returned. -func (p *Protocol) discoveryRequest(to *peer.Peer) ([]*peer.Peer, error) { - p.EnsureVerified(to) +func (p *Protocol) DiscoveryRequest(to *peer.Peer) ([]*peer.Peer, error) { + if err := p.EnsureVerified(to); err != nil { + return nil, err + } - // create the request package - toAddr := to.Address() - req := newDiscoveryRequest(toAddr) + req := newDiscoveryRequest(to.Address()) data := marshal(req) // compute the message hash hash := server.PacketHash(data) - peers := make([]*peer.Peer, 0, MaxPeersInResponse) - p.log.Debugw("send message", "type", req.Name(), "addr", toAddr) - errc := p.Protocol.SendExpectingReply(toAddr, to.ID(), data, pb.MDiscoveryResponse, func(m interface{}) bool { + peers := make([]*peer.Peer, 0, MaxPeersInResponse) + callback := func(m interface{}) bool { res := m.(*pb.DiscoveryResponse) if !bytes.Equal(res.GetReqHash(), hash) { return false } - for _, rp := range res.GetPeers() { - peer, err := peer.FromProto(rp) - if err != nil { - p.log.Warnw("invalid peer received", "err", err) - continue + peers = peers[:0] + for _, protoPeer := range res.GetPeers() { + if p, _ := peer.FromProto(protoPeer); p != nil { + peers = append(peers, p) } - peers = append(peers, peer) } - return true - }) + } - // wait for the response and then return peers - return peers, <-errc + err := backoff.Retry(retryPolicy, func() error { + p.logSend(to.Address(), req) + err := <-p.Protocol.SendExpectingReply(to.Address(), to.ID(), data, pb.MDiscoveryResponse, callback) + if err != nil && !errors.Is(err, server.ErrTimeout) { + return backoff.Permanent(err) + } + return err + }) + return peers, err } // ------ helper functions ------ @@ -216,6 +239,12 @@ func (p *Protocol) hasVerified(id peer.ID, addr string) bool { return time.Since(p.loc.Database().LastPing(id, addr)) < PingExpiration } +func (p *Protocol) logSend(toAddr string, msg pb.Message) { + if logSends { + p.log.Debugw("send message", "type", msg.Name(), "addr", toAddr) + } +} + func marshal(msg pb.Message) []byte { mType := msg.Type() if mType > 0xFF { @@ -229,15 +258,15 @@ func marshal(msg pb.Message) []byte { return append([]byte{byte(mType)}, data...) } -// createDiscoverPeer creates a new peer that only has a peering service at the given address. -func createDiscoverPeer(key peer.PublicKey, network string, address string) *peer.Peer { +// newPeer creates a new peer that only has a peering service at the given address. +func newPeer(key peer.PublicKey, network string, address string) *peer.Peer { services := service.New() services.Update(service.PeeringKey, network, address) return peer.NewPeer(key, services) } -// ------ Packet Constructors ------ +// ------ Message Constructors ------ func newPing(fromAddr string, toAddr string) *pb.Ping { return &pb.Ping{ @@ -274,9 +303,9 @@ func newDiscoveryResponse(reqData []byte, list []*peer.Peer) *pb.DiscoveryRespon } } -// ------ Packet Handlers ------ +// ------ Message Handlers ------ -func (p *Protocol) validatePing(s *server.Server, fromAddr string, m *pb.Ping) bool { +func (p *Protocol) validatePing(fromAddr string, m *pb.Ping) bool { // check version number if m.GetVersion() != VersionNum { p.log.Debugw("invalid message", @@ -296,11 +325,11 @@ func (p *Protocol) validatePing(s *server.Server, fromAddr string, m *pb.Ping) b return false } // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), - "want", s.LocalAddr(), + "want", p.publicAddr(), ) return false } @@ -322,36 +351,32 @@ func (p *Protocol) validatePing(s *server.Server, fromAddr string, m *pb.Ping) b func (p *Protocol) handlePing(s *server.Server, fromAddr string, fromID peer.ID, fromKey peer.PublicKey, rawData []byte) { // create and send the pong response - pong := newPong(fromAddr, rawData, s.Local().Services().CreateRecord()) + pong := newPong(fromAddr, rawData, p.loc.Services().CreateRecord()) - p.log.Debugw("send message", - "type", pong.Name(), - "addr", fromAddr, - ) + p.logSend(fromAddr, pong) s.Send(fromAddr, marshal(pong)) - // if the peer is new or expired, send a ping to verify + // if the peer is new or expired, send a Ping to verify if !p.IsVerified(fromID, fromAddr) { p.sendPing(fromAddr, fromID) } else if !p.mgr.isKnown(fromID) { // add a discovered peer to the manager if it is new - peer := createDiscoverPeer(fromKey, p.LocalNetwork(), fromAddr) - p.mgr.addDiscoveredPeer(peer) + p.mgr.addDiscoveredPeer(newPeer(fromKey, s.LocalAddr().Network(), fromAddr)) } - _ = p.local().Database().UpdateLastPing(fromID, fromAddr, time.Now()) + _ = p.loc.Database().UpdateLastPing(fromID, fromAddr, time.Now()) } func (p *Protocol) validatePong(s *server.Server, fromAddr string, fromID peer.ID, m *pb.Pong) bool { // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), - "want", s.LocalAddr(), + "want", p.publicAddr(), ) return false } - // there must be a ping waiting for this pong as a reply + // there must be a Ping waiting for this pong as a reply if !s.IsExpectedReply(fromAddr, fromID, m.Type(), m) { p.log.Debugw("invalid message", "type", m.Name(), @@ -391,18 +416,18 @@ func (p *Protocol) handlePong(fromAddr string, fromID peer.ID, fromKey peer.Publ p.mgr.addVerifiedPeer(from) // update peer database - db := p.local().Database() + db := p.loc.Database() _ = db.UpdateLastPong(fromID, fromAddr, time.Now()) _ = db.UpdatePeer(from) } -func (p *Protocol) validateDiscoveryRequest(s *server.Server, fromAddr string, fromID peer.ID, m *pb.DiscoveryRequest) bool { +func (p *Protocol) validateDiscoveryRequest(fromAddr string, fromID peer.ID, m *pb.DiscoveryRequest) bool { // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), - "want", s.LocalAddr(), + "want", p.publicAddr(), ) return false } @@ -435,10 +460,7 @@ func (p *Protocol) handleDiscoveryRequest(s *server.Server, fromAddr string, raw peers := p.mgr.getRandomPeers(MaxPeersInResponse, 1) res := newDiscoveryResponse(rawData, peers) - p.log.Debugw("send message", - "type", res.Name(), - "addr", fromAddr, - ) + p.logSend(fromAddr, res) s.Send(fromAddr, marshal(res)) } diff --git a/packages/autopeering/discover/protocol_test.go b/packages/autopeering/discover/protocol_test.go index 7a98bec2423dc01de0f2b98f8b296dd374aedec1..fd294f7bcacedfcbba2a13b247ddbcf088feb08e 100644 --- a/packages/autopeering/discover/protocol_test.go +++ b/packages/autopeering/discover/protocol_test.go @@ -87,11 +87,11 @@ func TestProtPingPong(t *testing.T) { peerA := getPeer(srvA) peerB := getPeer(srvB) - // send a ping from node A to B + // send a Ping from node A to B t.Run("A->B", func(t *testing.T) { assert.NoError(t, protA.Ping(peerB)) }) time.Sleep(graceTime) - // send a ping from node B to A + // send a Ping from node B to A t.Run("B->A", func(t *testing.T) { assert.NoError(t, protB.Ping(peerA)) }) time.Sleep(graceTime) } @@ -107,7 +107,7 @@ func TestProtPingTimeout(t *testing.T) { peerB := getPeer(srvB) - // send a ping from node A to B + // send a Ping from node A to B err := protA.Ping(peerB) assert.EqualError(t, err, server.ErrTimeout.Error()) } @@ -123,7 +123,7 @@ func TestProtVerifiedPeers(t *testing.T) { peerB := getPeer(srvB) - // send a ping from node A to B + // send a Ping from node A to B assert.NoError(t, protA.Ping(peerB)) time.Sleep(graceTime) @@ -146,7 +146,7 @@ func TestProtVerifiedPeer(t *testing.T) { peerA := getPeer(srvA) peerB := getPeer(srvB) - // send a ping from node A to B + // send a Ping from node A to B assert.NoError(t, protA.Ping(peerB)) time.Sleep(graceTime) @@ -172,13 +172,13 @@ func TestProtDiscoveryRequest(t *testing.T) { // request peers from node A t.Run("A->B", func(t *testing.T) { - if ps, err := protA.discoveryRequest(peerB); assert.NoError(t, err) { + if ps, err := protA.DiscoveryRequest(peerB); assert.NoError(t, err) { assert.ElementsMatch(t, []*peer.Peer{peerA}, ps) } }) // request peers from node B t.Run("B->A", func(t *testing.T) { - if ps, err := protB.discoveryRequest(peerA); assert.NoError(t, err) { + if ps, err := protB.DiscoveryRequest(peerA); assert.NoError(t, err) { assert.ElementsMatch(t, []*peer.Peer{peerB}, ps) } }) @@ -246,14 +246,14 @@ func BenchmarkPingPong(b *testing.B) { peerB := getPeer(srvB) - // send initial ping to ensure that every peer is verified + // send initial Ping to ensure that every peer is verified err := protA.Ping(peerB) require.NoError(b, err) time.Sleep(graceTime) b.ResetTimer() for n := 0; n < b.N; n++ { - // send a ping from node A to B + // send a Ping from node A to B _ = protA.Ping(peerB) } @@ -276,14 +276,14 @@ func BenchmarkDiscoveryRequest(b *testing.B) { peerB := getPeer(srvB) - // send initial request to ensure that every peer is verified - _, err := protA.discoveryRequest(peerB) + // send initial DiscoveryRequest to ensure that every peer is verified + _, err := protA.DiscoveryRequest(peerB) require.NoError(b, err) time.Sleep(graceTime) b.ResetTimer() for n := 0; n < b.N; n++ { - _, _ = protA.discoveryRequest(peerB) + _, _ = protA.DiscoveryRequest(peerB) } b.StopTimer() diff --git a/packages/autopeering/discover/query_strat.go b/packages/autopeering/discover/query_strat.go index 80247c54e443ee5490cd9f5f2c19d936679df1e8..0f70a6ba2a488af5bf99f1b1132c6855622cf1c0 100644 --- a/packages/autopeering/discover/query_strat.go +++ b/packages/autopeering/discover/query_strat.go @@ -5,10 +5,6 @@ import ( "math/rand" "sync" "time" - - "github.com/iotaledger/goshimmer/packages/autopeering/peer" - "github.com/iotaledger/goshimmer/packages/autopeering/server" - "github.com/iotaledger/hive.go/backoff" ) // doQuery is the main method of the query strategy. @@ -38,16 +34,7 @@ func (m *manager) doQuery(next chan<- time.Duration) { func (m *manager) requestWorker(p *mpeer, wg *sync.WaitGroup) { defer wg.Done() - var peers []*peer.Peer - err := backoff.Retry(networkRetryPolicy, func() error { - var err error - peers, err = m.net.discoveryRequest(unwrapPeer(p)) - if err != nil && err != server.ErrTimeout { - return backoff.Permanent(err) - } - return err - }) - + peers, err := m.net.DiscoveryRequest(unwrapPeer(p)) if err != nil || len(peers) == 0 { p.lastNewPeers = 0 diff --git a/packages/autopeering/selection/manager.go b/packages/autopeering/selection/manager.go index 0ac30f101df80213593e5ad3964fc220caa2001f..ae58fd5359dbb7282568356e14edcc064dfdfa6f 100644 --- a/packages/autopeering/selection/manager.go +++ b/packages/autopeering/selection/manager.go @@ -22,8 +22,8 @@ const ( type network interface { local() *peer.Local - RequestPeering(*peer.Peer, *salt.Salt) (bool, error) - SendPeeringDrop(*peer.Peer) + PeeringRequest(*peer.Peer, *salt.Salt) (bool, error) + PeeringDrop(*peer.Peer) } type peeringRequest struct { @@ -230,8 +230,7 @@ func (m *manager) updateOutbound(resultChan chan<- peer.PeerDistance) { return } - // send peering request - status, err := m.net.RequestPeering(candidate.Remote, m.getPublicSalt()) + status, err := m.net.PeeringRequest(candidate.Remote, m.getPublicSalt()) if err != nil { m.rejectionFilter.AddPeer(candidate.Remote.ID()) m.log.Debugw("error requesting peering", @@ -317,7 +316,7 @@ func (m *manager) dropNeighborhood(nh *Neighborhood) { // dropPeering sends the peering drop over the network and triggers the corresponding event. func (m *manager) dropPeering(p *peer.Peer) { - m.net.SendPeeringDrop(p) + m.net.PeeringDrop(p) m.log.Debugw("peering dropped", "id", p.ID(), diff --git a/packages/autopeering/selection/manager_test.go b/packages/autopeering/selection/manager_test.go index 15e472a20ce4f433e275dc27019338b0e2fb4955..af24ee4161a1f0aebe465a3d868fdca54db3cff1 100644 --- a/packages/autopeering/selection/manager_test.go +++ b/packages/autopeering/selection/manager_test.go @@ -197,11 +197,11 @@ func (n *networkMock) local() *peer.Local { return n.loc } -func (n *networkMock) SendPeeringDrop(p *peer.Peer) { +func (n *networkMock) PeeringDrop(p *peer.Peer) { n.mgr[p.ID()].removeNeighbor(n.local().ID()) } -func (n *networkMock) RequestPeering(p *peer.Peer, s *salt.Salt) (bool, error) { +func (n *networkMock) PeeringRequest(p *peer.Peer, s *salt.Salt) (bool, error) { return n.mgr[p.ID()].requestPeering(&n.local().Peer, s), nil } diff --git a/packages/autopeering/selection/protocol.go b/packages/autopeering/selection/protocol.go index f82697afd88be5cf25c0e14ecb98ec3833fa2d8a..61ad178d8b46ad5b55e9d85c647e9de9ea85fa9a 100644 --- a/packages/autopeering/selection/protocol.go +++ b/packages/autopeering/selection/protocol.go @@ -2,22 +2,34 @@ package selection import ( "bytes" + "errors" "fmt" "sync" "time" "github.com/golang/protobuf/proto" "github.com/iotaledger/goshimmer/packages/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" "github.com/iotaledger/goshimmer/packages/autopeering/salt" pb "github.com/iotaledger/goshimmer/packages/autopeering/selection/proto" "github.com/iotaledger/goshimmer/packages/autopeering/server" + "github.com/iotaledger/hive.go/backoff" "github.com/iotaledger/hive.go/logger" ) +const ( + maxRetries = 2 + logSends = true +) + +// policy for retrying failed network calls +var retryPolicy = backoff.ExponentialBackOff(500*time.Millisecond, 1.5).With( + backoff.Jitter(0.5), backoff.MaxRetries(maxRetries)) + // DiscoverProtocol specifies the methods from the peer discovery that are required. type DiscoverProtocol interface { IsVerified(id peer.ID, addr string) bool - EnsureVerified(*peer.Peer) + EnsureVerified(*peer.Peer) error GetVerifiedPeer(id peer.ID, addr string) *peer.Peer GetVerifiedPeers() []*peer.Peer @@ -49,19 +61,12 @@ func New(local *peer.Local, disc DiscoverProtocol, cfg Config) *Protocol { return p } -// Local returns the associated local peer of the neighbor selection. -func (p *Protocol) local() *peer.Local { - return p.loc -} - // Start starts the actual neighbor selection over the provided Sender. func (p *Protocol) Start(s server.Sender) { p.Protocol.Sender = s p.mgr.start() - p.log.Debugw("neighborhood started", - "addr", s.LocalAddr(), - ) + p.log.Debug("neighborhood started") } // Close finalizes the protocol. @@ -94,7 +99,7 @@ func (p *Protocol) RemoveNeighbor(id peer.ID) { } // HandleMessage responds to incoming neighbor selection messages. -func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer.ID, fromKey peer.PublicKey, data []byte) (bool, error) { +func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer.ID, _ peer.PublicKey, data []byte) (bool, error) { switch pb.MType(data[0]) { // PeeringRequest @@ -103,7 +108,7 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. if err := proto.Unmarshal(data[1:], m); err != nil { return true, fmt.Errorf("invalid message: %w", err) } - if p.validatePeeringRequest(s, fromAddr, fromID, m) { + if p.validatePeeringRequest(fromAddr, fromID, m) { p.handlePeeringRequest(s, fromAddr, fromID, data, m) } @@ -122,7 +127,7 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. if err := proto.Unmarshal(data[1:], m); err != nil { return true, fmt.Errorf("invalid message: %w", err) } - if p.validatePeeringDrop(s, fromAddr, m) { + if p.validatePeeringDrop(fromAddr, m) { p.handlePeeringDrop(fromID) } @@ -133,12 +138,24 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. return true, nil } +// Local returns the associated local peer of the neighbor selection. +func (p *Protocol) local() *peer.Local { + return p.loc +} + +// publicAddr returns the public address of the peering service in string representation. +func (p *Protocol) publicAddr() string { + return p.loc.Services().Get(service.PeeringKey).String() +} + // ------ message senders ------ -// RequestPeering sends a peering request to the given peer. This method blocks +// PeeringRequest sends a PeeringRequest to the given peer. This method blocks // until a response is received and the status answer is returned. -func (p *Protocol) RequestPeering(to *peer.Peer, salt *salt.Salt) (bool, error) { - p.disc.EnsureVerified(to) +func (p *Protocol) PeeringRequest(to *peer.Peer, salt *salt.Salt) (bool, error) { + if err := p.disc.EnsureVerified(to); err != nil { + return false, err + } // create the request package toAddr := to.Address() @@ -158,29 +175,33 @@ func (p *Protocol) RequestPeering(to *peer.Peer, salt *salt.Salt) (bool, error) return true } - p.log.Debugw("send message", - "type", req.Name(), - "addr", toAddr, - ) - err := <-p.Protocol.SendExpectingReply(toAddr, to.ID(), data, pb.MPeeringResponse, callback) - + err := backoff.Retry(retryPolicy, func() error { + p.logSend(toAddr, req) + err := <-p.Protocol.SendExpectingReply(toAddr, to.ID(), data, pb.MPeeringResponse, callback) + if err != nil && !errors.Is(err, server.ErrTimeout) { + return backoff.Permanent(err) + } + return err + }) return status, err } -// SendPeeringDrop sends a peering drop to the given peer and does not wait for any responses. -func (p *Protocol) SendPeeringDrop(to *peer.Peer) { - toAddr := to.Address() - drop := newPeeringDrop(toAddr) +// PeeringDrop sends a peering drop message to the given peer, non-blocking and does not wait for any responses. +func (p *Protocol) PeeringDrop(to *peer.Peer) { + drop := newPeeringDrop(to.Address()) - p.log.Debugw("send message", - "type", drop.Name(), - "addr", toAddr, - ) + p.logSend(to.Address(), drop) p.Protocol.Send(to, marshal(drop)) } // ------ helper functions ------ +func (p *Protocol) logSend(toAddr string, msg pb.Message) { + if logSends { + p.log.Debugw("send message", "type", msg.Name(), "addr", toAddr) + } +} + func marshal(msg pb.Message) []byte { mType := msg.Type() if mType > 0xFF { @@ -194,7 +215,7 @@ func marshal(msg pb.Message) []byte { return append([]byte{byte(mType)}, data...) } -// ------ Packet Constructors ------ +// ------ Message Constructors ------ func newPeeringRequest(toAddr string, salt *salt.Salt) *pb.PeeringRequest { return &pb.PeeringRequest{ @@ -220,12 +241,13 @@ func newPeeringDrop(toAddr string) *pb.PeeringDrop { // ------ Packet Handlers ------ -func (p *Protocol) validatePeeringRequest(s *server.Server, fromAddr string, fromID peer.ID, m *pb.PeeringRequest) bool { +func (p *Protocol) validatePeeringRequest(fromAddr string, fromID peer.ID, m *pb.PeeringRequest) bool { // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), + "want", p.publicAddr(), ) return false } @@ -246,7 +268,7 @@ func (p *Protocol) validatePeeringRequest(s *server.Server, fromAddr string, fro return false } // check Salt - salt, err := salt.FromProto(m.GetSalt()) + s, err := salt.FromProto(m.GetSalt()) if err != nil { p.log.Debugw("invalid message", "type", m.Name(), @@ -255,10 +277,10 @@ func (p *Protocol) validatePeeringRequest(s *server.Server, fromAddr string, fro return false } // check salt expiration - if salt.Expired() { + if s.Expired() { p.log.Debugw("invalid message", "type", m.Name(), - "salt.expiration", salt.GetExpiration(), + "salt.expiration", s.GetExpiration(), ) return false } @@ -289,10 +311,7 @@ func (p *Protocol) handlePeeringRequest(s *server.Server, fromAddr string, fromI res := newPeeringResponse(rawData, p.mgr.requestPeering(from, salt)) - p.log.Debugw("send message", - "type", res.Name(), - "addr", fromAddr, - ) + p.logSend(fromAddr, res) s.Send(fromAddr, marshal(res)) } @@ -313,12 +332,13 @@ func (p *Protocol) validatePeeringResponse(s *server.Server, fromAddr string, fr return true } -func (p *Protocol) validatePeeringDrop(s *server.Server, fromAddr string, m *pb.PeeringDrop) bool { +func (p *Protocol) validatePeeringDrop(fromAddr string, m *pb.PeeringDrop) bool { // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), + "want", p.publicAddr(), ) return false } diff --git a/packages/autopeering/selection/protocol_test.go b/packages/autopeering/selection/protocol_test.go index d43820f87d7e3150a2b4cc6a7888dec69ed3722e..ce6e59e1ba299fcdf1fec458ffbabd0406101b5d 100644 --- a/packages/autopeering/selection/protocol_test.go +++ b/packages/autopeering/selection/protocol_test.go @@ -24,7 +24,7 @@ var peerMap = make(map[peer.ID]*peer.Peer) type dummyDiscovery struct{} func (d dummyDiscovery) IsVerified(peer.ID, string) bool { return true } -func (d dummyDiscovery) EnsureVerified(*peer.Peer) {} +func (d dummyDiscovery) EnsureVerified(*peer.Peer) error { return nil } func (d dummyDiscovery) GetVerifiedPeer(id peer.ID, _ string) *peer.Peer { return peerMap[id] } func (d dummyDiscovery) GetVerifiedPeers() []*peer.Peer { return []*peer.Peer{} } @@ -77,13 +77,13 @@ func TestProtocol(t *testing.T) { // request peering to peer B t.Run("A->B", func(t *testing.T) { - if services, err := protA.RequestPeering(peerB, saltA); assert.NoError(t, err) { + if services, err := protA.PeeringRequest(peerB, saltA); assert.NoError(t, err) { assert.NotEmpty(t, services) } }) // request peering to peer A t.Run("B->A", func(t *testing.T) { - if services, err := protB.RequestPeering(peerA, saltB); assert.NoError(t, err) { + if services, err := protB.PeeringRequest(peerA, saltB); assert.NoError(t, err) { assert.NotEmpty(t, services) } }) @@ -102,7 +102,7 @@ func TestProtocol(t *testing.T) { peerB := getPeer(srvB) // request peering to peer B - _, err := protA.RequestPeering(peerB, saltA) + _, err := protA.PeeringRequest(peerB, saltA) assert.EqualError(t, err, server.ErrTimeout.Error()) }) @@ -120,14 +120,14 @@ func TestProtocol(t *testing.T) { peerB := getPeer(srvB) // request peering to peer B - services, err := protA.RequestPeering(peerB, saltA) + services, err := protA.PeeringRequest(peerB, saltA) require.NoError(t, err) assert.NotEmpty(t, services) require.Contains(t, protB.GetNeighbors(), peerA) // drop peer A - protA.SendPeeringDrop(peerB) + protA.PeeringDrop(peerB) time.Sleep(graceTime) require.NotContains(t, protB.GetNeighbors(), peerA) }) diff --git a/packages/autopeering/selection/selection.go b/packages/autopeering/selection/selection.go index f97acbdbb30703559aaa643f49d749c83c394d72..66183f246131b9d9cf9cdad18888166f44d346a3 100644 --- a/packages/autopeering/selection/selection.go +++ b/packages/autopeering/selection/selection.go @@ -24,9 +24,9 @@ func NewFilter() *Filter { func (f *Filter) Apply(list []peer.PeerDistance) (filteredList []peer.PeerDistance) { f.lock.RLock() defer f.lock.RUnlock() - for _, peer := range list { - if !f.internal[peer.Remote.ID()] { - filteredList = append(filteredList, peer) + for _, p := range list { + if !f.internal[p.Remote.ID()] { + filteredList = append(filteredList, p) } } return filteredList @@ -36,11 +36,11 @@ func (f *Filter) PushBack(list []peer.PeerDistance) (filteredList []peer.PeerDis var head, tail []peer.PeerDistance f.lock.RLock() defer f.lock.RUnlock() - for _, peer := range list { - if f.internal[peer.Remote.ID()] { - tail = append(tail, peer) + for _, p := range list { + if f.internal[p.Remote.ID()] { + tail = append(tail, p) } else { - head = append(head, peer) + head = append(head, p) } } return append(head, tail...) @@ -48,8 +48,8 @@ func (f *Filter) PushBack(list []peer.PeerDistance) (filteredList []peer.PeerDis func (f *Filter) AddPeers(n []*peer.Peer) { f.lock.Lock() - for _, peer := range n { - f.internal[peer.ID()] = true + for _, p := range n { + f.internal[p.ID()] = true } f.lock.Unlock() } diff --git a/packages/autopeering/server/common.go b/packages/autopeering/server/common.go index 4defddc2db2b2af520ac791ac37fd29421321246..593d916b1ff93a8ccf36c279208ff2d4a25cecbf 100644 --- a/packages/autopeering/server/common.go +++ b/packages/autopeering/server/common.go @@ -11,9 +11,6 @@ type MType uint // The Sender interface specifies common method required to send requests. type Sender interface { - LocalAddr() string - LocalNetwork() string - Send(toAddr string, data []byte) SendExpectingReply(toAddr string, toID peer.ID, data []byte, replyType MType, callback func(interface{}) bool) <-chan error } diff --git a/packages/autopeering/server/protocol.go b/packages/autopeering/server/protocol.go index bab76b712f2d9710cdb08e255a7498707f3743b7..e3ef6a622e3126f33bf62aff78527eda663e0587 100644 --- a/packages/autopeering/server/protocol.go +++ b/packages/autopeering/server/protocol.go @@ -15,16 +15,6 @@ type Protocol struct { Sender Sender // interface to send own requests } -// LocalAddr returns the local network address in string form. -func (p *Protocol) LocalAddr() string { - return p.Sender.LocalAddr() -} - -// LocalNetwork returns the name of the local network (for example, "tcp", "udp"). -func (p *Protocol) LocalNetwork() string { - return p.Sender.LocalNetwork() -} - // Send sends the data to the given peer. func (p *Protocol) Send(to *peer.Peer, data []byte) { p.Sender.Send(to.Address(), data) diff --git a/packages/autopeering/server/server.go b/packages/autopeering/server/server.go index 74d1bfbda4d911265602e11836bbeeaa624796bd..88ca6cc55daa53d44da03e69e7f05875c2b5ee1b 100644 --- a/packages/autopeering/server/server.go +++ b/packages/autopeering/server/server.go @@ -4,6 +4,7 @@ import ( "container/list" "fmt" "io" + "net" "sync" "time" @@ -87,6 +88,8 @@ func Listen(local *peer.Local, t transport.Transport, log *logger.Logger, h ...H go srv.replyLoop() go srv.readLoop() + log.Debugw("server started", "addr", srv.LocalAddr(), "#handlers", len(h)) + return srv } @@ -104,14 +107,9 @@ func (s *Server) Local() *peer.Local { return s.local } -// LocalNetwork returns the network of the local peer. -func (s *Server) LocalNetwork() string { - return s.network -} - // LocalAddr returns the address of the local peer in string form. -func (s *Server) LocalAddr() string { - return s.address +func (s *Server) LocalAddr() net.Addr { + return s.trans.LocalAddr() } // Send sends a message to the given address diff --git a/packages/autopeering/server/server_test.go b/packages/autopeering/server/server_test.go index f3cff2c83e7509697e4f7a8626432a2f2d648d21..58bc5a60e3134a2f03bb423bf4d9bfddd13682c6 100644 --- a/packages/autopeering/server/server_test.go +++ b/packages/autopeering/server/server_test.go @@ -205,5 +205,5 @@ func TestUnexpectedPong(t *testing.T) { // there should never be a Ping.Handle // there should never be a Pong.Handle - srvA.Send(srvB.LocalAddr(), new(Pong).Marshal()) + srvA.Send(srvB.LocalAddr().String(), new(Pong).Marshal()) } diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 683cd24c966051d67b55cb91f8387fbbc457abd0..19933333c6755203f505b1145fe417574bd31f6e 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -7,7 +7,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/iotaledger/goshimmer/packages/autopeering/peer" - "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/iotaledger/hive.go/events" @@ -71,11 +70,6 @@ func (m *Manager) stop() { } } -// LocalAddr returns the public address of the gossip service. -func (m *Manager) LocalAddr() net.Addr { - return m.local.Services().Get(service.GossipKey) -} - // AddOutbound tries to add a neighbor by connecting to that peer. func (m *Manager) AddOutbound(p *peer.Peer) error { if p.ID() == m.local.ID() { diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/autopeering.go index 33fb047aa5397424180c8d1641ce2a87926652f9..76c8122a6b0b467dc012e5e37c8d2c70cfd5b780 100644 --- a/plugins/autopeering/autopeering.go +++ b/plugins/autopeering/autopeering.go @@ -13,6 +13,7 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/selection" "github.com/iotaledger/goshimmer/packages/autopeering/server" "github.com/iotaledger/goshimmer/packages/autopeering/transport" + "github.com/iotaledger/goshimmer/packages/netutil" "github.com/iotaledger/goshimmer/packages/parameter" "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/cli" @@ -56,8 +57,9 @@ func configureAP() { func start(shutdownSignal <-chan struct{}) { defer log.Info("Stopping " + name + " ... done") - addr := local.GetInstance().Services().Get(service.PeeringKey) - udpAddr, err := net.ResolveUDPAddr(addr.Network(), addr.String()) + loc := local.GetInstance() + peeringAddr := loc.Services().Get(service.PeeringKey) + udpAddr, err := net.ResolveUDPAddr(peeringAddr.Network(), peeringAddr.String()) if err != nil { log.Fatalf("ResolveUDPAddr: %v", err) } @@ -71,7 +73,12 @@ func start(shutdownSignal <-chan struct{}) { } } - conn, err := net.ListenUDP(addr.Network(), udpAddr) + // check that discovery is working and the port is open + log.Info("Testing service ...") + checkConnection(udpAddr, &loc.Peer) + log.Info("Testing service ... done") + + conn, err := net.ListenUDP(peeringAddr.Network(), udpAddr) if err != nil { log.Fatalf("ListenUDP: %v", err) } @@ -86,26 +93,21 @@ func start(shutdownSignal <-chan struct{}) { } // start a server doing discovery and peering - srv := server.Listen(local.GetInstance(), trans, log.Named("srv"), handlers...) + srv := server.Listen(loc, trans, log.Named("srv"), handlers...) defer srv.Close() // start the discovery on that connection Discovery.Start(srv) defer Discovery.Close() - //check that discovery is working and the port is open - log.Info("Testing service ...") - checkConnection(srv, &local.GetInstance().Peer) - log.Info("Testing service ... done") - if Selection != nil { // start the peering on that connection Selection.Start(srv) defer Selection.Close() } - log.Infof(name+" started: address=%s/udp", srv.LocalAddr()) - log.Debugf(name+" server started: PubKey=%s", base64.StdEncoding.EncodeToString(local.GetInstance().PublicKey())) + log.Infof(name+" started: address=%s/%s", peeringAddr.String(), peeringAddr.Network()) + log.Debugf(name+" server started: PubKey=%s", base64.StdEncoding.EncodeToString(loc.PublicKey())) <-shutdownSignal log.Info("Stopping " + name + " ...") @@ -135,14 +137,18 @@ func parseEntryNodes() (result []*peer.Peer, err error) { return result, nil } -func checkConnection(srv *server.Server, self *peer.Peer) { - if err := Discovery.Ping(self); err != nil { - if err == server.ErrTimeout { - log.Errorf("Error testing service: %s", err) - addr := self.Services().Get(service.PeeringKey) - log.Panicf("Please check that %s is publicly reachable at %s/%s", - cli.AppName, addr.String(), addr.Network()) - } - log.Panicf("Error: %s", err) +func checkConnection(localAddr *net.UDPAddr, self *peer.Peer) { + peering := self.Services().Get(service.PeeringKey) + remoteAddr, err := net.ResolveUDPAddr(peering.Network(), peering.String()) + if err != nil { + panic(err) + } + + // do not check the address as a NAT may change them for local connections + err = netutil.CheckUDP(localAddr, remoteAddr, false, true) + if err != nil { + log.Errorf("Error testing service: %s", err) + log.Panicf("Please check that %s is publicly reachable at %s/%s", + cli.AppName, peering.String(), peering.Network()) } } diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index b6ea302a756c98149dfa9b68c849244e3a6b99d0..5a6904c5accdeb4780e1efd607f501f3f81e24cf 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -44,7 +44,8 @@ func configureGossip() { func start(shutdownSignal <-chan struct{}) { defer log.Info("Stopping " + name + " ... done") - srv, err := server.ListenTCP(local.GetInstance(), log) + loc := local.GetInstance() + srv, err := server.ListenTCP(loc, log) if err != nil { log.Fatalf("ListenTCP: %v", err) } @@ -52,13 +53,14 @@ func start(shutdownSignal <-chan struct{}) { //check that the server is working and the port is open log.Info("Testing service ...") - checkConnection(srv, &local.GetInstance().Peer) + checkConnection(srv, &loc.Peer) log.Info("Testing service ... done") mgr.Start(srv) defer mgr.Close() - log.Infof(name+" started: address=%s/%s", mgr.LocalAddr().String(), mgr.LocalAddr().Network()) + gossipAddr := loc.Services().Get(service.GossipKey) + log.Infof(name+" started: address=%s/%s", gossipAddr.String(), gossipAddr.Network()) <-shutdownSignal log.Info("Stopping " + name + " ...")