Skip to content
Snippets Groups Projects
Unverified Commit 262d7d8c authored by Wolfgang Welz's avatar Wolfgang Welz Committed by GitHub
Browse files

Feat: Use backoff to retry sends (#153)

parent f4a9646a
No related branches found
No related tags found
No related merge requests found
......@@ -10,7 +10,6 @@ import (
// Default values for the global parameters
const (
DefaultReverifyInterval = 10 * time.Second
DefaultReverifyTries = 2
DefaultQueryInterval = 60 * time.Second
DefaultMaxManaged = 1000
DefaultMaxReplacements = 10
......@@ -18,7 +17,6 @@ const (
var (
reverifyInterval = DefaultReverifyInterval // time interval after which the next peer is reverified
reverifyTries = DefaultReverifyTries // number of times a peer is pinged before it is removed
queryInterval = DefaultQueryInterval // time interval after which peers are queried for new peers
maxManaged = DefaultMaxManaged // maximum number of peers that can be managed
maxReplacements = DefaultMaxReplacements // maximum number of peers kept in the replacement list
......@@ -36,7 +34,6 @@ type Config struct {
// Parameters holds the parameters that can be configured.
type Parameters struct {
ReverifyInterval time.Duration // time interval after which the next peer is reverified
ReverifyTries int // number of times a peer is pinged before it is removed
QueryInterval time.Duration // time interval after which peers are queried for new peers
MaxManaged int // maximum number of peers that can be managed
MaxReplacements int // maximum number of peers kept in the replacement list
......@@ -50,11 +47,6 @@ func SetParameter(param Parameters) {
} else {
reverifyInterval = DefaultReverifyInterval
}
if param.ReverifyTries > 0 {
reverifyTries = param.ReverifyTries
} else {
reverifyTries = DefaultReverifyTries
}
if param.QueryInterval > 0 {
queryInterval = param.QueryInterval
} else {
......
......@@ -7,6 +7,7 @@ import (
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/server"
"github.com/iotaledger/hive.go/backoff"
"github.com/iotaledger/hive.go/logger"
)
......@@ -17,11 +18,17 @@ const (
MaxPeersInResponse = 6
// MaxServices is the maximum number of services a peer can support.
MaxServices = 5
// NetworkMaxRetries is the maximum number of times a failing network send is retried.
NetworkMaxRetries = 2
// VersionNum specifies the expected version number for this Protocol.
VersionNum = 0
)
// policy for retrying failed network calls
var networkRetryPolicy = backoff.ExponentialBackOff(500*time.Millisecond, 1.5).With(
backoff.Jitter(0.5), backoff.MaxRetries(NetworkMaxRetries))
type network interface {
local() *peer.Local
......@@ -137,20 +144,13 @@ func (m *manager) doReverify(done chan<- struct{}) {
"addr", p.Address(),
)
var err error
for i := 0; i < reverifyTries; i++ {
err = m.net.Ping(unwrapPeer(p))
if err == nil {
break
} else {
m.log.Debugw("ping failed",
"id", p.ID(),
"addr", p.Address(),
"err", err,
)
time.Sleep(1 * time.Second)
err := backoff.Retry(networkRetryPolicy, func() error {
err := m.net.Ping(unwrapPeer(p))
if err != nil && err != server.ErrTimeout {
return backoff.Permanent(err)
}
}
return err
})
// could not verify the peer
if err != nil {
......
......@@ -157,7 +157,7 @@ func TestMgrDeleteUnreachablePeer(t *testing.T) {
p := newDummyPeer("p")
// expect ping of peer p, but return error
m.On("Ping", p).Return(server.ErrTimeout).Times(reverifyTries)
m.On("Ping", p).Return(server.ErrTimeout).Times(NetworkMaxRetries + 1)
// ignore discoveryRequest calls
m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe()
......
......@@ -22,7 +22,6 @@ func init() {
// decrease parameters to simplify and speed up tests
SetParameter(Parameters{
ReverifyInterval: 500 * time.Millisecond,
ReverifyTries: 1,
QueryInterval: 1000 * time.Millisecond,
MaxManaged: 10,
MaxReplacements: 2,
......
......@@ -5,6 +5,10 @@ import (
"math/rand"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/server"
"github.com/iotaledger/hive.go/backoff"
)
// doQuery is the main method of the query strategy.
......@@ -34,8 +38,17 @@ func (m *manager) doQuery(next chan<- time.Duration) {
func (m *manager) requestWorker(p *mpeer, wg *sync.WaitGroup) {
defer wg.Done()
r, err := m.net.discoveryRequest(unwrapPeer(p))
if err != nil || len(r) == 0 {
var peers []*peer.Peer
err := backoff.Retry(networkRetryPolicy, func() error {
var err error
peers, err = m.net.discoveryRequest(unwrapPeer(p))
if err != nil && err != server.ErrTimeout {
return backoff.Permanent(err)
}
return err
})
if err != nil || len(peers) == 0 {
p.lastNewPeers = 0
m.log.Debugw("query failed",
......@@ -47,7 +60,7 @@ func (m *manager) requestWorker(p *mpeer, wg *sync.WaitGroup) {
}
var added uint
for _, rp := range r {
for _, rp := range peers {
if m.addDiscoveredPeer(rp) {
added++
}
......
......@@ -15,6 +15,7 @@ import (
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
pb "github.com/iotaledger/goshimmer/packages/autopeering/server/proto"
"github.com/iotaledger/hive.go/backoff"
"go.uber.org/zap"
)
......@@ -31,13 +32,17 @@ var (
// connection timeouts
const (
acceptTimeout = 1000 * time.Millisecond
handshakeTimeout = 500 * time.Millisecond
connectionTimeout = acceptTimeout + handshakeTimeout
dialTimeout = 1 * time.Second // timeout for net.Dial
handshakeTimeout = 500 * time.Millisecond // read/write timeout of the handshake packages
acceptTimeout = 3 * time.Second // timeout to accept incoming connections
connectionTimeout = acceptTimeout + 2*handshakeTimeout // timeout after which the connection must be established
maxHandshakePacketSize = 256
)
// retry net.Dial once, on fail after 0.5s
var dialRetryPolicy = backoff.ConstantBackOff(500 * time.Millisecond).With(backoff.MaxRetries(1))
// TCP establishes verified incoming and outgoing TCP connections to other peers.
type TCP struct {
local *peer.Local
......@@ -139,14 +144,20 @@ func (t *TCP) DialPeer(p *peer.Peer) (net.Conn, error) {
return nil, ErrNoGossip
}
conn, err := net.DialTimeout(gossipAddr.Network(), gossipAddr.String(), acceptTimeout)
if err != nil {
return nil, fmt.Errorf("dial peer failed: %w", err)
}
var conn net.Conn
if err := backoff.Retry(dialRetryPolicy, func() error {
var err error
conn, err = net.DialTimeout(gossipAddr.Network(), gossipAddr.String(), dialTimeout)
if err != nil {
return fmt.Errorf("dial %s / %s failed: %w", gossipAddr.String(), p.ID(), err)
}
err = t.doHandshake(p.PublicKey(), gossipAddr.String(), conn)
if err != nil {
return nil, fmt.Errorf("outgoing handshake failed: %w", err)
if err = t.doHandshake(p.PublicKey(), gossipAddr.String(), conn); err != nil {
return fmt.Errorf("handshake %s / %s failed: %w", gossipAddr.String(), p.ID(), err)
}
return nil
}); err != nil {
return nil, err
}
t.log.Debugw("outgoing connection established",
......@@ -159,14 +170,15 @@ func (t *TCP) DialPeer(p *peer.Peer) (net.Conn, error) {
// AcceptPeer awaits an incoming connection from the given peer.
// If the peer does not establish the connection or the handshake fails, an error is returned.
func (t *TCP) AcceptPeer(p *peer.Peer) (net.Conn, error) {
if p.Services().Get(service.GossipKey) == nil {
gossipAddr := p.Services().Get(service.GossipKey)
if gossipAddr == nil {
return nil, ErrNoGossip
}
// wait for the connection
connected := <-t.acceptPeer(p)
if connected.err != nil {
return nil, fmt.Errorf("accept peer failed: %w", connected.err)
return nil, fmt.Errorf("accept %s / %s failed: %w", gossipAddr.String(), p.ID(), connected.err)
}
t.log.Debugw("incoming connection established",
......@@ -248,6 +260,7 @@ func (t *TCP) run() {
for e := matcherList.Front(); e != nil; e = e.Next() {
m := e.Value.(*acceptMatcher)
if now.After(m.deadline) || now.Equal(m.deadline) {
t.log.Debugw("accept timeout", "id", m.peer.ID())
m.connected <- connect{nil, ErrTimeout}
matcherList.Remove(e)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment