diff --git a/packages/autopeering/discover/common.go b/packages/autopeering/discover/common.go index 826ad2cd1298bec69490998b9c609147307b5bf1..99b092ea1ec01fc399bdf3a9a0b7e0737e742da9 100644 --- a/packages/autopeering/discover/common.go +++ b/packages/autopeering/discover/common.go @@ -10,7 +10,6 @@ import ( // Default values for the global parameters const ( DefaultReverifyInterval = 10 * time.Second - DefaultReverifyTries = 2 DefaultQueryInterval = 60 * time.Second DefaultMaxManaged = 1000 DefaultMaxReplacements = 10 @@ -18,7 +17,6 @@ const ( var ( reverifyInterval = DefaultReverifyInterval // time interval after which the next peer is reverified - reverifyTries = DefaultReverifyTries // number of times a peer is pinged before it is removed queryInterval = DefaultQueryInterval // time interval after which peers are queried for new peers maxManaged = DefaultMaxManaged // maximum number of peers that can be managed maxReplacements = DefaultMaxReplacements // maximum number of peers kept in the replacement list @@ -36,7 +34,6 @@ type Config struct { // Parameters holds the parameters that can be configured. type Parameters struct { ReverifyInterval time.Duration // time interval after which the next peer is reverified - ReverifyTries int // number of times a peer is pinged before it is removed QueryInterval time.Duration // time interval after which peers are queried for new peers MaxManaged int // maximum number of peers that can be managed MaxReplacements int // maximum number of peers kept in the replacement list @@ -50,11 +47,6 @@ func SetParameter(param Parameters) { } else { reverifyInterval = DefaultReverifyInterval } - if param.ReverifyTries > 0 { - reverifyTries = param.ReverifyTries - } else { - reverifyTries = DefaultReverifyTries - } if param.QueryInterval > 0 { queryInterval = param.QueryInterval } else { diff --git a/packages/autopeering/discover/manager.go b/packages/autopeering/discover/manager.go index 5fbded485bc9c4e2f1f6f43fe83da88e14bb3e0e..974b976a6fcf6b2749cfbbbf63804e8c78d2a869 100644 --- a/packages/autopeering/discover/manager.go +++ b/packages/autopeering/discover/manager.go @@ -7,6 +7,7 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/server" + "github.com/iotaledger/hive.go/backoff" "github.com/iotaledger/hive.go/logger" ) @@ -17,11 +18,17 @@ const ( MaxPeersInResponse = 6 // MaxServices is the maximum number of services a peer can support. MaxServices = 5 + // NetworkMaxRetries is the maximum number of times a failing network send is retried. + NetworkMaxRetries = 2 // VersionNum specifies the expected version number for this Protocol. VersionNum = 0 ) +// policy for retrying failed network calls +var networkRetryPolicy = backoff.ExponentialBackOff(500*time.Millisecond, 1.5).With( + backoff.Jitter(0.5), backoff.MaxRetries(NetworkMaxRetries)) + type network interface { local() *peer.Local @@ -137,20 +144,13 @@ func (m *manager) doReverify(done chan<- struct{}) { "addr", p.Address(), ) - var err error - for i := 0; i < reverifyTries; i++ { - err = m.net.Ping(unwrapPeer(p)) - if err == nil { - break - } else { - m.log.Debugw("ping failed", - "id", p.ID(), - "addr", p.Address(), - "err", err, - ) - time.Sleep(1 * time.Second) + err := backoff.Retry(networkRetryPolicy, func() error { + err := m.net.Ping(unwrapPeer(p)) + if err != nil && err != server.ErrTimeout { + return backoff.Permanent(err) } - } + return err + }) // could not verify the peer if err != nil { diff --git a/packages/autopeering/discover/manager_test.go b/packages/autopeering/discover/manager_test.go index 090371232430ac348eda54759d6110d9abfe251b..07a35889bcf61e46123a2156a8578dd5cb09715d 100644 --- a/packages/autopeering/discover/manager_test.go +++ b/packages/autopeering/discover/manager_test.go @@ -157,7 +157,7 @@ func TestMgrDeleteUnreachablePeer(t *testing.T) { p := newDummyPeer("p") // expect ping of peer p, but return error - m.On("Ping", p).Return(server.ErrTimeout).Times(reverifyTries) + m.On("Ping", p).Return(server.ErrTimeout).Times(NetworkMaxRetries + 1) // ignore discoveryRequest calls m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() diff --git a/packages/autopeering/discover/protocol_test.go b/packages/autopeering/discover/protocol_test.go index e262865122e1ef15883a869b1d2fa30bf8986590..7a98bec2423dc01de0f2b98f8b296dd374aedec1 100644 --- a/packages/autopeering/discover/protocol_test.go +++ b/packages/autopeering/discover/protocol_test.go @@ -22,7 +22,6 @@ func init() { // decrease parameters to simplify and speed up tests SetParameter(Parameters{ ReverifyInterval: 500 * time.Millisecond, - ReverifyTries: 1, QueryInterval: 1000 * time.Millisecond, MaxManaged: 10, MaxReplacements: 2, diff --git a/packages/autopeering/discover/query_strat.go b/packages/autopeering/discover/query_strat.go index 8095473551aa86487b443a2adfbb9831852585fc..80247c54e443ee5490cd9f5f2c19d936679df1e8 100644 --- a/packages/autopeering/discover/query_strat.go +++ b/packages/autopeering/discover/query_strat.go @@ -5,6 +5,10 @@ import ( "math/rand" "sync" "time" + + "github.com/iotaledger/goshimmer/packages/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/autopeering/server" + "github.com/iotaledger/hive.go/backoff" ) // doQuery is the main method of the query strategy. @@ -34,8 +38,17 @@ func (m *manager) doQuery(next chan<- time.Duration) { func (m *manager) requestWorker(p *mpeer, wg *sync.WaitGroup) { defer wg.Done() - r, err := m.net.discoveryRequest(unwrapPeer(p)) - if err != nil || len(r) == 0 { + var peers []*peer.Peer + err := backoff.Retry(networkRetryPolicy, func() error { + var err error + peers, err = m.net.discoveryRequest(unwrapPeer(p)) + if err != nil && err != server.ErrTimeout { + return backoff.Permanent(err) + } + return err + }) + + if err != nil || len(peers) == 0 { p.lastNewPeers = 0 m.log.Debugw("query failed", @@ -47,7 +60,7 @@ func (m *manager) requestWorker(p *mpeer, wg *sync.WaitGroup) { } var added uint - for _, rp := range r { + for _, rp := range peers { if m.addDiscoveredPeer(rp) { added++ } diff --git a/packages/gossip/server/server.go b/packages/gossip/server/server.go index 8cbab0e37cb9194601b4eac4cc1309fc569388b0..f6fd2f8b8ce853e5ef6e0d5790002e4a57a277fa 100644 --- a/packages/gossip/server/server.go +++ b/packages/gossip/server/server.go @@ -15,6 +15,7 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" pb "github.com/iotaledger/goshimmer/packages/autopeering/server/proto" + "github.com/iotaledger/hive.go/backoff" "go.uber.org/zap" ) @@ -31,13 +32,17 @@ var ( // connection timeouts const ( - acceptTimeout = 1000 * time.Millisecond - handshakeTimeout = 500 * time.Millisecond - connectionTimeout = acceptTimeout + handshakeTimeout + dialTimeout = 1 * time.Second // timeout for net.Dial + handshakeTimeout = 500 * time.Millisecond // read/write timeout of the handshake packages + acceptTimeout = 3 * time.Second // timeout to accept incoming connections + connectionTimeout = acceptTimeout + 2*handshakeTimeout // timeout after which the connection must be established maxHandshakePacketSize = 256 ) +// retry net.Dial once, on fail after 0.5s +var dialRetryPolicy = backoff.ConstantBackOff(500 * time.Millisecond).With(backoff.MaxRetries(1)) + // TCP establishes verified incoming and outgoing TCP connections to other peers. type TCP struct { local *peer.Local @@ -139,14 +144,20 @@ func (t *TCP) DialPeer(p *peer.Peer) (net.Conn, error) { return nil, ErrNoGossip } - conn, err := net.DialTimeout(gossipAddr.Network(), gossipAddr.String(), acceptTimeout) - if err != nil { - return nil, fmt.Errorf("dial peer failed: %w", err) - } + var conn net.Conn + if err := backoff.Retry(dialRetryPolicy, func() error { + var err error + conn, err = net.DialTimeout(gossipAddr.Network(), gossipAddr.String(), dialTimeout) + if err != nil { + return fmt.Errorf("dial %s / %s failed: %w", gossipAddr.String(), p.ID(), err) + } - err = t.doHandshake(p.PublicKey(), gossipAddr.String(), conn) - if err != nil { - return nil, fmt.Errorf("outgoing handshake failed: %w", err) + if err = t.doHandshake(p.PublicKey(), gossipAddr.String(), conn); err != nil { + return fmt.Errorf("handshake %s / %s failed: %w", gossipAddr.String(), p.ID(), err) + } + return nil + }); err != nil { + return nil, err } t.log.Debugw("outgoing connection established", @@ -159,14 +170,15 @@ func (t *TCP) DialPeer(p *peer.Peer) (net.Conn, error) { // AcceptPeer awaits an incoming connection from the given peer. // If the peer does not establish the connection or the handshake fails, an error is returned. func (t *TCP) AcceptPeer(p *peer.Peer) (net.Conn, error) { - if p.Services().Get(service.GossipKey) == nil { + gossipAddr := p.Services().Get(service.GossipKey) + if gossipAddr == nil { return nil, ErrNoGossip } // wait for the connection connected := <-t.acceptPeer(p) if connected.err != nil { - return nil, fmt.Errorf("accept peer failed: %w", connected.err) + return nil, fmt.Errorf("accept %s / %s failed: %w", gossipAddr.String(), p.ID(), connected.err) } t.log.Debugw("incoming connection established", @@ -248,6 +260,7 @@ func (t *TCP) run() { for e := matcherList.Front(); e != nil; e = e.Next() { m := e.Value.(*acceptMatcher) if now.After(m.deadline) || now.Equal(m.deadline) { + t.log.Debugw("accept timeout", "id", m.peer.ID()) m.connected <- connect{nil, ErrTimeout} matcherList.Remove(e) }