diff --git a/go.mod b/go.mod index 0564d83750563587252c4c190bc10c030a072a58..7a2417445e102b8befa95b29d42d328c92c0497b 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,7 @@ module github.com/iotaledger/goshimmer go 1.13 require ( - github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect - github.com/dgraph-io/badger v1.6.0 + github.com/dgraph-io/badger/v2 v2.0.1 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgryski/go-farm v0.0.0-20191112170834-c2139c5d712b // indirect github.com/gdamore/tcell v1.3.0 @@ -13,7 +12,7 @@ require ( github.com/googollee/go-engine.io v1.4.3-0.20190924125625-798118fc0dd2 github.com/googollee/go-socket.io v1.4.3-0.20191204093753-683f8725b6d0 github.com/gorilla/websocket v1.4.1 - github.com/iotaledger/hive.go v0.0.0-20200120092048-f168257b6ccc + github.com/iotaledger/hive.go v0.0.0-20200121213505-28904d5f037c github.com/iotaledger/iota.go v1.0.0-beta.14 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/gommon v0.3.0 // indirect diff --git a/go.sum b/go.sum index 6cba4d494385fe85e220a35a9c3824cd03edc7bd..47bfbc4884977d5de44ead02f5e46e48cb57ceaa 100644 --- a/go.sum +++ b/go.sum @@ -7,12 +7,12 @@ dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1 dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU= git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= -github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M= -github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -36,10 +36,11 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger v1.5.4 h1:gVTrpUTbbr/T24uvoCaqY2KSHfNLVGm0w+hbee2HMeg= github.com/dgraph-io/badger v1.5.4/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ= -github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= -github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v2 v2.0.0/go.mod h1:YoRSIp1LmAJ7zH7tZwRvjNMUYLxB4wl3ebYkaIruZ04= +github.com/dgraph-io/badger/v2 v2.0.1 h1:+D6dhIqC6jIeCclnxMHqk4HPuXgrRN5UfBsLR4dNQ3A= +github.com/dgraph-io/badger/v2 v2.0.1/go.mod h1:YoRSIp1LmAJ7zH7tZwRvjNMUYLxB4wl3ebYkaIruZ04= +github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e h1:aeUNgwup7PnDOBAD1BOKAqzb/W/NksOj6r3dwKKuqfg= github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= @@ -79,6 +80,7 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -116,8 +118,8 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/iotaledger/hive.go v0.0.0-20200120092048-f168257b6ccc h1:7SZQ3+4EvMd/iSOONrvD7Sa7qCm712KqTTKh8CK/9TU= -github.com/iotaledger/hive.go v0.0.0-20200120092048-f168257b6ccc/go.mod h1:obs07gqna53/Yw1ltzLsQzJBMyA6lGu7Fb/ltjqWMnQ= +github.com/iotaledger/hive.go v0.0.0-20200121213505-28904d5f037c h1:3T0MLyZIL74kYLqVrmv1xQlwE2ktS1IO8kjM+NyXEMU= +github.com/iotaledger/hive.go v0.0.0-20200121213505-28904d5f037c/go.mod h1:S+v90R3u4Rqe4VoOg4DhiZrAKlKZhz2UFKuK/Neqa2o= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A= github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= @@ -239,6 +241,7 @@ github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4k github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc= diff --git a/packages/autopeering/discover/manager.go b/packages/autopeering/discover/manager.go index 974b976a6fcf6b2749cfbbbf63804e8c78d2a869..e451a3cf6cee248811613f26b517b222e139c690 100644 --- a/packages/autopeering/discover/manager.go +++ b/packages/autopeering/discover/manager.go @@ -7,7 +7,6 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/peer" "github.com/iotaledger/goshimmer/packages/autopeering/server" - "github.com/iotaledger/hive.go/backoff" "github.com/iotaledger/hive.go/logger" ) @@ -18,22 +17,16 @@ const ( MaxPeersInResponse = 6 // MaxServices is the maximum number of services a peer can support. MaxServices = 5 - // NetworkMaxRetries is the maximum number of times a failing network send is retried. - NetworkMaxRetries = 2 // VersionNum specifies the expected version number for this Protocol. VersionNum = 0 ) -// policy for retrying failed network calls -var networkRetryPolicy = backoff.ExponentialBackOff(500*time.Millisecond, 1.5).With( - backoff.Jitter(0.5), backoff.MaxRetries(NetworkMaxRetries)) - type network interface { local() *peer.Local Ping(*peer.Peer) error - discoveryRequest(*peer.Peer) ([]*peer.Peer, error) + DiscoveryRequest(*peer.Peer) ([]*peer.Peer, error) } type manager struct { @@ -144,16 +137,8 @@ func (m *manager) doReverify(done chan<- struct{}) { "addr", p.Address(), ) - err := backoff.Retry(networkRetryPolicy, func() error { - err := m.net.Ping(unwrapPeer(p)) - if err != nil && err != server.ErrTimeout { - return backoff.Permanent(err) - } - return err - }) - // could not verify the peer - if err != nil { + if m.net.Ping(unwrapPeer(p)) != nil { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/packages/autopeering/discover/manager_test.go b/packages/autopeering/discover/manager_test.go index 07a35889bcf61e46123a2156a8578dd5cb09715d..afe6a3899c5e9aeb3de0b5c19e922f4b6f232ffd 100644 --- a/packages/autopeering/discover/manager_test.go +++ b/packages/autopeering/discover/manager_test.go @@ -27,7 +27,7 @@ func (m *NetworkMock) Ping(p *peer.Peer) error { return args.Error(0) } -func (m *NetworkMock) discoveryRequest(p *peer.Peer) ([]*peer.Peer, error) { +func (m *NetworkMock) DiscoveryRequest(p *peer.Peer) ([]*peer.Peer, error) { args := m.Called(p) return args.Get(0).([]*peer.Peer), args.Error(1) } @@ -69,10 +69,10 @@ func TestMgrVerifyDiscoveredPeer(t *testing.T) { p := newDummyPeer("p") - // expect ping of peer p + // expect Ping of peer p m.On("Ping", p).Return(nil).Once() - // ignore discoveryRequest calls - m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() + // ignore DiscoveryRequest calls + m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() // let the manager initialize time.Sleep(graceTime) @@ -89,10 +89,10 @@ func TestMgrReverifyPeer(t *testing.T) { p := newDummyPeer("p") - // expect ping of peer p + // expect Ping of peer p m.On("Ping", p).Return(nil).Once() - // ignore discoveryRequest calls - m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() + // ignore DiscoveryRequest calls + m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() // let the manager initialize time.Sleep(graceTime) @@ -110,9 +110,9 @@ func TestMgrRequestDiscoveredPeer(t *testing.T) { p1 := newDummyPeer("verified") p2 := newDummyPeer("discovered") - // expect discoveryRequest on the discovered peer - m.On("discoveryRequest", p1).Return([]*peer.Peer{p2}, nil).Once() - // ignore any ping + // expect DiscoveryRequest on the discovered peer + m.On("DiscoveryRequest", p1).Return([]*peer.Peer{p2}, nil).Once() + // ignore any Ping m.On("Ping", mock.Anything).Return(nil).Maybe() mgr.addVerifiedPeer(p1) @@ -128,10 +128,10 @@ func TestMgrAddManyVerifiedPeers(t *testing.T) { p := newDummyPeer("p") - // expect ping of peer p + // expect Ping of peer p m.On("Ping", p).Return(nil).Once() - // ignore discoveryRequest calls - m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() + // ignore DiscoveryRequest calls + m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() // let the manager initialize time.Sleep(graceTime) @@ -156,10 +156,10 @@ func TestMgrDeleteUnreachablePeer(t *testing.T) { p := newDummyPeer("p") - // expect ping of peer p, but return error - m.On("Ping", p).Return(server.ErrTimeout).Times(NetworkMaxRetries + 1) - // ignore discoveryRequest calls - m.On("discoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() + // expect Ping of peer p, but return error + m.On("Ping", p).Return(server.ErrTimeout).Times(1) + // ignore DiscoveryRequest calls + m.On("DiscoveryRequest", mock.Anything).Return([]*peer.Peer{}, nil).Maybe() // let the manager initialize time.Sleep(graceTime) diff --git a/packages/autopeering/discover/protocol.go b/packages/autopeering/discover/protocol.go index 3cbe49361e744677789f59af69919cf8944ee543..8fbda7d71422dbb5fc6472d9128846a9a6dd14ff 100644 --- a/packages/autopeering/discover/protocol.go +++ b/packages/autopeering/discover/protocol.go @@ -2,6 +2,7 @@ package discover import ( "bytes" + "errors" "fmt" "sync" "time" @@ -12,9 +13,19 @@ import ( peerpb "github.com/iotaledger/goshimmer/packages/autopeering/peer/proto" "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" "github.com/iotaledger/goshimmer/packages/autopeering/server" + "github.com/iotaledger/hive.go/backoff" "github.com/iotaledger/hive.go/logger" ) +const ( + maxRetries = 2 + logSends = true +) + +// policy for retrying failed network calls +var retryPolicy = backoff.ExponentialBackOff(500*time.Millisecond, 1.5).With( + backoff.Jitter(0.5), backoff.MaxRetries(maxRetries)) + // The Protocol handles the peer discovery. // It responds to incoming messages and sends own requests when needed. type Protocol struct { @@ -39,19 +50,12 @@ func New(local *peer.Local, cfg Config) *Protocol { return p } -// local returns the associated local peer of the neighbor selection. -func (p *Protocol) local() *peer.Local { - return p.loc -} - // Start starts the actual peer discovery over the provided Sender. func (p *Protocol) Start(s server.Sender) { p.Protocol.Sender = s p.mgr.start() - p.log.Debugw("discover started", - "addr", s.LocalAddr(), - ) + p.log.Debug("discover started") } // Close finalizes the protocol. @@ -66,14 +70,17 @@ func (p *Protocol) IsVerified(id peer.ID, addr string) bool { return time.Since(p.loc.Database().LastPong(id, addr)) < PingExpiration } -// EnsureVerified checks if the given peer has recently sent a ping; -// if not, we send a ping to trigger a verification. -func (p *Protocol) EnsureVerified(peer *peer.Peer) { +// EnsureVerified checks if the given peer has recently sent a Ping; +// if not, we send a Ping to trigger a verification. +func (p *Protocol) EnsureVerified(peer *peer.Peer) error { if !p.hasVerified(peer.ID(), peer.Address()) { - <-p.sendPing(peer.Address(), peer.ID()) - // Wait for them to ping back and process our pong + if err := p.Ping(peer); err != nil { + return err + } + // Wait for them to Ping back and process our pong time.Sleep(server.ResponseTimeout) } + return nil } // GetVerifiedPeer returns the verified peer with the given ID, or nil if no such peer exists. @@ -106,7 +113,7 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. if err := proto.Unmarshal(data[1:], m); err != nil { return true, fmt.Errorf("invalid message: %w", err) } - if p.validatePing(s, fromAddr, m) { + if p.validatePing(fromAddr, m) { p.handlePing(s, fromAddr, fromID, fromKey, data) } @@ -126,7 +133,7 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. if err := proto.Unmarshal(data[1:], m); err != nil { return true, fmt.Errorf("invalid message: %w", err) } - if p.validateDiscoveryRequest(s, fromAddr, fromID, m) { + if p.validateDiscoveryRequest(fromAddr, fromID, m) { p.handleDiscoveryRequest(s, fromAddr, data) } @@ -146,17 +153,33 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. return true, nil } +// local returns the associated local peer of the neighbor selection. +func (p *Protocol) local() *peer.Local { + return p.loc +} + +// publicAddr returns the public address of the peering service in string representation. +func (p *Protocol) publicAddr() string { + return p.loc.Services().Get(service.PeeringKey).String() +} + // ------ message senders ------ -// ping sends a ping to the specified peer and blocks until a reply is received or timeout. +// Ping sends a Ping to the specified peer and blocks until a reply is received or timeout. func (p *Protocol) Ping(to *peer.Peer) error { - return <-p.sendPing(to.Address(), to.ID()) + return backoff.Retry(retryPolicy, func() error { + err := <-p.sendPing(to.Address(), to.ID()) + if err != nil && !errors.Is(err, server.ErrTimeout) { + return backoff.Permanent(err) + } + return err + }) } -// sendPing sends a ping to the specified address and expects a matching reply. +// sendPing sends a Ping to the specified address and expects a matching reply. // This method is non-blocking, but it returns a channel that can be used to query potential errors. func (p *Protocol) sendPing(toAddr string, toID peer.ID) <-chan error { - ping := newPing(p.LocalAddr(), toAddr) + ping := newPing(p.publicAddr(), toAddr) data := marshal(ping) // compute the message hash @@ -165,48 +188,48 @@ func (p *Protocol) sendPing(toAddr string, toID peer.ID) <-chan error { return bytes.Equal(m.(*pb.Pong).GetPingHash(), hash) } - // expect a pong referencing the ping we are about to send - p.log.Debugw("send message", "type", ping.Name(), "addr", toAddr) - errc := p.Protocol.SendExpectingReply(toAddr, toID, data, pb.MPong, hashEqual) - - return errc + p.logSend(toAddr, ping) + return p.Protocol.SendExpectingReply(toAddr, toID, data, pb.MPong, hashEqual) } -// discoveryRequest request known peers from the given target. This method blocks +// DiscoveryRequest request known peers from the given target. This method blocks // until a response is received and the provided peers are returned. -func (p *Protocol) discoveryRequest(to *peer.Peer) ([]*peer.Peer, error) { - p.EnsureVerified(to) +func (p *Protocol) DiscoveryRequest(to *peer.Peer) ([]*peer.Peer, error) { + if err := p.EnsureVerified(to); err != nil { + return nil, err + } - // create the request package - toAddr := to.Address() - req := newDiscoveryRequest(toAddr) + req := newDiscoveryRequest(to.Address()) data := marshal(req) // compute the message hash hash := server.PacketHash(data) - peers := make([]*peer.Peer, 0, MaxPeersInResponse) - p.log.Debugw("send message", "type", req.Name(), "addr", toAddr) - errc := p.Protocol.SendExpectingReply(toAddr, to.ID(), data, pb.MDiscoveryResponse, func(m interface{}) bool { + peers := make([]*peer.Peer, 0, MaxPeersInResponse) + callback := func(m interface{}) bool { res := m.(*pb.DiscoveryResponse) if !bytes.Equal(res.GetReqHash(), hash) { return false } - for _, rp := range res.GetPeers() { - peer, err := peer.FromProto(rp) - if err != nil { - p.log.Warnw("invalid peer received", "err", err) - continue + peers = peers[:0] + for _, protoPeer := range res.GetPeers() { + if p, _ := peer.FromProto(protoPeer); p != nil { + peers = append(peers, p) } - peers = append(peers, peer) } - return true - }) + } - // wait for the response and then return peers - return peers, <-errc + err := backoff.Retry(retryPolicy, func() error { + p.logSend(to.Address(), req) + err := <-p.Protocol.SendExpectingReply(to.Address(), to.ID(), data, pb.MDiscoveryResponse, callback) + if err != nil && !errors.Is(err, server.ErrTimeout) { + return backoff.Permanent(err) + } + return err + }) + return peers, err } // ------ helper functions ------ @@ -216,6 +239,12 @@ func (p *Protocol) hasVerified(id peer.ID, addr string) bool { return time.Since(p.loc.Database().LastPing(id, addr)) < PingExpiration } +func (p *Protocol) logSend(toAddr string, msg pb.Message) { + if logSends { + p.log.Debugw("send message", "type", msg.Name(), "addr", toAddr) + } +} + func marshal(msg pb.Message) []byte { mType := msg.Type() if mType > 0xFF { @@ -229,15 +258,15 @@ func marshal(msg pb.Message) []byte { return append([]byte{byte(mType)}, data...) } -// createDiscoverPeer creates a new peer that only has a peering service at the given address. -func createDiscoverPeer(key peer.PublicKey, network string, address string) *peer.Peer { +// newPeer creates a new peer that only has a peering service at the given address. +func newPeer(key peer.PublicKey, network string, address string) *peer.Peer { services := service.New() services.Update(service.PeeringKey, network, address) return peer.NewPeer(key, services) } -// ------ Packet Constructors ------ +// ------ Message Constructors ------ func newPing(fromAddr string, toAddr string) *pb.Ping { return &pb.Ping{ @@ -274,9 +303,9 @@ func newDiscoveryResponse(reqData []byte, list []*peer.Peer) *pb.DiscoveryRespon } } -// ------ Packet Handlers ------ +// ------ Message Handlers ------ -func (p *Protocol) validatePing(s *server.Server, fromAddr string, m *pb.Ping) bool { +func (p *Protocol) validatePing(fromAddr string, m *pb.Ping) bool { // check version number if m.GetVersion() != VersionNum { p.log.Debugw("invalid message", @@ -296,11 +325,11 @@ func (p *Protocol) validatePing(s *server.Server, fromAddr string, m *pb.Ping) b return false } // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), - "want", s.LocalAddr(), + "want", p.publicAddr(), ) return false } @@ -322,36 +351,32 @@ func (p *Protocol) validatePing(s *server.Server, fromAddr string, m *pb.Ping) b func (p *Protocol) handlePing(s *server.Server, fromAddr string, fromID peer.ID, fromKey peer.PublicKey, rawData []byte) { // create and send the pong response - pong := newPong(fromAddr, rawData, s.Local().Services().CreateRecord()) + pong := newPong(fromAddr, rawData, p.loc.Services().CreateRecord()) - p.log.Debugw("send message", - "type", pong.Name(), - "addr", fromAddr, - ) + p.logSend(fromAddr, pong) s.Send(fromAddr, marshal(pong)) - // if the peer is new or expired, send a ping to verify + // if the peer is new or expired, send a Ping to verify if !p.IsVerified(fromID, fromAddr) { p.sendPing(fromAddr, fromID) } else if !p.mgr.isKnown(fromID) { // add a discovered peer to the manager if it is new - peer := createDiscoverPeer(fromKey, p.LocalNetwork(), fromAddr) - p.mgr.addDiscoveredPeer(peer) + p.mgr.addDiscoveredPeer(newPeer(fromKey, s.LocalAddr().Network(), fromAddr)) } - _ = p.local().Database().UpdateLastPing(fromID, fromAddr, time.Now()) + _ = p.loc.Database().UpdateLastPing(fromID, fromAddr, time.Now()) } func (p *Protocol) validatePong(s *server.Server, fromAddr string, fromID peer.ID, m *pb.Pong) bool { // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), - "want", s.LocalAddr(), + "want", p.publicAddr(), ) return false } - // there must be a ping waiting for this pong as a reply + // there must be a Ping waiting for this pong as a reply if !s.IsExpectedReply(fromAddr, fromID, m.Type(), m) { p.log.Debugw("invalid message", "type", m.Name(), @@ -391,18 +416,18 @@ func (p *Protocol) handlePong(fromAddr string, fromID peer.ID, fromKey peer.Publ p.mgr.addVerifiedPeer(from) // update peer database - db := p.local().Database() + db := p.loc.Database() _ = db.UpdateLastPong(fromID, fromAddr, time.Now()) _ = db.UpdatePeer(from) } -func (p *Protocol) validateDiscoveryRequest(s *server.Server, fromAddr string, fromID peer.ID, m *pb.DiscoveryRequest) bool { +func (p *Protocol) validateDiscoveryRequest(fromAddr string, fromID peer.ID, m *pb.DiscoveryRequest) bool { // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), - "want", s.LocalAddr(), + "want", p.publicAddr(), ) return false } @@ -435,10 +460,7 @@ func (p *Protocol) handleDiscoveryRequest(s *server.Server, fromAddr string, raw peers := p.mgr.getRandomPeers(MaxPeersInResponse, 1) res := newDiscoveryResponse(rawData, peers) - p.log.Debugw("send message", - "type", res.Name(), - "addr", fromAddr, - ) + p.logSend(fromAddr, res) s.Send(fromAddr, marshal(res)) } diff --git a/packages/autopeering/discover/protocol_test.go b/packages/autopeering/discover/protocol_test.go index 7a98bec2423dc01de0f2b98f8b296dd374aedec1..fd294f7bcacedfcbba2a13b247ddbcf088feb08e 100644 --- a/packages/autopeering/discover/protocol_test.go +++ b/packages/autopeering/discover/protocol_test.go @@ -87,11 +87,11 @@ func TestProtPingPong(t *testing.T) { peerA := getPeer(srvA) peerB := getPeer(srvB) - // send a ping from node A to B + // send a Ping from node A to B t.Run("A->B", func(t *testing.T) { assert.NoError(t, protA.Ping(peerB)) }) time.Sleep(graceTime) - // send a ping from node B to A + // send a Ping from node B to A t.Run("B->A", func(t *testing.T) { assert.NoError(t, protB.Ping(peerA)) }) time.Sleep(graceTime) } @@ -107,7 +107,7 @@ func TestProtPingTimeout(t *testing.T) { peerB := getPeer(srvB) - // send a ping from node A to B + // send a Ping from node A to B err := protA.Ping(peerB) assert.EqualError(t, err, server.ErrTimeout.Error()) } @@ -123,7 +123,7 @@ func TestProtVerifiedPeers(t *testing.T) { peerB := getPeer(srvB) - // send a ping from node A to B + // send a Ping from node A to B assert.NoError(t, protA.Ping(peerB)) time.Sleep(graceTime) @@ -146,7 +146,7 @@ func TestProtVerifiedPeer(t *testing.T) { peerA := getPeer(srvA) peerB := getPeer(srvB) - // send a ping from node A to B + // send a Ping from node A to B assert.NoError(t, protA.Ping(peerB)) time.Sleep(graceTime) @@ -172,13 +172,13 @@ func TestProtDiscoveryRequest(t *testing.T) { // request peers from node A t.Run("A->B", func(t *testing.T) { - if ps, err := protA.discoveryRequest(peerB); assert.NoError(t, err) { + if ps, err := protA.DiscoveryRequest(peerB); assert.NoError(t, err) { assert.ElementsMatch(t, []*peer.Peer{peerA}, ps) } }) // request peers from node B t.Run("B->A", func(t *testing.T) { - if ps, err := protB.discoveryRequest(peerA); assert.NoError(t, err) { + if ps, err := protB.DiscoveryRequest(peerA); assert.NoError(t, err) { assert.ElementsMatch(t, []*peer.Peer{peerB}, ps) } }) @@ -246,14 +246,14 @@ func BenchmarkPingPong(b *testing.B) { peerB := getPeer(srvB) - // send initial ping to ensure that every peer is verified + // send initial Ping to ensure that every peer is verified err := protA.Ping(peerB) require.NoError(b, err) time.Sleep(graceTime) b.ResetTimer() for n := 0; n < b.N; n++ { - // send a ping from node A to B + // send a Ping from node A to B _ = protA.Ping(peerB) } @@ -276,14 +276,14 @@ func BenchmarkDiscoveryRequest(b *testing.B) { peerB := getPeer(srvB) - // send initial request to ensure that every peer is verified - _, err := protA.discoveryRequest(peerB) + // send initial DiscoveryRequest to ensure that every peer is verified + _, err := protA.DiscoveryRequest(peerB) require.NoError(b, err) time.Sleep(graceTime) b.ResetTimer() for n := 0; n < b.N; n++ { - _, _ = protA.discoveryRequest(peerB) + _, _ = protA.DiscoveryRequest(peerB) } b.StopTimer() diff --git a/packages/autopeering/discover/query_strat.go b/packages/autopeering/discover/query_strat.go index 80247c54e443ee5490cd9f5f2c19d936679df1e8..0f70a6ba2a488af5bf99f1b1132c6855622cf1c0 100644 --- a/packages/autopeering/discover/query_strat.go +++ b/packages/autopeering/discover/query_strat.go @@ -5,10 +5,6 @@ import ( "math/rand" "sync" "time" - - "github.com/iotaledger/goshimmer/packages/autopeering/peer" - "github.com/iotaledger/goshimmer/packages/autopeering/server" - "github.com/iotaledger/hive.go/backoff" ) // doQuery is the main method of the query strategy. @@ -38,16 +34,7 @@ func (m *manager) doQuery(next chan<- time.Duration) { func (m *manager) requestWorker(p *mpeer, wg *sync.WaitGroup) { defer wg.Done() - var peers []*peer.Peer - err := backoff.Retry(networkRetryPolicy, func() error { - var err error - peers, err = m.net.discoveryRequest(unwrapPeer(p)) - if err != nil && err != server.ErrTimeout { - return backoff.Permanent(err) - } - return err - }) - + peers, err := m.net.DiscoveryRequest(unwrapPeer(p)) if err != nil || len(peers) == 0 { p.lastNewPeers = 0 diff --git a/packages/autopeering/peer/peerdb.go b/packages/autopeering/peer/peerdb.go index def6344b4f270d140c121e50a5504cb0101b5aba..b20fcd052b304c5dc9b4fac5b09a3ca31dce8fa5 100644 --- a/packages/autopeering/peer/peerdb.go +++ b/packages/autopeering/peer/peerdb.go @@ -74,7 +74,7 @@ const ( // NewPersistentDB creates a new persistent DB. func NewPersistentDB(log *logger.Logger) DB { - db, err := database.Get("peer") + db, err := database.Get(database.DBPrefixAutoPeering, database.GetBadgerInstance()) if err != nil { panic(err) } @@ -157,26 +157,26 @@ func parseInt64(blob []byte) int64 { // getInt64 retrieves an integer associated with a particular key. func (db *persistentDB) getInt64(key []byte) int64 { - blob, err := db.db.Get(key) + entry, err := db.db.Get(key) if err != nil { return 0 } - return parseInt64(blob) + return parseInt64(entry.Value) } // setInt64 stores an integer in the given key. func (db *persistentDB) setInt64(key []byte, n int64) error { blob := make([]byte, binary.MaxVarintLen64) blob = blob[:binary.PutVarint(blob, n)] - return db.db.SetWithTTL(key, blob, peerExpiration) + return db.db.Set(database.Entry{Key: key, Value: blob, TTL: peerExpiration}) } // LocalPrivateKey returns the private key stored in the database or creates a new one. func (db *persistentDB) LocalPrivateKey() (PrivateKey, error) { - key, err := db.db.Get(localFieldKey(dbLocalKey)) + entry, err := db.db.Get(localFieldKey(dbLocalKey)) if err == database.ErrKeyNotFound { - key, err = generatePrivateKey() - if err == nil { + key, genErr := generatePrivateKey() + if genErr == nil { err = db.UpdateLocalPrivateKey(key) } return key, err @@ -184,13 +184,12 @@ func (db *persistentDB) LocalPrivateKey() (PrivateKey, error) { if err != nil { return nil, err } - - return key, nil + return PrivateKey(entry.Value), nil } // UpdateLocalPrivateKey stores the provided key in the database. func (db *persistentDB) UpdateLocalPrivateKey(key PrivateKey) error { - return db.db.Set(localFieldKey(dbLocalKey), key) + return db.db.Set(database.Entry{Key: localFieldKey(dbLocalKey), Value: []byte(key)}) } // LastPing returns that property for the given peer ID and address. @@ -218,7 +217,7 @@ func (db *persistentDB) setPeerWithTTL(p *Peer, ttl time.Duration) error { if err != nil { return err } - return db.db.SetWithTTL(nodeKey(p.ID()), data, ttl) + return db.db.Set(database.Entry{Key: nodeKey(p.ID()), Value: data, TTL: ttl}) } func (db *persistentDB) UpdatePeer(p *Peer) error { @@ -238,7 +237,7 @@ func (db *persistentDB) Peer(id ID) *Peer { if err != nil { return nil } - return parsePeer(data) + return parsePeer(data.Value) } func randomSubset(peers []*Peer, m int) []*Peer { @@ -259,21 +258,22 @@ func (db *persistentDB) getPeers(maxAge time.Duration) []*Peer { peers := make([]*Peer, 0) now := time.Now() - err := db.db.ForEachWithPrefix([]byte(dbNodePrefix), func(key []byte, value []byte) { - id, rest := splitNodeKey(key) + err := db.db.StreamForEachPrefix([]byte(dbNodePrefix), func(entry database.Entry) error { + id, rest := splitNodeKey(entry.Key) if len(rest) > 0 { - return + return nil } - p := parsePeer(value) + p := parsePeer(entry.Value) if p == nil || p.ID() != id { - return + return nil } if maxAge > 0 && now.Sub(db.LastPong(p.ID(), p.Address())) > maxAge { - return + return nil } peers = append(peers, p) + return nil }) if err != nil { return []*Peer{} diff --git a/packages/autopeering/selection/manager.go b/packages/autopeering/selection/manager.go index 0ac30f101df80213593e5ad3964fc220caa2001f..ae58fd5359dbb7282568356e14edcc064dfdfa6f 100644 --- a/packages/autopeering/selection/manager.go +++ b/packages/autopeering/selection/manager.go @@ -22,8 +22,8 @@ const ( type network interface { local() *peer.Local - RequestPeering(*peer.Peer, *salt.Salt) (bool, error) - SendPeeringDrop(*peer.Peer) + PeeringRequest(*peer.Peer, *salt.Salt) (bool, error) + PeeringDrop(*peer.Peer) } type peeringRequest struct { @@ -230,8 +230,7 @@ func (m *manager) updateOutbound(resultChan chan<- peer.PeerDistance) { return } - // send peering request - status, err := m.net.RequestPeering(candidate.Remote, m.getPublicSalt()) + status, err := m.net.PeeringRequest(candidate.Remote, m.getPublicSalt()) if err != nil { m.rejectionFilter.AddPeer(candidate.Remote.ID()) m.log.Debugw("error requesting peering", @@ -317,7 +316,7 @@ func (m *manager) dropNeighborhood(nh *Neighborhood) { // dropPeering sends the peering drop over the network and triggers the corresponding event. func (m *manager) dropPeering(p *peer.Peer) { - m.net.SendPeeringDrop(p) + m.net.PeeringDrop(p) m.log.Debugw("peering dropped", "id", p.ID(), diff --git a/packages/autopeering/selection/manager_test.go b/packages/autopeering/selection/manager_test.go index 15e472a20ce4f433e275dc27019338b0e2fb4955..af24ee4161a1f0aebe465a3d868fdca54db3cff1 100644 --- a/packages/autopeering/selection/manager_test.go +++ b/packages/autopeering/selection/manager_test.go @@ -197,11 +197,11 @@ func (n *networkMock) local() *peer.Local { return n.loc } -func (n *networkMock) SendPeeringDrop(p *peer.Peer) { +func (n *networkMock) PeeringDrop(p *peer.Peer) { n.mgr[p.ID()].removeNeighbor(n.local().ID()) } -func (n *networkMock) RequestPeering(p *peer.Peer, s *salt.Salt) (bool, error) { +func (n *networkMock) PeeringRequest(p *peer.Peer, s *salt.Salt) (bool, error) { return n.mgr[p.ID()].requestPeering(&n.local().Peer, s), nil } diff --git a/packages/autopeering/selection/protocol.go b/packages/autopeering/selection/protocol.go index f82697afd88be5cf25c0e14ecb98ec3833fa2d8a..61ad178d8b46ad5b55e9d85c647e9de9ea85fa9a 100644 --- a/packages/autopeering/selection/protocol.go +++ b/packages/autopeering/selection/protocol.go @@ -2,22 +2,34 @@ package selection import ( "bytes" + "errors" "fmt" "sync" "time" "github.com/golang/protobuf/proto" "github.com/iotaledger/goshimmer/packages/autopeering/peer" + "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" "github.com/iotaledger/goshimmer/packages/autopeering/salt" pb "github.com/iotaledger/goshimmer/packages/autopeering/selection/proto" "github.com/iotaledger/goshimmer/packages/autopeering/server" + "github.com/iotaledger/hive.go/backoff" "github.com/iotaledger/hive.go/logger" ) +const ( + maxRetries = 2 + logSends = true +) + +// policy for retrying failed network calls +var retryPolicy = backoff.ExponentialBackOff(500*time.Millisecond, 1.5).With( + backoff.Jitter(0.5), backoff.MaxRetries(maxRetries)) + // DiscoverProtocol specifies the methods from the peer discovery that are required. type DiscoverProtocol interface { IsVerified(id peer.ID, addr string) bool - EnsureVerified(*peer.Peer) + EnsureVerified(*peer.Peer) error GetVerifiedPeer(id peer.ID, addr string) *peer.Peer GetVerifiedPeers() []*peer.Peer @@ -49,19 +61,12 @@ func New(local *peer.Local, disc DiscoverProtocol, cfg Config) *Protocol { return p } -// Local returns the associated local peer of the neighbor selection. -func (p *Protocol) local() *peer.Local { - return p.loc -} - // Start starts the actual neighbor selection over the provided Sender. func (p *Protocol) Start(s server.Sender) { p.Protocol.Sender = s p.mgr.start() - p.log.Debugw("neighborhood started", - "addr", s.LocalAddr(), - ) + p.log.Debug("neighborhood started") } // Close finalizes the protocol. @@ -94,7 +99,7 @@ func (p *Protocol) RemoveNeighbor(id peer.ID) { } // HandleMessage responds to incoming neighbor selection messages. -func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer.ID, fromKey peer.PublicKey, data []byte) (bool, error) { +func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer.ID, _ peer.PublicKey, data []byte) (bool, error) { switch pb.MType(data[0]) { // PeeringRequest @@ -103,7 +108,7 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. if err := proto.Unmarshal(data[1:], m); err != nil { return true, fmt.Errorf("invalid message: %w", err) } - if p.validatePeeringRequest(s, fromAddr, fromID, m) { + if p.validatePeeringRequest(fromAddr, fromID, m) { p.handlePeeringRequest(s, fromAddr, fromID, data, m) } @@ -122,7 +127,7 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. if err := proto.Unmarshal(data[1:], m); err != nil { return true, fmt.Errorf("invalid message: %w", err) } - if p.validatePeeringDrop(s, fromAddr, m) { + if p.validatePeeringDrop(fromAddr, m) { p.handlePeeringDrop(fromID) } @@ -133,12 +138,24 @@ func (p *Protocol) HandleMessage(s *server.Server, fromAddr string, fromID peer. return true, nil } +// Local returns the associated local peer of the neighbor selection. +func (p *Protocol) local() *peer.Local { + return p.loc +} + +// publicAddr returns the public address of the peering service in string representation. +func (p *Protocol) publicAddr() string { + return p.loc.Services().Get(service.PeeringKey).String() +} + // ------ message senders ------ -// RequestPeering sends a peering request to the given peer. This method blocks +// PeeringRequest sends a PeeringRequest to the given peer. This method blocks // until a response is received and the status answer is returned. -func (p *Protocol) RequestPeering(to *peer.Peer, salt *salt.Salt) (bool, error) { - p.disc.EnsureVerified(to) +func (p *Protocol) PeeringRequest(to *peer.Peer, salt *salt.Salt) (bool, error) { + if err := p.disc.EnsureVerified(to); err != nil { + return false, err + } // create the request package toAddr := to.Address() @@ -158,29 +175,33 @@ func (p *Protocol) RequestPeering(to *peer.Peer, salt *salt.Salt) (bool, error) return true } - p.log.Debugw("send message", - "type", req.Name(), - "addr", toAddr, - ) - err := <-p.Protocol.SendExpectingReply(toAddr, to.ID(), data, pb.MPeeringResponse, callback) - + err := backoff.Retry(retryPolicy, func() error { + p.logSend(toAddr, req) + err := <-p.Protocol.SendExpectingReply(toAddr, to.ID(), data, pb.MPeeringResponse, callback) + if err != nil && !errors.Is(err, server.ErrTimeout) { + return backoff.Permanent(err) + } + return err + }) return status, err } -// SendPeeringDrop sends a peering drop to the given peer and does not wait for any responses. -func (p *Protocol) SendPeeringDrop(to *peer.Peer) { - toAddr := to.Address() - drop := newPeeringDrop(toAddr) +// PeeringDrop sends a peering drop message to the given peer, non-blocking and does not wait for any responses. +func (p *Protocol) PeeringDrop(to *peer.Peer) { + drop := newPeeringDrop(to.Address()) - p.log.Debugw("send message", - "type", drop.Name(), - "addr", toAddr, - ) + p.logSend(to.Address(), drop) p.Protocol.Send(to, marshal(drop)) } // ------ helper functions ------ +func (p *Protocol) logSend(toAddr string, msg pb.Message) { + if logSends { + p.log.Debugw("send message", "type", msg.Name(), "addr", toAddr) + } +} + func marshal(msg pb.Message) []byte { mType := msg.Type() if mType > 0xFF { @@ -194,7 +215,7 @@ func marshal(msg pb.Message) []byte { return append([]byte{byte(mType)}, data...) } -// ------ Packet Constructors ------ +// ------ Message Constructors ------ func newPeeringRequest(toAddr string, salt *salt.Salt) *pb.PeeringRequest { return &pb.PeeringRequest{ @@ -220,12 +241,13 @@ func newPeeringDrop(toAddr string) *pb.PeeringDrop { // ------ Packet Handlers ------ -func (p *Protocol) validatePeeringRequest(s *server.Server, fromAddr string, fromID peer.ID, m *pb.PeeringRequest) bool { +func (p *Protocol) validatePeeringRequest(fromAddr string, fromID peer.ID, m *pb.PeeringRequest) bool { // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), + "want", p.publicAddr(), ) return false } @@ -246,7 +268,7 @@ func (p *Protocol) validatePeeringRequest(s *server.Server, fromAddr string, fro return false } // check Salt - salt, err := salt.FromProto(m.GetSalt()) + s, err := salt.FromProto(m.GetSalt()) if err != nil { p.log.Debugw("invalid message", "type", m.Name(), @@ -255,10 +277,10 @@ func (p *Protocol) validatePeeringRequest(s *server.Server, fromAddr string, fro return false } // check salt expiration - if salt.Expired() { + if s.Expired() { p.log.Debugw("invalid message", "type", m.Name(), - "salt.expiration", salt.GetExpiration(), + "salt.expiration", s.GetExpiration(), ) return false } @@ -289,10 +311,7 @@ func (p *Protocol) handlePeeringRequest(s *server.Server, fromAddr string, fromI res := newPeeringResponse(rawData, p.mgr.requestPeering(from, salt)) - p.log.Debugw("send message", - "type", res.Name(), - "addr", fromAddr, - ) + p.logSend(fromAddr, res) s.Send(fromAddr, marshal(res)) } @@ -313,12 +332,13 @@ func (p *Protocol) validatePeeringResponse(s *server.Server, fromAddr string, fr return true } -func (p *Protocol) validatePeeringDrop(s *server.Server, fromAddr string, m *pb.PeeringDrop) bool { +func (p *Protocol) validatePeeringDrop(fromAddr string, m *pb.PeeringDrop) bool { // check that To matches the local address - if m.GetTo() != s.LocalAddr() { + if m.GetTo() != p.publicAddr() { p.log.Debugw("invalid message", "type", m.Name(), "to", m.GetTo(), + "want", p.publicAddr(), ) return false } diff --git a/packages/autopeering/selection/protocol_test.go b/packages/autopeering/selection/protocol_test.go index d43820f87d7e3150a2b4cc6a7888dec69ed3722e..ce6e59e1ba299fcdf1fec458ffbabd0406101b5d 100644 --- a/packages/autopeering/selection/protocol_test.go +++ b/packages/autopeering/selection/protocol_test.go @@ -24,7 +24,7 @@ var peerMap = make(map[peer.ID]*peer.Peer) type dummyDiscovery struct{} func (d dummyDiscovery) IsVerified(peer.ID, string) bool { return true } -func (d dummyDiscovery) EnsureVerified(*peer.Peer) {} +func (d dummyDiscovery) EnsureVerified(*peer.Peer) error { return nil } func (d dummyDiscovery) GetVerifiedPeer(id peer.ID, _ string) *peer.Peer { return peerMap[id] } func (d dummyDiscovery) GetVerifiedPeers() []*peer.Peer { return []*peer.Peer{} } @@ -77,13 +77,13 @@ func TestProtocol(t *testing.T) { // request peering to peer B t.Run("A->B", func(t *testing.T) { - if services, err := protA.RequestPeering(peerB, saltA); assert.NoError(t, err) { + if services, err := protA.PeeringRequest(peerB, saltA); assert.NoError(t, err) { assert.NotEmpty(t, services) } }) // request peering to peer A t.Run("B->A", func(t *testing.T) { - if services, err := protB.RequestPeering(peerA, saltB); assert.NoError(t, err) { + if services, err := protB.PeeringRequest(peerA, saltB); assert.NoError(t, err) { assert.NotEmpty(t, services) } }) @@ -102,7 +102,7 @@ func TestProtocol(t *testing.T) { peerB := getPeer(srvB) // request peering to peer B - _, err := protA.RequestPeering(peerB, saltA) + _, err := protA.PeeringRequest(peerB, saltA) assert.EqualError(t, err, server.ErrTimeout.Error()) }) @@ -120,14 +120,14 @@ func TestProtocol(t *testing.T) { peerB := getPeer(srvB) // request peering to peer B - services, err := protA.RequestPeering(peerB, saltA) + services, err := protA.PeeringRequest(peerB, saltA) require.NoError(t, err) assert.NotEmpty(t, services) require.Contains(t, protB.GetNeighbors(), peerA) // drop peer A - protA.SendPeeringDrop(peerB) + protA.PeeringDrop(peerB) time.Sleep(graceTime) require.NotContains(t, protB.GetNeighbors(), peerA) }) diff --git a/packages/autopeering/selection/selection.go b/packages/autopeering/selection/selection.go index f97acbdbb30703559aaa643f49d749c83c394d72..66183f246131b9d9cf9cdad18888166f44d346a3 100644 --- a/packages/autopeering/selection/selection.go +++ b/packages/autopeering/selection/selection.go @@ -24,9 +24,9 @@ func NewFilter() *Filter { func (f *Filter) Apply(list []peer.PeerDistance) (filteredList []peer.PeerDistance) { f.lock.RLock() defer f.lock.RUnlock() - for _, peer := range list { - if !f.internal[peer.Remote.ID()] { - filteredList = append(filteredList, peer) + for _, p := range list { + if !f.internal[p.Remote.ID()] { + filteredList = append(filteredList, p) } } return filteredList @@ -36,11 +36,11 @@ func (f *Filter) PushBack(list []peer.PeerDistance) (filteredList []peer.PeerDis var head, tail []peer.PeerDistance f.lock.RLock() defer f.lock.RUnlock() - for _, peer := range list { - if f.internal[peer.Remote.ID()] { - tail = append(tail, peer) + for _, p := range list { + if f.internal[p.Remote.ID()] { + tail = append(tail, p) } else { - head = append(head, peer) + head = append(head, p) } } return append(head, tail...) @@ -48,8 +48,8 @@ func (f *Filter) PushBack(list []peer.PeerDistance) (filteredList []peer.PeerDis func (f *Filter) AddPeers(n []*peer.Peer) { f.lock.Lock() - for _, peer := range n { - f.internal[peer.ID()] = true + for _, p := range n { + f.internal[p.ID()] = true } f.lock.Unlock() } diff --git a/packages/autopeering/server/common.go b/packages/autopeering/server/common.go index 4defddc2db2b2af520ac791ac37fd29421321246..593d916b1ff93a8ccf36c279208ff2d4a25cecbf 100644 --- a/packages/autopeering/server/common.go +++ b/packages/autopeering/server/common.go @@ -11,9 +11,6 @@ type MType uint // The Sender interface specifies common method required to send requests. type Sender interface { - LocalAddr() string - LocalNetwork() string - Send(toAddr string, data []byte) SendExpectingReply(toAddr string, toID peer.ID, data []byte, replyType MType, callback func(interface{}) bool) <-chan error } diff --git a/packages/autopeering/server/protocol.go b/packages/autopeering/server/protocol.go index bab76b712f2d9710cdb08e255a7498707f3743b7..e3ef6a622e3126f33bf62aff78527eda663e0587 100644 --- a/packages/autopeering/server/protocol.go +++ b/packages/autopeering/server/protocol.go @@ -15,16 +15,6 @@ type Protocol struct { Sender Sender // interface to send own requests } -// LocalAddr returns the local network address in string form. -func (p *Protocol) LocalAddr() string { - return p.Sender.LocalAddr() -} - -// LocalNetwork returns the name of the local network (for example, "tcp", "udp"). -func (p *Protocol) LocalNetwork() string { - return p.Sender.LocalNetwork() -} - // Send sends the data to the given peer. func (p *Protocol) Send(to *peer.Peer, data []byte) { p.Sender.Send(to.Address(), data) diff --git a/packages/autopeering/server/server.go b/packages/autopeering/server/server.go index 74d1bfbda4d911265602e11836bbeeaa624796bd..88ca6cc55daa53d44da03e69e7f05875c2b5ee1b 100644 --- a/packages/autopeering/server/server.go +++ b/packages/autopeering/server/server.go @@ -4,6 +4,7 @@ import ( "container/list" "fmt" "io" + "net" "sync" "time" @@ -87,6 +88,8 @@ func Listen(local *peer.Local, t transport.Transport, log *logger.Logger, h ...H go srv.replyLoop() go srv.readLoop() + log.Debugw("server started", "addr", srv.LocalAddr(), "#handlers", len(h)) + return srv } @@ -104,14 +107,9 @@ func (s *Server) Local() *peer.Local { return s.local } -// LocalNetwork returns the network of the local peer. -func (s *Server) LocalNetwork() string { - return s.network -} - // LocalAddr returns the address of the local peer in string form. -func (s *Server) LocalAddr() string { - return s.address +func (s *Server) LocalAddr() net.Addr { + return s.trans.LocalAddr() } // Send sends a message to the given address diff --git a/packages/autopeering/server/server_test.go b/packages/autopeering/server/server_test.go index f3cff2c83e7509697e4f7a8626432a2f2d648d21..58bc5a60e3134a2f03bb423bf4d9bfddd13682c6 100644 --- a/packages/autopeering/server/server_test.go +++ b/packages/autopeering/server/server_test.go @@ -205,5 +205,5 @@ func TestUnexpectedPong(t *testing.T) { // there should never be a Ping.Handle // there should never be a Pong.Handle - srvA.Send(srvB.LocalAddr(), new(Pong).Marshal()) + srvA.Send(srvB.LocalAddr().String(), new(Pong).Marshal()) } diff --git a/packages/database/badger_instance.go b/packages/database/badger_instance.go deleted file mode 100644 index 439aa4727db7bd7f56c29bbc46896cbba7ae7b7e..0000000000000000000000000000000000000000 --- a/packages/database/badger_instance.go +++ /dev/null @@ -1,69 +0,0 @@ -package database - -import ( - "fmt" - "os" - "sync" - - "github.com/dgraph-io/badger" - "github.com/dgraph-io/badger/options" - "github.com/iotaledger/goshimmer/packages/parameter" -) - -var instance *badger.DB -var once sync.Once - -// Returns whether the given file or directory exists. -func exists(path string) (bool, error) { - _, err := os.Stat(path) - if err == nil { - return true, nil - } - if os.IsNotExist(err) { - return false, nil - } - return false, err -} - -func checkDir(dir string) error { - exists, err := exists(dir) - if err != nil { - return err - } - - if !exists { - return os.Mkdir(dir, 0700) - } - return nil -} - -func createDB() (*badger.DB, error) { - directory := parameter.NodeConfig.GetString(CFG_DIRECTORY) - if err := checkDir(directory); err != nil { - return nil, fmt.Errorf("could not check directory: %w", err) - } - - opts := badger.DefaultOptions(directory) - opts.Logger = &logger{} - opts.Truncate = true - opts.TableLoadingMode = options.MemoryMap - - db, err := badger.Open(opts) - if err != nil { - return nil, fmt.Errorf("could not open new DB: %w", err) - } - - return db, nil -} - -func GetBadgerInstance() *badger.DB { - once.Do(func() { - db, err := createDB() - if err != nil { - // errors should cause a panic to avoid singleton deadlocks - panic(err) - } - instance = db - }) - return instance -} diff --git a/packages/database/database.go b/packages/database/database.go index 913ffc3f91589637dddc22272a4e8ee45cb56214..4da5818cd93cde477656d97f9d069267d0f2ccba 100644 --- a/packages/database/database.go +++ b/packages/database/database.go @@ -1,134 +1,78 @@ package database import ( + "io/ioutil" + "os" + "runtime" "sync" - "time" - "github.com/dgraph-io/badger" + "github.com/dgraph-io/badger/v2" + "github.com/iotaledger/goshimmer/packages/parameter" + "github.com/iotaledger/hive.go/database" + "github.com/iotaledger/hive.go/logger" ) var ( - ErrKeyNotFound = badger.ErrKeyNotFound - - dbMap = make(map[string]*prefixDb) - mu sync.Mutex + instance *badger.DB + once sync.Once + ErrKeyNotFound = database.ErrKeyNotFound ) -type prefixDb struct { - db *badger.DB - name string - prefix []byte -} - -func getPrefix(name string) []byte { - return []byte(name + "_") -} - -func Get(name string) (Database, error) { - mu.Lock() - defer mu.Unlock() - - if db, exists := dbMap[name]; exists { - return db, nil - } - - badger := GetBadgerInstance() - db := &prefixDb{ - db: badger, - name: name, - prefix: getPrefix(name), - } - - dbMap[name] = db - - return db, nil -} - -func (this *prefixDb) setEntry(e *badger.Entry) error { - err := this.db.Update(func(txn *badger.Txn) error { - return txn.SetEntry(e) - }) - return err -} - -func (this *prefixDb) Set(key []byte, value []byte) error { - e := badger.NewEntry(append(this.prefix, key...), value) - return this.setEntry(e) -} +type ( + Database = database.Database + Entry = database.Entry + KeyOnlyEntry = database.KeyOnlyEntry + KeyPrefix = database.KeyPrefix + Value = database.Value +) -func (this *prefixDb) SetWithTTL(key []byte, value []byte, ttl time.Duration) error { - e := badger.NewEntry(append(this.prefix, key...), value).WithTTL(ttl) - return this.setEntry(e) +func Get(dbPrefix byte, optionalBadger ...*badger.DB) (Database, error) { + return database.Get(dbPrefix, optionalBadger...) } -func (this *prefixDb) Contains(key []byte) (bool, error) { - err := this.db.View(func(txn *badger.Txn) error { - _, err := txn.Get(append(this.prefix, key...)) - return err - }) - - if err == ErrKeyNotFound { - return false, nil - } else { - return err == nil, err - } -} +func GetBadgerInstance() *badger.DB { + once.Do(func() { + dbDir := parameter.NodeConfig.GetString(CFG_DIRECTORY) -func (this *prefixDb) Get(key []byte) ([]byte, error) { - var result []byte = nil - - err := this.db.View(func(txn *badger.Txn) error { - item, err := txn.Get(append(this.prefix, key...)) + var dbDirClear bool + // check whether the database is new, by checking whether any file exists within + // the database directory + fileInfos, err := ioutil.ReadDir(dbDir) if err != nil { - return err + // panic on other errors, for example permission related + if !os.IsNotExist(err) { + panic(err) + } + dbDirClear = true + } + if len(fileInfos) == 0 { + dbDirClear = true } - return item.Value(func(val []byte) error { - result = append([]byte{}, val...) - - return nil - }) - }) - - return result, err -} - -func (this *prefixDb) Delete(key []byte) error { - err := this.db.Update(func(txn *badger.Txn) error { - return txn.Delete(append(this.prefix, key...)) - }) - return err -} - -func (this *prefixDb) forEach(prefix []byte, consumer func([]byte, []byte)) error { - err := this.db.View(func(txn *badger.Txn) error { - iteratorOptions := badger.DefaultIteratorOptions - iteratorOptions.Prefix = prefix // filter by prefix - - // create an iterator the default options - it := txn.NewIterator(iteratorOptions) - defer it.Close() - - // loop through every key-value-pair and call the function - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - - value, err := item.ValueCopy(nil) - if err != nil { - return err - } + opts := badger.DefaultOptions(dbDir) + opts.Logger = nil + if runtime.GOOS == "windows" { + opts = opts.WithTruncate(true) + } - consumer(item.Key()[len(this.prefix):], value) + db, err := database.CreateDB(dbDir, opts) + if err != nil { + // errors should cause a panic to avoid singleton deadlocks + panic(err) } - return nil - }) - return err -} + instance = db -func (this *prefixDb) ForEachWithPrefix(prefix []byte, consumer func([]byte, []byte)) error { - return this.forEach(append(this.prefix, prefix...), consumer) + // up on the first caller, check whether the version of the database is compatible + checkDatabaseVersion(dbDirClear) + }) + return instance } -func (this *prefixDb) ForEach(consumer func([]byte, []byte)) error { - return this.forEach(this.prefix, consumer) +func CleanupBadgerInstance(log *logger.Logger) { + db := GetBadgerInstance() + log.Info("Running Badger database garbage collection") + var err error + for err == nil { + err = db.RunValueLogGC(0.7) + } } diff --git a/packages/database/interfaces.go b/packages/database/interfaces.go deleted file mode 100644 index 56d2b9573836153e1b487802510ce0d7953c13ca..0000000000000000000000000000000000000000 --- a/packages/database/interfaces.go +++ /dev/null @@ -1,13 +0,0 @@ -package database - -import "time" - -type Database interface { - Set(key []byte, value []byte) error - SetWithTTL(key []byte, value []byte, ttl time.Duration) error - Contains(key []byte) (bool, error) - Get(key []byte) ([]byte, error) - ForEach(consumer func(key []byte, value []byte)) error - ForEachWithPrefix(prefix []byte, consumer func(key []byte, value []byte)) error - Delete(key []byte) error -} diff --git a/packages/database/logger.go b/packages/database/logger.go deleted file mode 100644 index ac1de091452794ddb04857acd94df5ff87df72d7..0000000000000000000000000000000000000000 --- a/packages/database/logger.go +++ /dev/null @@ -1,19 +0,0 @@ -package database - -type logger struct{} - -func (this *logger) Errorf(string, ...interface{}) { - // disable logging -} - -func (this *logger) Infof(string, ...interface{}) { - // disable logging -} - -func (this *logger) Warningf(string, ...interface{}) { - // disable logging -} - -func (this *logger) Debugf(string, ...interface{}) { - // disable logging -} diff --git a/packages/database/prefixes.go b/packages/database/prefixes.go new file mode 100644 index 0000000000000000000000000000000000000000..c9dc488d40b5fbe557467e1be275e6aa56052aec --- /dev/null +++ b/packages/database/prefixes.go @@ -0,0 +1,11 @@ +package database + +const ( + DBPrefixApprovers byte = iota + DBPrefixTransaction + DBPrefixBundle + DBPrefixTransactionMetadata + DBPrefixAddressTransactions + DBPrefixAutoPeering + DBPrefixDatabaseVersion +) diff --git a/packages/database/versioning.go b/packages/database/versioning.go new file mode 100644 index 0000000000000000000000000000000000000000..a848eb0e508a7ecbf3b9c61773700fdfdaa79b1c --- /dev/null +++ b/packages/database/versioning.go @@ -0,0 +1,47 @@ +package database + +import ( + "errors" + "fmt" +) + +const ( + // DBVersion defines the version of the database schema this version of GoShimmer supports. + // everytime there's a breaking change regarding the stored data, this version flag should be adjusted. + DBVersion = 1 +) + +var ( + ErrDBVersionIncompatible = errors.New("database version is not compatible. please delete your database folder and restart") + // the key under which the database is stored + dbVersionKey = []byte{0} +) + +// checks whether the database is compatible with the current schema version. +// also automatically sets the version if the database is new. +func checkDatabaseVersion(dbIsNew bool) { + dbInstance, err := Get(DBPrefixDatabaseVersion, instance) + if err != nil { + panic(err) + } + + if dbIsNew { + // store db version for the first time in the new database + if err = dbInstance.Set(Entry{Key: dbVersionKey, Value: []byte{DBVersion}}); err != nil { + panic(fmt.Sprintf("unable to persist db version number: %s", err.Error())) + } + return + } + + // db version must be available + entry, err := dbInstance.Get(dbVersionKey) + if err != nil { + if err == ErrKeyNotFound { + panic(err) + } + panic(fmt.Errorf("%w: no database version was persisted", ErrDBVersionIncompatible)) + } + if entry.Value[0] != DBVersion { + panic(fmt.Errorf("%w: supported version: %d, version of database: %d", ErrDBVersionIncompatible, DBVersion, entry.Value[0])) + } +} diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 683cd24c966051d67b55cb91f8387fbbc457abd0..19933333c6755203f505b1145fe417574bd31f6e 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -7,7 +7,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/iotaledger/goshimmer/packages/autopeering/peer" - "github.com/iotaledger/goshimmer/packages/autopeering/peer/service" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/iotaledger/hive.go/events" @@ -71,11 +70,6 @@ func (m *Manager) stop() { } } -// LocalAddr returns the public address of the gossip service. -func (m *Manager) LocalAddr() net.Addr { - return m.local.Services().Get(service.GossipKey) -} - // AddOutbound tries to add a neighbor by connecting to that peer. func (m *Manager) AddOutbound(p *peer.Peer) error { if p.ID() == m.local.ID() { diff --git a/packages/shutdown/order.go b/packages/shutdown/order.go index b16bc91e04b45b2e9744555ae229ab359d9cb18e..51ed31ec8d11251d947628842d80542e7f05d7dd 100644 --- a/packages/shutdown/order.go +++ b/packages/shutdown/order.go @@ -14,5 +14,6 @@ const ( ShutdownPriorityUI ShutdownPriorityDashboard ShutdownPriorityTangleSpammer + ShutdownPriorityBadgerGarbageCollection ShutdownPriorityStatusScreen ) diff --git a/packages/transactionspammer/transactionspammer.go b/packages/transactionspammer/transactionspammer.go index dcfb1da1ed9d5dbea4dd3ad676054a7e30940f26..67ba1fa87f950b6589a2186d47e0c104fc182ad1 100644 --- a/packages/transactionspammer/transactionspammer.go +++ b/packages/transactionspammer/transactionspammer.go @@ -1,20 +1,22 @@ package transactionspammer import ( + "strings" "sync" "time" - "github.com/iotaledger/goshimmer/packages/shutdown" - "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/goshimmer/packages/gossip" "github.com/iotaledger/goshimmer/packages/model/meta_transaction" "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/shutdown" + "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/tipselection" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/logger" ) +const logEveryNTransactions = 5000 + var log *logger.Logger var spamming = false @@ -23,14 +25,14 @@ var spammingMutex sync.Mutex var shutdownSignal chan struct{} var done chan struct{} -var sentCounter = uint(0) - func init() { shutdownSignal = make(chan struct{}) done = make(chan struct{}) } -func Start(tps uint) { +var targetAddress = strings.Repeat("SPAMMMMER", 9) + +func Start(tps uint64) { log = logger.NewLogger("Transaction Spammer") spammingMutex.Lock() spamming = true @@ -38,8 +40,11 @@ func Start(tps uint) { daemon.BackgroundWorker("Transaction Spammer", func(daemonShutdownSignal <-chan struct{}) { start := time.Now() - totalSentCounter := int64(0) + var totalSentCounter, currentSentCounter uint64 + + log.Infof("started spammer...will output sent count every %d transactions", logEveryNTransactions) + defer log.Infof("spammer stopped, spammed %d transactions", totalSentCounter) for { select { case <-daemonShutdownSignal: @@ -50,13 +55,13 @@ func Start(tps uint) { return default: - sentCounter++ + currentSentCounter++ totalSentCounter++ tx := value_transaction.New() tx.SetHead(true) tx.SetTail(true) - tx.SetValue(totalSentCounter) + tx.SetAddress(targetAddress) tx.SetBranchTransactionHash(tipselection.GetRandomTip()) tx.SetTrunkTransactionHash(tipselection.GetRandomTip()) tx.SetTimestamp(uint(time.Now().Unix())) @@ -67,7 +72,12 @@ func Start(tps uint) { gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Data: tx.GetBytes(), Peer: &local.GetInstance().Peer}) - if sentCounter >= tps { + if totalSentCounter%logEveryNTransactions == 0 { + log.Infof("spammed %d transactions", totalSentCounter) + } + + // rate limit to the specified TPS + if currentSentCounter >= tps { duration := time.Since(start) if duration < time.Second { @@ -75,8 +85,7 @@ func Start(tps uint) { } start = time.Now() - - sentCounter = 0 + currentSentCounter = 0 } } } diff --git a/plugins/analysis/webinterface/httpserver/plugin.go b/plugins/analysis/webinterface/httpserver/plugin.go index fa3f2b501bdb34eefa24106440042fa63fc44e10..d8330000f20d3c980ebbd95f452a07bae9ec28ba 100644 --- a/plugins/analysis/webinterface/httpserver/plugin.go +++ b/plugins/analysis/webinterface/httpserver/plugin.go @@ -1,22 +1,29 @@ package httpserver import ( + "errors" "net/http" + "sync" "time" "github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/node" + "github.com/iotaledger/hive.go/logger" "golang.org/x/net/context" "golang.org/x/net/websocket" ) var ( + log *logger.Logger httpServer *http.Server router *http.ServeMux ) -func Configure(plugin *node.Plugin) { +const name = "Analysis HTTP Server" + +func Configure() { + log = logger.NewLogger(name) + router = http.NewServeMux() httpServer = &http.Server{Addr: ":80", Handler: router} @@ -24,12 +31,30 @@ func Configure(plugin *node.Plugin) { router.HandleFunc("/", index) } -func Run(plugin *node.Plugin) { - daemon.BackgroundWorker("Analysis HTTP Server", func(shutdownSignal <-chan struct{}) { - go httpServer.ListenAndServe() - <-shutdownSignal - ctx, cancel := context.WithTimeout(context.Background(), 0*time.Second) - defer cancel() - httpServer.Shutdown(ctx) - }, shutdown.ShutdownPriorityAnalysis) +func Run() { + if err := daemon.BackgroundWorker(name, start, shutdown.ShutdownPriorityAnalysis); err != nil { + log.Errorf("Error starting as daemon: %s", err) + } +} + +func start(shutdownSignal <-chan struct{}) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + log.Infof(name+" started: address=%s", httpServer.Addr) + if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Warnf("Error listening: %s", err) + } + }() + <-shutdownSignal + log.Info("Stopping " + name + " ...") + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + if err := httpServer.Shutdown(ctx); err != nil { + log.Errorf("Error closing: %s", err) + } + wg.Wait() + log.Info("Stopping " + name + " ... done") } diff --git a/plugins/analysis/webinterface/plugin.go b/plugins/analysis/webinterface/plugin.go index f217dde659b2b19a159f5b2b2b493c0893cf7046..68d1a33d2a70efaa1db252192725c5118304b1a0 100644 --- a/plugins/analysis/webinterface/plugin.go +++ b/plugins/analysis/webinterface/plugin.go @@ -7,10 +7,10 @@ import ( ) func Configure(plugin *node.Plugin) { - httpserver.Configure(plugin) + httpserver.Configure() recordedevents.Configure(plugin) } func Run(plugin *node.Plugin) { - httpserver.Run(plugin) + httpserver.Run() } diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/autopeering.go index 33fb047aa5397424180c8d1641ce2a87926652f9..76c8122a6b0b467dc012e5e37c8d2c70cfd5b780 100644 --- a/plugins/autopeering/autopeering.go +++ b/plugins/autopeering/autopeering.go @@ -13,6 +13,7 @@ import ( "github.com/iotaledger/goshimmer/packages/autopeering/selection" "github.com/iotaledger/goshimmer/packages/autopeering/server" "github.com/iotaledger/goshimmer/packages/autopeering/transport" + "github.com/iotaledger/goshimmer/packages/netutil" "github.com/iotaledger/goshimmer/packages/parameter" "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/cli" @@ -56,8 +57,9 @@ func configureAP() { func start(shutdownSignal <-chan struct{}) { defer log.Info("Stopping " + name + " ... done") - addr := local.GetInstance().Services().Get(service.PeeringKey) - udpAddr, err := net.ResolveUDPAddr(addr.Network(), addr.String()) + loc := local.GetInstance() + peeringAddr := loc.Services().Get(service.PeeringKey) + udpAddr, err := net.ResolveUDPAddr(peeringAddr.Network(), peeringAddr.String()) if err != nil { log.Fatalf("ResolveUDPAddr: %v", err) } @@ -71,7 +73,12 @@ func start(shutdownSignal <-chan struct{}) { } } - conn, err := net.ListenUDP(addr.Network(), udpAddr) + // check that discovery is working and the port is open + log.Info("Testing service ...") + checkConnection(udpAddr, &loc.Peer) + log.Info("Testing service ... done") + + conn, err := net.ListenUDP(peeringAddr.Network(), udpAddr) if err != nil { log.Fatalf("ListenUDP: %v", err) } @@ -86,26 +93,21 @@ func start(shutdownSignal <-chan struct{}) { } // start a server doing discovery and peering - srv := server.Listen(local.GetInstance(), trans, log.Named("srv"), handlers...) + srv := server.Listen(loc, trans, log.Named("srv"), handlers...) defer srv.Close() // start the discovery on that connection Discovery.Start(srv) defer Discovery.Close() - //check that discovery is working and the port is open - log.Info("Testing service ...") - checkConnection(srv, &local.GetInstance().Peer) - log.Info("Testing service ... done") - if Selection != nil { // start the peering on that connection Selection.Start(srv) defer Selection.Close() } - log.Infof(name+" started: address=%s/udp", srv.LocalAddr()) - log.Debugf(name+" server started: PubKey=%s", base64.StdEncoding.EncodeToString(local.GetInstance().PublicKey())) + log.Infof(name+" started: address=%s/%s", peeringAddr.String(), peeringAddr.Network()) + log.Debugf(name+" server started: PubKey=%s", base64.StdEncoding.EncodeToString(loc.PublicKey())) <-shutdownSignal log.Info("Stopping " + name + " ...") @@ -135,14 +137,18 @@ func parseEntryNodes() (result []*peer.Peer, err error) { return result, nil } -func checkConnection(srv *server.Server, self *peer.Peer) { - if err := Discovery.Ping(self); err != nil { - if err == server.ErrTimeout { - log.Errorf("Error testing service: %s", err) - addr := self.Services().Get(service.PeeringKey) - log.Panicf("Please check that %s is publicly reachable at %s/%s", - cli.AppName, addr.String(), addr.Network()) - } - log.Panicf("Error: %s", err) +func checkConnection(localAddr *net.UDPAddr, self *peer.Peer) { + peering := self.Services().Get(service.PeeringKey) + remoteAddr, err := net.ResolveUDPAddr(peering.Network(), peering.String()) + if err != nil { + panic(err) + } + + // do not check the address as a NAT may change them for local connections + err = netutil.CheckUDP(localAddr, remoteAddr, false, true) + if err != nil { + log.Errorf("Error testing service: %s", err) + log.Panicf("Please check that %s is publicly reachable at %s/%s", + cli.AppName, peering.String(), peering.Network()) } } diff --git a/plugins/bundleprocessor/bundleprocessor_test.go b/plugins/bundleprocessor/bundleprocessor_test.go index 6505583f451516b7db36c7d5a0c20d90d56fa490..e7b90366bd676bf6a0ba3d284ba6e241a368b3f9 100644 --- a/plugins/bundleprocessor/bundleprocessor_test.go +++ b/plugins/bundleprocessor/bundleprocessor_test.go @@ -1,10 +1,13 @@ package bundleprocessor import ( + "io/ioutil" + "os" "sync" "testing" "github.com/iotaledger/goshimmer/packages/client" + "github.com/iotaledger/goshimmer/packages/database" "github.com/iotaledger/goshimmer/packages/model/bundle" "github.com/iotaledger/goshimmer/packages/model/value_transaction" "github.com/iotaledger/goshimmer/packages/parameter" @@ -16,6 +19,7 @@ import ( "github.com/iotaledger/iota.go/consts" "github.com/magiconair/properties/assert" "github.com/spf13/viper" + "github.com/stretchr/testify/require" ) var seed = client.NewSeed("YFHQWAUPCXC9S9DSHP9NDF9RLNPMZVCMSJKUKQP9SWUSUCPRQXCMDVDVZ9SHHESHIQNCXWBJF9UJSWE9Z", consts.SecurityLevelMedium) @@ -69,6 +73,12 @@ func TestValidateSignatures(t *testing.T) { } func TestProcessSolidBundleHead(t *testing.T) { + dir, err := ioutil.TempDir("", t.Name()) + require.NoError(t, err) + defer os.Remove(dir) + // use the tempdir for the database + parameter.NodeConfig.Set(database.CFG_DIRECTORY, dir) + // start a test node node.Start(node.Plugins(tangle.PLUGIN, PLUGIN)) defer node.Shutdown() diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index b6ea302a756c98149dfa9b68c849244e3a6b99d0..5a6904c5accdeb4780e1efd607f501f3f81e24cf 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -44,7 +44,8 @@ func configureGossip() { func start(shutdownSignal <-chan struct{}) { defer log.Info("Stopping " + name + " ... done") - srv, err := server.ListenTCP(local.GetInstance(), log) + loc := local.GetInstance() + srv, err := server.ListenTCP(loc, log) if err != nil { log.Fatalf("ListenTCP: %v", err) } @@ -52,13 +53,14 @@ func start(shutdownSignal <-chan struct{}) { //check that the server is working and the port is open log.Info("Testing service ...") - checkConnection(srv, &local.GetInstance().Peer) + checkConnection(srv, &loc.Peer) log.Info("Testing service ... done") mgr.Start(srv) defer mgr.Close() - log.Infof(name+" started: address=%s/%s", mgr.LocalAddr().String(), mgr.LocalAddr().Network()) + gossipAddr := loc.Services().Get(service.GossipKey) + log.Infof(name+" started: address=%s/%s", gossipAddr.String(), gossipAddr.Network()) <-shutdownSignal log.Info("Stopping " + name + " ...") diff --git a/plugins/tangle/approvers.go b/plugins/tangle/approvers.go index 10073c39435df28a04fefc943518312582ab5eee..3e50c29caec97397832490d911ca3989ebe3be6b 100644 --- a/plugins/tangle/approvers.go +++ b/plugins/tangle/approvers.go @@ -82,7 +82,7 @@ const ( var approversDatabase database.Database func configureApproversDatabase() { - if db, err := database.Get("approvers"); err != nil { + if db, err := database.Get(database.DBPrefixApprovers, database.GetBadgerInstance()); err != nil { panic(err) } else { approversDatabase = db @@ -91,10 +91,9 @@ func configureApproversDatabase() { func storeApproversInDatabase(approvers *approvers.Approvers) error { if approvers.GetModified() { - if err := approversDatabase.Set(typeutils.StringToBytes(approvers.GetHash()), approvers.Marshal()); err != nil { + if err := approversDatabase.Set(database.Entry{Key: typeutils.StringToBytes(approvers.GetHash()), Value: approvers.Marshal()}); err != nil { return fmt.Errorf("%w: failed to store approvers: %s", ErrDatabaseError, err) } - approvers.SetModified(false) } @@ -111,7 +110,7 @@ func getApproversFromDatabase(transactionHash trinary.Trytes) (*approvers.Approv } var result approvers.Approvers - if err = result.Unmarshal(approversData); err != nil { + if err = result.Unmarshal(approversData.Value); err != nil { panic(err) } diff --git a/plugins/tangle/bundle.go b/plugins/tangle/bundle.go index 55411764a2eacc04b73122a86b119352bc0d1cbd..2cb1fb896ae5cd5a8d6152a0e2398ef4651f4a87 100644 --- a/plugins/tangle/bundle.go +++ b/plugins/tangle/bundle.go @@ -86,7 +86,7 @@ const ( var bundleDatabase database.Database func configureBundleDatabase() { - if db, err := database.Get("bundle"); err != nil { + if db, err := database.Get(database.DBPrefixBundle, database.GetBadgerInstance()); err != nil { panic(err) } else { bundleDatabase = db @@ -95,10 +95,9 @@ func configureBundleDatabase() { func storeBundleInDatabase(bundle *bundle.Bundle) error { if bundle.GetModified() { - if err := bundleDatabase.Set(typeutils.StringToBytes(bundle.GetHash()), bundle.Marshal()); err != nil { + if err := bundleDatabase.Set(database.Entry{Key: typeutils.StringToBytes(bundle.GetHash()), Value: bundle.Marshal()}); err != nil { return fmt.Errorf("%w: failed to store bundle: %s", ErrDatabaseError, err) } - bundle.SetModified(false) } @@ -116,7 +115,7 @@ func getBundleFromDatabase(transactionHash trinary.Trytes) (*bundle.Bundle, erro } var result bundle.Bundle - if err = result.Unmarshal(bundleData); err != nil { + if err = result.Unmarshal(bundleData.Value); err != nil { panic(err) } diff --git a/plugins/tangle/plugin.go b/plugins/tangle/plugin.go index 252c2b987da1db3774e0def97e65582f879a8c6c..6c6bc09015f462d137784c7c9c5fad2a26c59929 100644 --- a/plugins/tangle/plugin.go +++ b/plugins/tangle/plugin.go @@ -1,11 +1,14 @@ package tangle import ( + "time" + "github.com/iotaledger/goshimmer/packages/database" "github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" + "github.com/iotaledger/hive.go/timeutil" "github.com/iotaledger/iota.go/trinary" ) @@ -42,6 +45,13 @@ func configure(*node.Plugin) { } func run(*node.Plugin) { + + daemon.BackgroundWorker("Badger garbage collection", func(shutdownSignal <-chan struct{}) { + timeutil.Ticker(func() { + database.CleanupBadgerInstance(log) + }, 5*time.Minute, shutdownSignal) + }, shutdown.ShutdownPriorityBadgerGarbageCollection) + runSolidifier() } diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index cfcb3da5f649a72afe0d2b49def62e342f8a9a94..098b30ba08c391242e26ac0b767a4820673fd15e 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -195,13 +195,12 @@ func processTransaction(transaction *value_transaction.ValueTransaction) { Events.TransactionStored.Trigger(transaction) // store transaction hash for address in DB - err := StoreTransactionHashForAddressInDatabase( + if err := StoreTransactionHashForAddressInDatabase( &TxHashForAddress{ Address: transaction.GetAddress(), TxHash: transaction.GetHash(), }, - ) - if err != nil { + ); err != nil { log.Errorw(err.Error()) } @@ -224,8 +223,7 @@ func processTransaction(transaction *value_transaction.ValueTransaction) { } // update the solidity flags of this transaction and its approvers - _, err = IsSolid(transaction) - if err != nil { + if _, err := IsSolid(transaction); err != nil { log.Errorf("Unable to check solidity: %s", err.Error()) return } diff --git a/plugins/tangle/solidifier_test.go b/plugins/tangle/solidifier_test.go index 850f87501e8dc43fccc6fddb55a74cf834e9ee8d..28a63bc25a1fdcd79f16fb2f82d2b66380088ae5 100644 --- a/plugins/tangle/solidifier_test.go +++ b/plugins/tangle/solidifier_test.go @@ -33,7 +33,7 @@ func init() { } func TestTangle(t *testing.T) { - dir, err := ioutil.TempDir("", "example") + dir, err := ioutil.TempDir("", t.Name()) require.NoError(t, err) defer os.Remove(dir) // use the tempdir for the database diff --git a/plugins/tangle/transaction.go b/plugins/tangle/transaction.go index fc2e2f02dd7cd3b18739eeb232f6b27b4bd791ae..bff7b6b726d8bda3b4f3fdc4278f0a3bbe80a254 100644 --- a/plugins/tangle/transaction.go +++ b/plugins/tangle/transaction.go @@ -83,7 +83,7 @@ const ( var transactionDatabase database.Database func configureTransactionDatabase() { - if db, err := database.Get("transaction"); err != nil { + if db, err := database.Get(database.DBPrefixTransaction, database.GetBadgerInstance()); err != nil { panic(err) } else { transactionDatabase = db @@ -92,10 +92,9 @@ func configureTransactionDatabase() { func storeTransactionInDatabase(transaction *value_transaction.ValueTransaction) error { if transaction.GetModified() { - if err := transactionDatabase.Set(typeutils.StringToBytes(transaction.GetHash()), transaction.MetaTransaction.GetBytes()); err != nil { + if err := transactionDatabase.Set(database.Entry{Key: typeutils.StringToBytes(transaction.GetHash()), Value: transaction.MetaTransaction.GetBytes()}); err != nil { return fmt.Errorf("%w: failed to store transaction: %s", ErrDatabaseError, err.Error()) } - transaction.SetModified(false) } @@ -111,7 +110,7 @@ func getTransactionFromDatabase(transactionHash trinary.Trytes) (*value_transact return nil, fmt.Errorf("%w: failed to retrieve transaction: %s", ErrDatabaseError, err) } - return value_transaction.FromBytes(txData), nil + return value_transaction.FromBytes(txData.Value), nil } func databaseContainsTransaction(transactionHash trinary.Trytes) (bool, error) { diff --git a/plugins/tangle/transaction_metadata.go b/plugins/tangle/transaction_metadata.go index 94d6177af989c666a75d2b1c1e75bd1c252166bc..152f0126da1f00dd741d2c9953743a8010e9f764 100644 --- a/plugins/tangle/transaction_metadata.go +++ b/plugins/tangle/transaction_metadata.go @@ -83,7 +83,7 @@ const ( var transactionMetadataDatabase database.Database func configureTransactionMetaDataDatabase() { - if db, err := database.Get("transactionMetadata"); err != nil { + if db, err := database.Get(database.DBPrefixTransactionMetadata, database.GetBadgerInstance()); err != nil { panic(err) } else { transactionMetadataDatabase = db @@ -95,7 +95,7 @@ func storeTransactionMetadataInDatabase(metadata *transactionmetadata.Transactio if marshaledMetadata, err := metadata.Marshal(); err != nil { return err } else { - if err := transactionMetadataDatabase.Set(typeutils.StringToBytes(metadata.GetHash()), marshaledMetadata); err != nil { + if err := transactionMetadataDatabase.Set(database.Entry{Key: typeutils.StringToBytes(metadata.GetHash()), Value: marshaledMetadata}); err != nil { return fmt.Errorf("%w: failed to store transaction metadata: %s", ErrDatabaseError, err) } @@ -116,7 +116,7 @@ func getTransactionMetadataFromDatabase(transactionHash trinary.Trytes) (*transa } var result transactionmetadata.TransactionMetadata - if err := result.Unmarshal(txMetadata); err != nil { + if err := result.Unmarshal(txMetadata.Value); err != nil { panic(err) } diff --git a/plugins/tangle/tx_per_address.go b/plugins/tangle/tx_per_address.go index 1ab4950e6c79405e99e4acf3242f4ebc6591eb94..092a75248419251137c6205be0abf108b9b495df 100644 --- a/plugins/tangle/tx_per_address.go +++ b/plugins/tangle/tx_per_address.go @@ -13,7 +13,7 @@ var ( ) func configureTransactionHashesForAddressDatabase() { - if db, err := database.Get("transactionsHashesForAddress"); err != nil { + if db, err := database.Get(database.DBPrefixAddressTransactions, database.GetBadgerInstance()); err != nil { panic(err) } else { transactionsHashesForAddressDatabase = db @@ -26,10 +26,10 @@ type TxHashForAddress struct { } func StoreTransactionHashForAddressInDatabase(address *TxHashForAddress) error { - if err := transactionsHashesForAddressDatabase.Set( - databaseKeyForHashPrefixedHash(address.Address, address.TxHash), - []byte{}, - ); err != nil { + if err := transactionsHashesForAddressDatabase.Set(database.Entry{ + Key: databaseKeyForHashPrefixedHash(address.Address, address.TxHash), + Value: []byte{}, + }); err != nil { return fmt.Errorf("%w: failed to store tx for address in database: %s", ErrDatabaseError, err) } return nil @@ -47,11 +47,9 @@ func DeleteTransactionHashForAddressInDatabase(address *TxHashForAddress) error func ReadTransactionHashesForAddressFromDatabase(address trinary.Hash) ([]trinary.Hash, error) { var transactionHashes []trinary.Hash - err := transactionsHashesForAddressDatabase.ForEachWithPrefix(databaseKeyForHashPrefix(address), func(key []byte, value []byte) { - k := typeutils.BytesToString(key) - if len(k) > 81 { - transactionHashes = append(transactionHashes, k[81:]) - } + err := transactionsHashesForAddressDatabase.StreamForEachPrefixKeyOnly(databaseKeyForHashPrefix(address), func(key database.KeyOnlyEntry) error { + transactionHashes = append(transactionHashes, typeutils.BytesToString(key.Key)) + return nil }) if err != nil { diff --git a/plugins/webapi/spammer/plugin.go b/plugins/webapi/spammer/plugin.go index 97b9056a44c3c4b9ea35e8a21a9ae30e758b0048..fdc1aca338a2ddbbfdbcd666ce9ac802866b63be 100644 --- a/plugins/webapi/spammer/plugin.go +++ b/plugins/webapi/spammer/plugin.go @@ -61,5 +61,5 @@ type Response struct { type Request struct { Cmd string `json:"cmd"` - Tps uint `json:"tps"` + Tps uint64 `json:"tps"` } diff --git a/tools/relay-checker/config.go b/tools/relay-checker/config.go new file mode 100644 index 0000000000000000000000000000000000000000..9096faf6f22fe6d7349d5a473c167ce04e17ca3f --- /dev/null +++ b/tools/relay-checker/config.go @@ -0,0 +1,46 @@ +package main + +import ( + "github.com/iotaledger/goshimmer/packages/parameter" +) + +var ( + nodes []string + target = "" + txnAddr = "GOSHIMMER99TEST999999999999999999999999999999999999999999999999999999999999999999" + txnData = "TEST99BROADCAST99DATA" + cooldown = 2 + maxQuery = 1 +) + +func LoadConfig() { + if err := parameter.FetchConfig(false); err != nil { + panic(err) + } +} + +func SetConfig() { + if parameter.NodeConfig.GetString(CFG_TARGET_NODE) == "" { + panic("Set the target node address\n") + } + target = parameter.NodeConfig.GetString(CFG_TARGET_NODE) + + if len(parameter.NodeConfig.GetStringSlice(CFG_TEST_NODES)) == 0 { + panic("Set node addresses\n") + } + nodes = append(nodes, parameter.NodeConfig.GetStringSlice(CFG_TEST_NODES)...) + + // optional settings + if parameter.NodeConfig.GetString(CFG_TXN_ADDRESS) != "" { + txnAddr = parameter.NodeConfig.GetString(CFG_TXN_ADDRESS) + } + if parameter.NodeConfig.GetString(CFG_DATA) != "" { + txnData = parameter.NodeConfig.GetString(CFG_DATA) + } + if parameter.NodeConfig.GetInt(CFG_COOL_DOWN_TIME) > 0 { + cooldown = parameter.NodeConfig.GetInt(CFG_COOL_DOWN_TIME) + } + if parameter.NodeConfig.GetInt(CFG_MAX_QUERY) > 0 { + maxQuery = parameter.NodeConfig.GetInt(CFG_MAX_QUERY) + } +} diff --git a/tools/relay-checker/config.json b/tools/relay-checker/config.json new file mode 100644 index 0000000000000000000000000000000000000000..d7c0df551297552275129aa199deb983d1b5bc98 --- /dev/null +++ b/tools/relay-checker/config.json @@ -0,0 +1,12 @@ +{ + "relaycheck": { + "targetnode": "http://127.0.0.1:8080", + "nodes": [ + "http://127.0.0.1:8080" + ], + "txnaddress": "SHIMMER99TEST99999999999999999999999999999999999999999999999999999999999999999999", + "data": "TEST99BROADCAST99DATA", + "cooldowntime": 10, + "maxquery": 2 + } +} diff --git a/tools/relay-checker/main.go b/tools/relay-checker/main.go new file mode 100644 index 0000000000000000000000000000000000000000..189babf7ea79072579c5872a5209a9853989109a --- /dev/null +++ b/tools/relay-checker/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "fmt" + "time" + + client "github.com/iotaledger/goshimmer/client" + "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/iota.go/trinary" +) + +func testBroadcastData(api *client.GoShimmerAPI) (trinary.Hash, error) { + txnHash, err := api.BroadcastData(txnAddr, txnData) + if err != nil { + return "", errors.Wrapf(err, "Broadcast failed") + } + return txnHash, nil +} + +func testTargetGetTransactions(api *client.GoShimmerAPI, txnHash trinary.Hash) error { + // query target node for broadcasted data + _, err := api.GetTransactions([]trinary.Hash{txnHash}) + if err != nil { + return errors.Wrapf(err, "Query target failed") + } + return nil +} + +func testNodesGetTransactions(txnHash trinary.Hash) error { + // query nodes node for broadcasted data + for _, n := range nodes { + nodesApi := client.NewGoShimmerAPI(n) + _, err := nodesApi.GetTransactions([]trinary.Hash{txnHash}) + if err != nil { + return errors.Wrapf(err, "Query %s failed", n) + } + fmt.Printf("txn found in %s\n", n) + } + return nil +} + +func main() { + LoadConfig() + SetConfig() + + api := client.NewGoShimmerAPI(target) + for i := 0; i < maxQuery; i++ { + txnHash, err := testBroadcastData(api) + if err != nil { + fmt.Printf("%s\n", err) + break + } + fmt.Printf("txnHash: %s\n", txnHash) + + // cooldown time + time.Sleep(time.Duration(cooldown) * time.Second) + + // query target node + err = testTargetGetTransactions(api, txnHash) + if err != nil { + fmt.Printf("%s\n", err) + break + } + + // query nodes node + err = testNodesGetTransactions(txnHash) + if err != nil { + fmt.Printf("%s\n", err) + break + } + } +} diff --git a/tools/relay-checker/parameters.go b/tools/relay-checker/parameters.go new file mode 100644 index 0000000000000000000000000000000000000000..b1ee2b9b5ab6fa5885237e980ff329a33b4bdd13 --- /dev/null +++ b/tools/relay-checker/parameters.go @@ -0,0 +1,23 @@ +package main + +import ( + flag "github.com/spf13/pflag" +) + +const ( + CFG_TARGET_NODE = "relaycheck.targetNode" + CFG_TEST_NODES = "relaycheck.nodes" + CFG_TXN_ADDRESS = "relaycheck.txnAddress" + CFG_DATA = "relaycheck.data" + CFG_COOL_DOWN_TIME = "relaycheck.cooldownTime" + CFG_MAX_QUERY = "relaycheck.maxQuery" +) + +func init() { + flag.StringSlice(CFG_TEST_NODES, []string{""}, "list of trusted entry nodes for auto peering") + flag.String(CFG_TARGET_NODE, "http://127.0.0.1:8080", "target node to test") + flag.String(CFG_TXN_ADDRESS, "SHIMMER99TEST99999999999999999999999999999999999999999999999999999999999999999999", "transaction address") + flag.String(CFG_DATA, "TEST99BROADCAST99DATA", "data to broadcast") + flag.Int(CFG_COOL_DOWN_TIME, 10, "cooldown time after broadcast data") + flag.Int(CFG_MAX_QUERY, 1, "the repeat times of relay-checker") +}