diff --git a/go.mod b/go.mod index 83b07b2bc429e6e02a87d7f3ce2edf306558f01d..bc1fe018a2bb3db8acc2fe4579e1687eb42ad672 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,6 @@ module github.com/iotaledger/goshimmer go 1.13 require ( - github.com/StabbyCutyou/buffstreams v2.0.0+incompatible - github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414 github.com/dgraph-io/badger v1.6.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/gdamore/tcell v1.3.0 @@ -12,8 +10,8 @@ require ( github.com/golang/protobuf v1.3.2 github.com/google/open-location-code/go v0.0.0-20190903173953-119bc96a3a51 github.com/gorilla/websocket v1.4.1 - github.com/iotaledger/autopeering-sim v0.0.0-20191203092805-a1dd5954f3f6 - github.com/iotaledger/hive.go v0.0.0-20191202111738-357cee7a1c37 + github.com/iotaledger/autopeering-sim v0.0.0-20191206120939-725ee12834dd + github.com/iotaledger/hive.go v0.0.0-20191206003239-3231c4584e5c github.com/iotaledger/iota.go v1.0.0-beta.10 github.com/labstack/echo v3.3.10+incompatible github.com/lucasb-eyer/go-colorful v1.0.3 // indirect @@ -32,10 +30,10 @@ require ( go.uber.org/zap v1.13.0 golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect - golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933 + golang.org/x/net v0.0.0-20191206103017-1ddd1de85cb0 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect - golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9 // indirect - golang.org/x/tools v0.0.0-20191203134012-c197fd4bf371 // indirect + golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e // indirect + golang.org/x/tools v0.0.0-20191205225056-3393d29bb9fe // indirect golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 // indirect gopkg.in/yaml.v2 v2.2.7 // indirect ) diff --git a/go.sum b/go.sum index ee5fc3160896cc63b93b629dc2264347070b1236..3db57d9a3445241419ef4db1021f37cfc2c41f76 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,4 @@ +cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M= @@ -8,8 +9,6 @@ github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0= github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/StabbyCutyou/buffstreams v2.0.0+incompatible h1:7bDqOlL2Ipve3YjZVVrWz/TOVOa+IEHj6XagpAtLn2I= -github.com/StabbyCutyou/buffstreams v2.0.0+incompatible/go.mod h1:gP6wgxHoC0NrobhUBwx+Y6hx2QyKFOT+ho8619aJCDk= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -18,10 +17,6 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/capossele/gossip v0.0.0-20191204191545-36eddf08c1aa h1:46C1Ce+93zGZ+JJ4JUw3EuXHQXqKYzIX7+CRG0fweNk= -github.com/capossele/gossip v0.0.0-20191204191545-36eddf08c1aa/go.mod h1:P8rJEmorO5QpCL236F8vNhUFwFFjezt2d/+/LeRlELA= -github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414 h1:C9Q279xU15Qt5WVZNH6t/1L7exbTVYp1uMRS9NSmL/U= -github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414/go.mod h1:DnYLNZclq7cY6s2oA6wwhQ4tDB0j38enJHPrhpzOpJc= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -99,18 +94,17 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/iotaledger/autopeering-sim v0.0.0-20191202124431-1705dc628175 h1:/RAxj+VEPTykp9cWRKTtydkfuBDaFemI7/bdNYCD7Nw= -github.com/iotaledger/autopeering-sim v0.0.0-20191202124431-1705dc628175/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI= -github.com/iotaledger/autopeering-sim v0.0.0-20191202192349-f8e7a238c2bb/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI= -github.com/iotaledger/autopeering-sim v0.0.0-20191202212322-a6353b992662 h1:al3bYvSaJnFyupe7bqgXYeBZ7W0fTNWrFd9WLLNlC6I= -github.com/iotaledger/autopeering-sim v0.0.0-20191202212322-a6353b992662/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI= github.com/iotaledger/autopeering-sim v0.0.0-20191203092805-a1dd5954f3f6 h1:lcM/irmEr++tz/XQCHCTBsteqiBVZ3uTurLnnviCxLg= github.com/iotaledger/autopeering-sim v0.0.0-20191203092805-a1dd5954f3f6/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI= +github.com/iotaledger/autopeering-sim v0.0.0-20191206120939-725ee12834dd h1:CTHnqb0UdC+EwHBn20TeGvYj5F44zoZTfPOX/vEmSzQ= +github.com/iotaledger/autopeering-sim v0.0.0-20191206120939-725ee12834dd/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI= github.com/iotaledger/goshimmer v0.0.0-20191113134331-c2d1b2f9d533/go.mod h1:7vYiofXphp9+PkgVAEM0pvw3aoi4ksrZ7lrEgX50XHs= github.com/iotaledger/hive.go v0.0.0-20191118130432-89eebe8fe8eb h1:nuS/LETRJ8obUyBIZeyxeei0ZPlyOMj8YPziOgSM4Og= github.com/iotaledger/hive.go v0.0.0-20191118130432-89eebe8fe8eb/go.mod h1:1Thhlil4lHzuy53EVvmEbEvWBFY0Tasp4kCBfxBCPIk= github.com/iotaledger/hive.go v0.0.0-20191202111738-357cee7a1c37 h1:Vex6W5Oae7xXvVmnCrl7J4o+PCx0FW3paMzXxQDr8H4= github.com/iotaledger/hive.go v0.0.0-20191202111738-357cee7a1c37/go.mod h1:1Thhlil4lHzuy53EVvmEbEvWBFY0Tasp4kCBfxBCPIk= +github.com/iotaledger/hive.go v0.0.0-20191206003239-3231c4584e5c h1:kmx6rRpMkeO+5gqj8ngv1+0Z7MGxulV3SCP2SZn/mU4= +github.com/iotaledger/hive.go v0.0.0-20191206003239-3231c4584e5c/go.mod h1:7iqun29a1x0lymTrn0UJ3Z/yy0sUzUpoOZ1OYMrYN20= github.com/iotaledger/iota.go v1.0.0-beta.7/go.mod h1:dMps6iMVU1pf5NDYNKIw4tRsPeC8W3ZWjOvYHOO1PMg= github.com/iotaledger/iota.go v1.0.0-beta.9 h1:c654s9pkdhMBkABUvWg+6k91MEBbdtmZXP1xDfQpajg= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= @@ -286,6 +280,8 @@ golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914 h1:MlY3mEfbnWGmUi4rtHOtNnnnN golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933 h1:e6HwijUxhDe+hPNjZQQn9bA5PW3vNmnN64U2ZW759Lk= golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191206103017-1ddd1de85cb0 h1:LxY/gQN/MrcW24/46nLyiip1GhN/Yi14QPbeNskTvQA= +golang.org/x/net v0.0.0-20191206103017-1ddd1de85cb0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -315,8 +311,8 @@ golang.org/x/sys v0.0.0-20191018095205-727590c5006e h1:ZtoklVMHQy6BFRHkbG6JzK+S6 golang.org/x/sys v0.0.0-20191018095205-727590c5006e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e h1:N7DeIrjYszNmSW409R3frPPwglRwMkXSBzwVbkOjLLA= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9 h1:ZBzSG/7F4eNKz2L3GE9o300RX0Az1Bw5HF7PDraD+qU= -golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e h1:9vRrk9YW2BTzLP0VCB9ZDjU4cPqkg+IDWL7XgxA1yxQ= +golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -331,19 +327,19 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191121040551-947d4aa89328 h1:t3X42h9e6xdbrCD/gPyWqAXr2BEpdJqRd1brThaaxII= golang.org/x/tools v0.0.0-20191121040551-947d4aa89328/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d h1:/iIZNFGxc/a7C3yWjGcnboV+Tkc7mxr+p6fDztwoxuM= -golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191202203127-2b6af5f9ace7 h1:I7bfRTrfnb7yQSesz6OhwGVh2imeNUcbbS8YtFYC8Ck= -golang.org/x/tools v0.0.0-20191202203127-2b6af5f9ace7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191203134012-c197fd4bf371 h1:Cjq6sG3gnKDchzWy7ouGQklhxMtWvh4AhSNJ0qGIeo4= golang.org/x/tools v0.0.0-20191203134012-c197fd4bf371/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191205225056-3393d29bb9fe h1:BEVcKURC7E0EF+vD1l52Jb3LOM5Iwu7OI5FpdPuU50o= +golang.org/x/tools v0.0.0-20191205225056-3393d29bb9fe/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.21.0 h1:G+97AoqBnmZIT91cLG/EkCoK9NSelj64P8bOHHNmGn0= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/main.go b/main.go index 2d313d68467dd7608bca855ec349d279762c30f1..1f8fa01c4ed805109575588fcc219eab210b72df 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,15 @@ import ( ) func main() { + // go func() { + // if err := profiler.Start(profiler.Config{ + // Service: "race-service", + // ServiceVersion: "1.0", + // ProjectID: "premium-canyon-232915", // optional on GCP + // }); err != nil { + // log.Fatalf("Cannot start the profiler: %v", err) + // } + // }() node.Run( cli.PLUGIN, autopeering.PLUGIN, diff --git a/packages/gossip/errors.go b/packages/gossip/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..66a2a23e41387327f5a61d936258112383528095 --- /dev/null +++ b/packages/gossip/errors.go @@ -0,0 +1,8 @@ +package gossip + +import "github.com/pkg/errors" + +var ( + ErrClosed = errors.New("manager closed") + ErrDuplicateNeighbor = errors.New("peer already connected") +) diff --git a/packages/gossip/events.go b/packages/gossip/events.go index 4c4e34a0675f6632d1a52c60f0626cbec28ed8a7..e8f6642b967922b49462611dff162ff53fc1bcf2 100644 --- a/packages/gossip/events.go +++ b/packages/gossip/events.go @@ -6,9 +6,13 @@ import ( ) // Events contains all the events that are triggered during the gossip protocol. -type Events struct { +var Events = struct { + // A NewTransaction event is triggered when a new transaction is received by the gossip protocol. NewTransaction *events.Event DropNeighbor *events.Event +}{ + NewTransaction: events.NewEvent(newTransaction), + DropNeighbor: events.NewEvent(dropNeighbor), } type NewTransactionEvent struct { diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 681f6b07c89cc888fe27f66c1d467179607514e8..52efd9ae171a3c0b38b136bea60b7ef662d1278e 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -1,48 +1,52 @@ package gossip import ( + "io" "net" + "strings" + "sync" "github.com/golang/protobuf/proto" "github.com/iotaledger/autopeering-sim/peer" - "github.com/iotaledger/goshimmer/packages/gossip/neighbor" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/transport" - "github.com/iotaledger/hive.go/events" "github.com/pkg/errors" "go.uber.org/zap" ) const ( - maxAttempts = 3 -) - -var ( - Event Events + maxConnectionAttempts = 3 + maxPacketSize = 2048 ) type GetTransaction func(txHash []byte) ([]byte, error) type Manager struct { - neighborhood *neighbor.NeighborMap - trans *transport.TransportTCP + trans *transport.TCP log *zap.SugaredLogger getTransaction GetTransaction - Events Events + + wg sync.WaitGroup + + mu sync.RWMutex + neighbors map[peer.ID]*neighbor + running bool +} + +type neighbor struct { + peer *peer.Peer + conn *transport.Connection } -func NewManager(t *transport.TransportTCP, log *zap.SugaredLogger, f GetTransaction) *Manager { - mgr := &Manager{ - neighborhood: neighbor.NewMap(), +func NewManager(t *transport.TCP, log *zap.SugaredLogger, f GetTransaction) *Manager { + m := &Manager{ trans: t, log: log, getTransaction: f, - Events: Events{ - NewTransaction: events.NewEvent(newTransaction), - DropNeighbor: events.NewEvent(dropNeighbor)}, + neighbors: make(map[peer.ID]*neighbor), } - Event = mgr.Events - return mgr + m.running = true + return m } func (m *Manager) AddOutbound(p *peer.Peer) error { @@ -53,41 +57,78 @@ func (m *Manager) AddInbound(p *peer.Peer) error { return m.addNeighbor(p, m.trans.AcceptPeer) } -func (m *Manager) DropNeighbor(id peer.ID) { - m.deleteNeighbor(id) +func (m *Manager) DropNeighbor(p peer.ID) { + m.deleteNeighbor(p) } -func (m *Manager) RequestTransaction(data []byte, to ...*neighbor.Neighbor) { - req := &pb.TransactionRequest{} - err := proto.Unmarshal(data, req) - if err != nil { - m.log.Warnw("Data to send is not a Transaction Request", "err", err) +func (m *Manager) Close() { + m.mu.Lock() + m.running = false + // close all connections + for _, n := range m.neighbors { + _ = n.conn.Close() } - msg := marshal(req) + m.mu.Unlock() - m.send(msg, to...) + m.wg.Wait() } -func (m *Manager) Send(data []byte, to ...*neighbor.Neighbor) { - tx := &pb.Transaction{} - err := proto.Unmarshal(data, tx) - if err != nil { - m.log.Warnw("Data to send is not a Transaction", "err", err) +func (m *Manager) getNeighbors(ids ...peer.ID) []*neighbor { + if len(ids) > 0 { + return m.getNeighborsById(ids) + } + return m.getAllNeighbors() +} + +func (m *Manager) getAllNeighbors() []*neighbor { + m.mu.Lock() + result := make([]*neighbor, 0, len(m.neighbors)) + for _, n := range m.neighbors { + result = append(result, n) + } + m.mu.Unlock() + + return result +} + +func (m *Manager) getNeighborsById(ids []peer.ID) []*neighbor { + result := make([]*neighbor, 0, len(ids)) + + m.mu.RLock() + for _, id := range ids { + if n, ok := m.neighbors[id]; ok { + result = append(result, n) + } + } + m.mu.RUnlock() + + return result +} + +func (m *Manager) RequestTransaction(txHash []byte, to ...peer.ID) { + req := &pb.TransactionRequest{ + Hash: txHash, } - msg := marshal(tx) + m.send(marshal(req), to...) +} - m.send(msg, to...) +// SendTransaction sends the transaction data to the given neighbors. +func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) { + tx := &pb.Transaction{ + Body: txData, + } + m.send(marshal(tx), to...) } -func (m *Manager) send(msg []byte, to ...*neighbor.Neighbor) { - neighbors := m.neighborhood.GetSlice() - if to != nil { - neighbors = to +func (m *Manager) send(msg []byte, to ...peer.ID) { + if l := len(msg); l > maxPacketSize { + m.log.Errorw("message too large", "len", l, "max", maxPacketSize) } + neighbors := m.getNeighbors(to...) - for _, neighbor := range neighbors { - m.log.Debugw("Sending", "to", neighbor.Peer.ID().String(), "msg", msg) - err := neighbor.Conn.Write(msg) + for _, nbr := range neighbors { + m.log.Debugw("Sending", "to", nbr.peer.ID(), "msg", msg) + _, err := nbr.conn.Write(msg) if err != nil { m.log.Debugw("send error", "err", err) } @@ -95,67 +136,89 @@ func (m *Manager) send(msg []byte, to ...*neighbor.Neighbor) { } func (m *Manager) addNeighbor(peer *peer.Peer, handshake func(*peer.Peer) (*transport.Connection, error)) error { - if _, ok := m.neighborhood.Load(peer.ID().String()); ok { - return errors.New("Neighbor already added") - } - - var err error - var conn *transport.Connection - i := 0 - for i = 0; i < maxAttempts; i++ { + var ( + err error + conn *transport.Connection + ) + for i := 0; i < maxConnectionAttempts; i++ { conn, err = handshake(peer) - if err != nil { - m.log.Warnw("Connection attempt failed", "attempt", i+1) - } else { + if err == nil { break } } - if i == maxAttempts { - m.log.Warnw("Connection failed to", "peer", peer.ID().String()) - m.Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: peer}) + + // could not add neighbor + if err != nil { + m.log.Debugw("addNeighbor failed", "peer", peer.ID(), "err", err) + Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: peer}) return err } - // add the new neighbor - neighbor := neighbor.New(peer, conn) - m.neighborhood.Store(peer.ID().String(), neighbor) + m.mu.Lock() + defer m.mu.Unlock() + if !m.running { + disconnect(conn) + return ErrClosed + } + if _, ok := m.neighbors[peer.ID()]; ok { + disconnect(conn) + return ErrDuplicateNeighbor + } - // start listener for the new neighbor - go m.readLoop(neighbor) + // add the neighbor + n := &neighbor{ + peer: peer, + conn: conn, + } + m.neighbors[peer.ID()] = n + go m.readLoop(n) return nil } -func (m *Manager) deleteNeighbor(id peer.ID) { - m.log.Debugw("Deleting neighbor", "neighbor", id.String()) - - p, ok := m.neighborhood.Delete(id.String()) - if ok { - m.Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: p.Peer}) +func (m *Manager) deleteNeighbor(peer peer.ID) { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.neighbors[peer]; !ok { + return } + + n := m.neighbors[peer] + delete(m.neighbors, peer) + disconnect(n.conn) } -func (m *Manager) readLoop(neighbor *neighbor.Neighbor) { +func (m *Manager) readLoop(nbr *neighbor) { + m.wg.Add(1) + defer m.wg.Done() + + // create a buffer for the packages + b := make([]byte, maxPacketSize) + for { - data, err := neighbor.Conn.Read() + n, err := nbr.conn.Read(b) if nerr, ok := err.(net.Error); ok && nerr.Temporary() { // ignore temporary read errors. - //m.log.Debugw("temporary read error", "err", err) + m.log.Debugw("temporary read error", "err", err) continue } else if err != nil { // return from the loop on all other errors - m.log.Debugw("reading stopped") - m.deleteNeighbor(neighbor.Peer.ID()) - + if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { + m.log.Warnw("read error", "err", err) + } + _ = nbr.conn.Close() // just make sure that the connection is closed as fast as possible + m.deleteNeighbor(nbr.peer.ID()) + m.log.Debug("reading stopped") return } - if err := m.handlePacket(data, neighbor); err != nil { - m.log.Warnw("failed to handle packet", "from", neighbor.Peer.ID().String(), "err", err) + + if err := m.handlePacket(b[:n], nbr); err != nil { + m.log.Warnw("failed to handle packet", "id", nbr.peer.ID(), "err", err) } } } -func (m *Manager) handlePacket(data []byte, neighbor *neighbor.Neighbor) error { +func (m *Manager) handlePacket(data []byte, n *neighbor) error { switch pb.MType(data[0]) { // Incoming Transaction @@ -165,7 +228,7 @@ func (m *Manager) handlePacket(data []byte, neighbor *neighbor.Neighbor) error { return errors.Wrap(err, "invalid message") } m.log.Debugw("Received Transaction", "data", msg.GetBody()) - m.Events.NewTransaction.Trigger(&NewTransactionEvent{Body: msg.GetBody(), Peer: neighbor.Peer}) + Events.NewTransaction.Trigger(&NewTransactionEvent{Body: msg.GetBody(), Peer: n.peer}) // Incoming Transaction request case pb.MTransactionRequest: @@ -180,11 +243,10 @@ func (m *Manager) handlePacket(data []byte, neighbor *neighbor.Neighbor) error { m.log.Debugw("Tx not available", "tx", msg.GetHash()) } else { m.log.Debugw("Tx found", "tx", tx) - m.Send(tx, neighbor) + m.SendTransaction(tx, n.peer.ID()) } default: - return nil } return nil @@ -202,3 +264,8 @@ func marshal(msg pb.Message) []byte { } return append([]byte{byte(mType)}, data...) } + +func disconnect(conn *transport.Connection) { + _ = conn.Close() + Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: conn.Peer()}) +} diff --git a/packages/gossip/manager_test.go b/packages/gossip/manager_test.go index 51df26e9ebeca0d85336c2354ba188d39b732ab6..5d4a1e6b8f50ea1b092d2f22e40d6392cac77f5c 100644 --- a/packages/gossip/manager_test.go +++ b/packages/gossip/manager_test.go @@ -13,13 +13,32 @@ import ( "github.com/iotaledger/goshimmer/packages/gossip/transport" "github.com/iotaledger/hive.go/events" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" ) -const graceTime = 5 * time.Millisecond +const graceTime = 10 * time.Millisecond -var logger *zap.SugaredLogger +var ( + logger *zap.SugaredLogger + eventMock mock.Mock + + testTxData = []byte("testTx") +) + +func newTransactionEvent(ev *NewTransactionEvent) { eventMock.Called(ev) } +func dropNeighborEvent(ev *DropNeighborEvent) { eventMock.Called(ev) } + +// assertEvents initializes the mock and asserts the expectations +func assertEvents(t *testing.T) func() { + eventMock = mock.Mock{} + return func() { + if !t.Failed() { + eventMock.AssertExpectations(t) + } + } +} func init() { l, err := zap.NewDevelopment() @@ -27,31 +46,33 @@ func init() { log.Fatalf("cannot initialize logger: %v", err) } logger = l.Sugar() + + // mock the events + Events.NewTransaction.Attach(events.NewClosure(newTransactionEvent)) + Events.DropNeighbor.Attach(events.NewClosure(dropNeighborEvent)) } -func testGetTransaction([]byte) ([]byte, error) { - tx := &pb.TransactionRequest{ - Hash: []byte("testTx"), - } - b, _ := proto.Marshal(tx) - return b, nil + +func getTestTransaction([]byte) ([]byte, error) { + return testTxData, nil } func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) { - log := logger.Named(name) - db := peer.NewMemoryDB(log.Named("db")) + l := logger.Named(name) + db := peer.NewMemoryDB(l.Named("db")) local, err := peer.NewLocal("peering", name, db) require.NoError(t, err) require.NoError(t, local.UpdateService(service.GossipKey, "tcp", "localhost:0")) - trans, err := transport.Listen(local, log) + trans, err := transport.Listen(local, l) require.NoError(t, err) - mgr := NewManager(trans, log, testGetTransaction) + mgr := NewManager(trans, l, getTestTransaction) // update the service with the actual address require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String())) teardown := func() { + mgr.Close() trans.Close() db.Close() } @@ -59,11 +80,15 @@ func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) { } func TestClose(t *testing.T) { + defer assertEvents(t)() + _, teardown, _ := newTest(t, "A") teardown() } -func TestUnicast(t *testing.T) { +func TestClosedConnection(t *testing.T) { + defer assertEvents(t)() + mgrA, closeA, peerA := newTest(t, "A") defer closeA() mgrB, closeB, peerB := newTest(t, "B") @@ -72,6 +97,8 @@ func TestUnicast(t *testing.T) { var wg sync.WaitGroup wg.Add(2) + // connect in the following way + // B -> A go func() { defer wg.Done() err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) @@ -87,25 +114,100 @@ func TestUnicast(t *testing.T) { // wait for the connections to establish wg.Wait() - tx := &pb.Transaction{Body: []byte("Hello!")} + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once() - triggered := make(chan struct{}, 1) - mgrB.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) { - require.Empty(t, triggered) // only once - assert.Equal(t, tx.GetBody(), ev.Body) - assert.Equal(t, peerA, ev.Peer) - triggered <- struct{}{} - })) + // A drops B + mgrA.deleteNeighbor(peerB.ID()) + time.Sleep(graceTime) - b, err := proto.Marshal(tx) - require.NoError(t, err) - mgrA.Send(b) + // the events should be there even before we close + eventMock.AssertExpectations(t) +} + +func TestP2PSend(t *testing.T) { + defer assertEvents(t)() + + mgrA, closeA, peerA := newTest(t, "A") + defer closeA() + mgrB, closeB, peerB := newTest(t, "B") + defer closeB() + + var wg sync.WaitGroup + wg.Add(2) + + // connect in the following way + // B -> A + go func() { + defer wg.Done() + err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) + assert.NoError(t, err) + }() + time.Sleep(graceTime) + go func() { + defer wg.Done() + err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer) + assert.NoError(t, err) + }() + + // wait for the connections to establish + wg.Wait() + + eventMock.On("newTransactionEvent", &NewTransactionEvent{ + Body: testTxData, + Peer: peerA, + }).Once() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once() + + mgrA.SendTransaction(testTxData) + time.Sleep(graceTime) +} + +func TestP2PSendTwice(t *testing.T) { + defer assertEvents(t)() - // eventually the event should be triggered - assert.Eventually(t, func() bool { return len(triggered) >= 1 }, time.Second, 10*time.Millisecond) + mgrA, closeA, peerA := newTest(t, "A") + defer closeA() + mgrB, closeB, peerB := newTest(t, "B") + defer closeB() + + var wg sync.WaitGroup + wg.Add(2) + + // connect in the following way + // B -> A + go func() { + defer wg.Done() + err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) + assert.NoError(t, err) + }() + time.Sleep(graceTime) + go func() { + defer wg.Done() + err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer) + assert.NoError(t, err) + }() + + // wait for the connections to establish + wg.Wait() + + eventMock.On("newTransactionEvent", &NewTransactionEvent{ + Body: testTxData, + Peer: peerA, + }).Twice() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once() + + mgrA.SendTransaction(testTxData) + time.Sleep(1 * time.Second) // wait a bit between the sends, to test timeouts + mgrA.SendTransaction(testTxData) + time.Sleep(graceTime) } func TestBroadcast(t *testing.T) { + defer assertEvents(t)() + mgrA, closeA, peerA := newTest(t, "A") defer closeA() mgrB, closeB, peerB := newTest(t, "B") @@ -116,6 +218,8 @@ func TestBroadcast(t *testing.T) { var wg sync.WaitGroup wg.Add(4) + // connect in the following way + // B -> A <- C go func() { defer wg.Done() err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) @@ -141,56 +245,37 @@ func TestBroadcast(t *testing.T) { // wait for the connections to establish wg.Wait() - tx := &pb.Transaction{Body: []byte("Hello!")} - - triggeredB := make(chan struct{}, 1) - mgrB.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) { - require.Empty(t, triggeredB) // only once - assert.Equal(t, tx.GetBody(), ev.Body) - assert.Equal(t, peerA, ev.Peer) - triggeredB <- struct{}{} - })) - - triggeredC := make(chan struct{}, 1) - mgrC.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) { - require.Empty(t, triggeredC) // only once - assert.Equal(t, tx.GetBody(), ev.Body) - assert.Equal(t, peerA, ev.Peer) - triggeredC <- struct{}{} - })) - - b, err := proto.Marshal(tx) - assert.NoError(t, err) - mgrA.Send(b) - - // eventually the events should be triggered - success := func() bool { - return len(triggeredB) >= 1 && len(triggeredC) >= 1 - } - assert.Eventually(t, success, time.Second, 10*time.Millisecond) + eventMock.On("newTransactionEvent", &NewTransactionEvent{ + Body: testTxData, + Peer: peerA, + }).Twice() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Twice() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerC}).Once() + + mgrA.SendTransaction(testTxData) + time.Sleep(graceTime) } func TestDropUnsuccessfulAccept(t *testing.T) { + defer assertEvents(t)() + mgrA, closeA, _ := newTest(t, "A") defer closeA() _, closeB, peerB := newTest(t, "B") defer closeB() - triggered := make(chan struct{}, 1) - mgrA.Events.DropNeighbor.Attach(events.NewClosure(func(ev *DropNeighborEvent) { - require.Empty(t, triggered) // only once - assert.Equal(t, peerB, ev.Peer) - triggered <- struct{}{} - })) + eventMock.On("dropNeighborEvent", &DropNeighborEvent{ + Peer: peerB, + }).Once() err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) assert.Error(t, err) - - // eventually the event should be triggered - assert.Eventually(t, func() bool { return len(triggered) >= 1 }, time.Second, 10*time.Millisecond) } func TestTxRequest(t *testing.T) { + defer assertEvents(t)() + mgrA, closeA, peerA := newTest(t, "A") defer closeA() mgrB, closeB, peerB := newTest(t, "B") @@ -199,48 +284,32 @@ func TestTxRequest(t *testing.T) { var wg sync.WaitGroup wg.Add(2) + // connect in the following way + // B -> A go func() { defer wg.Done() err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) assert.NoError(t, err) - logger.Debugw("Len", "len", mgrA.neighborhood.Len()) }() + time.Sleep(graceTime) go func() { defer wg.Done() err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer) assert.NoError(t, err) - logger.Debugw("Len", "len", mgrB.neighborhood.Len()) }() + // wait for the connections to establish wg.Wait() - tx := &pb.TransactionRequest{ - Hash: []byte("Hello!"), - } - b, err := proto.Marshal(tx) - assert.NoError(t, err) - - sendChan := make(chan struct{}) - sendSuccess := false - - mgrA.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) { - logger.Debugw("New TX Event triggered", "data", ev.Body, "from", ev.Peer.ID().String()) - assert.Equal(t, []byte("testTx"), ev.Body) - assert.Equal(t, peerB, ev.Peer) - sendChan <- struct{}{} - })) + eventMock.On("newTransactionEvent", &NewTransactionEvent{ + Body: testTxData, + Peer: peerB, + }).Once() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once() + eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once() + b, err := proto.Marshal(&pb.TransactionRequest{Hash: []byte("Hello!")}) + require.NoError(t, err) mgrA.RequestTransaction(b) - - timer := time.NewTimer(5 * time.Second) - defer timer.Stop() - - select { - case <-sendChan: - sendSuccess = true - case <-timer.C: - sendSuccess = false - } - - assert.True(t, sendSuccess) + time.Sleep(graceTime) } diff --git a/packages/gossip/neighbor/neighbor.go b/packages/gossip/neighbor/neighbor.go deleted file mode 100644 index e3ec6bda8799fddd95aa67b74447ee13d44e994d..0000000000000000000000000000000000000000 --- a/packages/gossip/neighbor/neighbor.go +++ /dev/null @@ -1,98 +0,0 @@ -package neighbor - -import ( - "sync" - - "github.com/iotaledger/autopeering-sim/peer" - "github.com/iotaledger/goshimmer/packages/gossip/transport" -) - -// Neighbor defines a neighbor -type Neighbor struct { - Peer *peer.Peer - Conn *transport.Connection -} - -// NeighborMap implements a map of neighbors thread safe -type NeighborMap struct { - sync.RWMutex - internal map[string]*Neighbor -} - -// NewMap returns a new NeighborMap -func NewMap() *NeighborMap { - return &NeighborMap{ - internal: make(map[string]*Neighbor), - } -} - -// New returns a new Neighbor -func New(peer *peer.Peer, conn *transport.Connection) *Neighbor { - return &Neighbor{ - Peer: peer, - Conn: conn, - } -} - -// Len returns the number of neighbors stored in a NeighborMap -func (nm *NeighborMap) Len() int { - nm.RLock() - defer nm.RUnlock() - return len(nm.internal) -} - -// GetMap returns the content of the entire internal map -func (nm *NeighborMap) GetMap() map[string]*Neighbor { - newMap := make(map[string]*Neighbor) - nm.RLock() - defer nm.RUnlock() - for k, v := range nm.internal { - newMap[k] = v - } - return newMap -} - -// GetSlice returns a slice of the content of the entire internal map -func (nm *NeighborMap) GetSlice() []*Neighbor { - newSlice := make([]*Neighbor, nm.Len()) - nm.RLock() - defer nm.RUnlock() - i := 0 - for _, v := range nm.internal { - newSlice[i] = v - i++ - } - return newSlice -} - -// Load returns the neighbor for a given key. -// It also return a bool to communicate the presence of the given -// neighbor into the internal map -func (nm *NeighborMap) Load(key string) (value *Neighbor, ok bool) { - nm.RLock() - defer nm.RUnlock() - result, ok := nm.internal[key] - return result, ok -} - -// Delete removes the entire entry for a given key and return true if successful -func (nm *NeighborMap) Delete(key string) (deletedNeighbor *Neighbor, ok bool) { - deletedNeighbor, ok = nm.Load(key) - if !ok { - return nil, false - } - nm.Lock() - defer nm.Unlock() - if deletedNeighbor.Conn != nil { - deletedNeighbor.Conn.Close() - } - delete(nm.internal, key) - return deletedNeighbor, true -} - -// Store adds a new neighbor to the NeighborMap -func (nm *NeighborMap) Store(key string, value *Neighbor) { - nm.Lock() - defer nm.Unlock() - nm.internal[key] = value -} diff --git a/packages/gossip/proto/message.pb.go b/packages/gossip/proto/message.pb.go index 1e67ece10c370d5957078761427d411d983a93b6..df48ced344f27ed3cea9bdd2f51d404400e40585 100644 --- a/packages/gossip/proto/message.pb.go +++ b/packages/gossip/proto/message.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: proto/message.proto +// source: packages/gossip/proto/message.proto package proto @@ -32,7 +32,7 @@ func (m *Transaction) Reset() { *m = Transaction{} } func (m *Transaction) String() string { return proto.CompactTextString(m) } func (*Transaction) ProtoMessage() {} func (*Transaction) Descriptor() ([]byte, []int) { - return fileDescriptor_33f3a5e1293a7bcd, []int{0} + return fileDescriptor_fcce9e84825f2fa5, []int{0} } func (m *Transaction) XXX_Unmarshal(b []byte) error { @@ -72,7 +72,7 @@ func (m *TransactionRequest) Reset() { *m = TransactionRequest{} } func (m *TransactionRequest) String() string { return proto.CompactTextString(m) } func (*TransactionRequest) ProtoMessage() {} func (*TransactionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_33f3a5e1293a7bcd, []int{1} + return fileDescriptor_fcce9e84825f2fa5, []int{1} } func (m *TransactionRequest) XXX_Unmarshal(b []byte) error { @@ -105,17 +105,20 @@ func init() { proto.RegisterType((*TransactionRequest)(nil), "proto.TransactionRequest") } -func init() { proto.RegisterFile("proto/message.proto", fileDescriptor_33f3a5e1293a7bcd) } - -var fileDescriptor_33f3a5e1293a7bcd = []byte{ - // 139 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x28, 0xca, 0x2f, - 0xc9, 0xd7, 0xcf, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0xd5, 0x03, 0xf3, 0x84, 0x58, 0xc1, 0x94, - 0x92, 0x22, 0x17, 0x77, 0x48, 0x51, 0x62, 0x5e, 0x71, 0x62, 0x72, 0x49, 0x66, 0x7e, 0x9e, 0x90, - 0x10, 0x17, 0x4b, 0x52, 0x7e, 0x4a, 0xa5, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad, - 0xa4, 0xc1, 0x25, 0x84, 0xa4, 0x24, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x04, 0xa4, 0x32, 0x23, - 0xb1, 0x38, 0x03, 0xa6, 0x12, 0xc4, 0x76, 0x52, 0x8e, 0x52, 0x4c, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, - 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0x4e, 0x2c, 0xc8, 0x2f, 0x2e, 0x4e, 0xcd, 0x49, 0xd5, 0x4f, - 0xcf, 0x2f, 0x2e, 0xce, 0x2c, 0xd0, 0x07, 0xdb, 0x98, 0xc4, 0x06, 0xa6, 0x8c, 0x01, 0x01, 0x00, - 0x00, 0xff, 0xff, 0x34, 0x46, 0xa5, 0x0f, 0x96, 0x00, 0x00, 0x00, +func init() { + proto.RegisterFile("packages/gossip/proto/message.proto", fileDescriptor_fcce9e84825f2fa5) +} + +var fileDescriptor_fcce9e84825f2fa5 = []byte{ + // 157 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x8e, 0x31, 0x0b, 0xc2, 0x30, + 0x10, 0x46, 0x29, 0xa8, 0x43, 0x74, 0xca, 0xe4, 0xa8, 0x75, 0xe9, 0xd4, 0x0c, 0x22, 0xee, 0xfe, + 0x84, 0xe2, 0xe4, 0x76, 0x49, 0x8f, 0x24, 0x68, 0x7a, 0x31, 0x97, 0x0e, 0xfe, 0x7b, 0x49, 0x40, + 0x70, 0xe8, 0xf4, 0xbd, 0x0f, 0xde, 0xf0, 0xc4, 0x29, 0x82, 0x79, 0x82, 0x45, 0x56, 0x96, 0x98, + 0x7d, 0x54, 0x31, 0x51, 0x26, 0x15, 0x90, 0x19, 0x2c, 0xf6, 0xf5, 0xc9, 0x75, 0x9d, 0xf6, 0x28, + 0xb6, 0xf7, 0x04, 0x13, 0x83, 0xc9, 0x9e, 0x26, 0x29, 0xc5, 0x4a, 0xd3, 0xf8, 0xd9, 0x37, 0x87, + 0xa6, 0xdb, 0x0d, 0x95, 0xdb, 0x4e, 0xc8, 0x3f, 0x65, 0xc0, 0xf7, 0x8c, 0x9c, 0x8b, 0xe9, 0x80, + 0xdd, 0xcf, 0x2c, 0x7c, 0xbb, 0x3e, 0x2e, 0xd6, 0x67, 0x37, 0xeb, 0xde, 0x50, 0x50, 0x9e, 0x32, + 0xbc, 0x70, 0xb4, 0x98, 0x4a, 0x87, 0xf3, 0x21, 0x60, 0x52, 0x8b, 0x69, 0x7a, 0x53, 0xe7, 0xfc, + 0x0d, 0x00, 0x00, 0xff, 0xff, 0xd7, 0x71, 0x33, 0x9a, 0xba, 0x00, 0x00, 0x00, } diff --git a/packages/gossip/transport/connection.go b/packages/gossip/transport/connection.go index f6886129735517fe451df9c6cf11d34fddeb3e94..d677040f0002b1fc40a04469751fbd24cee96097 100644 --- a/packages/gossip/transport/connection.go +++ b/packages/gossip/transport/connection.go @@ -2,43 +2,28 @@ package transport import ( "net" + "time" "github.com/iotaledger/autopeering-sim/peer" ) -const ( - // MaxPacketSize specifies the maximum allowed size of packets. - // Packets larger than this will be cut and thus treated as invalid. - MaxPacketSize = 1280 -) - +// Connection represents a network connection to a neighbor peer. type Connection struct { + net.Conn peer *peer.Peer - conn net.Conn } -func newConnection(p *peer.Peer, c net.Conn) *Connection { +func newConnection(c net.Conn, p *peer.Peer) *Connection { + // make sure the connection has no timeouts + _ = c.SetDeadline(time.Time{}) + return &Connection{ + Conn: c, peer: p, - conn: c, } } -func (c *Connection) Close() { - c.conn.Close() -} - -func (c *Connection) Read() ([]byte, error) { - b := make([]byte, MaxPacketSize) - n, err := c.conn.Read(b) - if err != nil { - return nil, err - } - - return b[:n], nil -} - -func (c *Connection) Write(b []byte) error { - _, err := c.conn.Write(b) - return err +// Peer returns the peer associated with that connection. +func (c *Connection) Peer() *peer.Peer { + return c.peer } diff --git a/packages/gossip/transport/handshake.go b/packages/gossip/transport/handshake.go index ef3e8608e518d668678034971c5180e8f8a7e9da..e5e928c4e2ca079fbc6dbc61cd82e43f8c27bce1 100644 --- a/packages/gossip/transport/handshake.go +++ b/packages/gossip/transport/handshake.go @@ -10,18 +10,18 @@ import ( ) const ( - HandshakeExpiration = 20 * time.Second - VersionNum = 0 + versionNum = 0 + handshakeExpiration = 20 * time.Second ) // isExpired checks whether the given UNIX time stamp is too far in the past. func isExpired(ts int64) bool { - return time.Since(time.Unix(ts, 0)) >= HandshakeExpiration + return time.Since(time.Unix(ts, 0)) >= handshakeExpiration } func newHandshakeRequest(fromAddr string, toAddr string) ([]byte, error) { m := &pb.HandshakeRequest{ - Version: VersionNum, + Version: versionNum, From: fromAddr, To: toAddr, Timestamp: time.Now().Unix(), @@ -36,7 +36,7 @@ func newHandshakeResponse(reqData []byte) ([]byte, error) { return proto.Marshal(m) } -func (t *TransportTCP) validateHandshakeRequest(reqData []byte, fromAddr string) bool { +func (t *TCP) validateHandshakeRequest(reqData []byte, fromAddr string) bool { m := new(pb.HandshakeRequest) if err := proto.Unmarshal(reqData, m); err != nil { t.log.Debugw("invalid handshake", @@ -44,7 +44,7 @@ func (t *TransportTCP) validateHandshakeRequest(reqData []byte, fromAddr string) ) return false } - if m.GetVersion() != VersionNum { + if m.GetVersion() != versionNum { t.log.Debugw("invalid handshake", "version", m.GetVersion(), ) @@ -71,7 +71,7 @@ func (t *TransportTCP) validateHandshakeRequest(reqData []byte, fromAddr string) return true } -func (t *TransportTCP) validateHandshakeResponse(resData []byte, reqData []byte) bool { +func (t *TCP) validateHandshakeResponse(resData []byte, reqData []byte) bool { m := new(pb.HandshakeResponse) if err := proto.Unmarshal(resData, m); err != nil { t.log.Debugw("invalid handshake", diff --git a/packages/gossip/transport/proto/handshake.pb.go b/packages/gossip/transport/proto/handshake.pb.go index c7f368873432c4456bb4f7cd8376f023905c9bf1..610f02b3b94fdfced881d7401ab9b89e6d8d5439 100644 --- a/packages/gossip/transport/proto/handshake.pb.go +++ b/packages/gossip/transport/proto/handshake.pb.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: transport/proto/handshake.proto +// source: packages/gossip/transport/proto/handshake.proto package proto @@ -38,7 +38,7 @@ func (m *HandshakeRequest) Reset() { *m = HandshakeRequest{} } func (m *HandshakeRequest) String() string { return proto.CompactTextString(m) } func (*HandshakeRequest) ProtoMessage() {} func (*HandshakeRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_d7101ffe19b05443, []int{0} + return fileDescriptor_f9e96b60881ea276, []int{0} } func (m *HandshakeRequest) XXX_Unmarshal(b []byte) error { @@ -99,7 +99,7 @@ func (m *HandshakeResponse) Reset() { *m = HandshakeResponse{} } func (m *HandshakeResponse) String() string { return proto.CompactTextString(m) } func (*HandshakeResponse) ProtoMessage() {} func (*HandshakeResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_d7101ffe19b05443, []int{1} + return fileDescriptor_f9e96b60881ea276, []int{1} } func (m *HandshakeResponse) XXX_Unmarshal(b []byte) error { @@ -132,21 +132,24 @@ func init() { proto.RegisterType((*HandshakeResponse)(nil), "proto.HandshakeResponse") } -func init() { proto.RegisterFile("transport/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) } - -var fileDescriptor_d7101ffe19b05443 = []byte{ - // 203 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x8f, 0x3f, 0x4b, 0x04, 0x31, - 0x10, 0xc5, 0xb9, 0x3f, 0x7a, 0xde, 0xa0, 0xa2, 0xa9, 0x22, 0x08, 0xca, 0x55, 0x82, 0xb8, 0x29, - 0xfc, 0x06, 0x56, 0x57, 0xa7, 0xb4, 0x91, 0xec, 0x3a, 0x6e, 0x82, 0x26, 0x93, 0xcd, 0x64, 0xfd, - 0xfc, 0x0e, 0x81, 0x45, 0xb1, 0x7a, 0xef, 0xf7, 0x0b, 0xe4, 0x25, 0x70, 0x57, 0x8b, 0x4b, 0x9c, - 0xa9, 0x54, 0x93, 0x0b, 0x55, 0x32, 0xde, 0xa5, 0x77, 0xf6, 0xee, 0x13, 0xbb, 0xc6, 0xea, 0xa4, - 0xc5, 0x21, 0xc1, 0xd5, 0x71, 0x39, 0xb1, 0x38, 0xcd, 0xc8, 0x55, 0x69, 0xd8, 0x7d, 0x63, 0xe1, - 0x40, 0x49, 0xaf, 0xee, 0x57, 0x0f, 0x17, 0x76, 0x41, 0xa5, 0x60, 0xfb, 0x51, 0x28, 0xea, 0xb5, - 0xe8, 0xbd, 0x6d, 0x5d, 0x5d, 0xc2, 0xba, 0x92, 0xde, 0x34, 0x23, 0x4d, 0xdd, 0xc2, 0xbe, 0x86, - 0x28, 0xf7, 0xb8, 0x98, 0xf5, 0x56, 0xf4, 0xc6, 0xfe, 0x8a, 0x43, 0x07, 0xd7, 0x7f, 0xf6, 0xe4, - 0x81, 0x89, 0x51, 0xdd, 0xc0, 0x59, 0xc1, 0xe9, 0xcd, 0x3b, 0xf6, 0x6d, 0xf1, 0xdc, 0xee, 0x84, - 0x8f, 0x82, 0x2f, 0x4f, 0xaf, 0x8f, 0x63, 0xa8, 0x7e, 0xee, 0xbb, 0x81, 0xa2, 0x19, 0x5c, 0x26, - 0x66, 0xfc, 0x42, 0x33, 0x4a, 0x86, 0x6c, 0xfe, 0xfd, 0xb2, 0x3f, 0x6d, 0xf1, 0xfc, 0x13, 0x00, - 0x00, 0xff, 0xff, 0x51, 0xe0, 0x08, 0xd0, 0xff, 0x00, 0x00, 0x00, +func init() { + proto.RegisterFile("packages/gossip/transport/proto/handshake.proto", fileDescriptor_f9e96b60881ea276) +} + +var fileDescriptor_f9e96b60881ea276 = []byte{ + // 219 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8f, 0x31, 0x4f, 0xc3, 0x30, + 0x10, 0x85, 0x95, 0xb4, 0x50, 0x6a, 0x01, 0x02, 0x4f, 0x41, 0x62, 0x88, 0x3a, 0x65, 0x8a, 0x07, + 0x7e, 0x00, 0x82, 0xa9, 0xb3, 0x47, 0x16, 0x74, 0x6d, 0x8f, 0xd8, 0x2a, 0xf6, 0x39, 0x77, 0x57, + 0x7e, 0x3f, 0xc2, 0x52, 0x05, 0x1b, 0xd3, 0xbb, 0xf7, 0x6e, 0xf8, 0xf4, 0x19, 0x57, 0x60, 0x7f, + 0x84, 0x09, 0xc5, 0x4d, 0x24, 0x12, 0x8b, 0x53, 0x86, 0x2c, 0x85, 0x58, 0x5d, 0x61, 0x52, 0x72, + 0x01, 0xf2, 0x41, 0x02, 0x1c, 0x71, 0xac, 0xdd, 0x5e, 0xd4, 0xd8, 0x64, 0x73, 0xb7, 0x3d, 0x7f, + 0x3c, 0xce, 0x27, 0x14, 0xb5, 0x9d, 0x59, 0x7d, 0x21, 0x4b, 0xa4, 0xdc, 0x35, 0x7d, 0x33, 0xdc, + 0xf8, 0x73, 0xb5, 0xd6, 0x2c, 0x3f, 0x98, 0x52, 0xd7, 0xf6, 0xcd, 0xb0, 0xf6, 0xf5, 0xb6, 0xb7, + 0xa6, 0x55, 0xea, 0x16, 0x75, 0x69, 0x95, 0xec, 0xa3, 0x59, 0x6b, 0x4c, 0x28, 0x0a, 0xa9, 0x74, + 0xcb, 0xbe, 0x19, 0x16, 0xfe, 0x77, 0xd8, 0x8c, 0xe6, 0xfe, 0x0f, 0x4f, 0x0a, 0x65, 0x41, 0xfb, + 0x60, 0xae, 0x18, 0xe7, 0xf7, 0x00, 0x12, 0x2a, 0xf1, 0xda, 0xaf, 0x18, 0xe7, 0x2d, 0x48, 0x78, + 0x7d, 0x79, 0x7b, 0x9e, 0xa2, 0x86, 0xd3, 0x6e, 0xdc, 0x53, 0x72, 0x91, 0x14, 0x3e, 0xf1, 0x30, + 0x21, 0xff, 0x68, 0x86, 0x98, 0x12, 0xf2, 0x7f, 0xe6, 0xbb, 0xcb, 0x1a, 0x4f, 0xdf, 0x01, 0x00, + 0x00, 0xff, 0xff, 0x2a, 0x53, 0xc3, 0xbb, 0x23, 0x01, 0x00, 0x00, } diff --git a/packages/gossip/transport/transport.go b/packages/gossip/transport/transport.go index f9ae8101dbb42d4f8f77ee559a7100dade21e0e1..791b8e310a0195375bf2cd68fd62b08c4976b682 100644 --- a/packages/gossip/transport/transport.go +++ b/packages/gossip/transport/transport.go @@ -3,8 +3,10 @@ package transport import ( "bytes" "container/list" - "errors" + "fmt" + "io" "net" + "strings" "sync" "time" @@ -12,14 +14,19 @@ import ( "github.com/iotaledger/autopeering-sim/peer" "github.com/iotaledger/autopeering-sim/peer/service" pb "github.com/iotaledger/autopeering-sim/server/proto" + "github.com/pkg/errors" "go.uber.org/zap" ) var ( - ErrTimeout = errors.New("accept timeout") - ErrClosed = errors.New("listener closed") + // ErrTimeout is returned when an expected incoming connection was not received in time. + ErrTimeout = errors.New("accept timeout") + // ErrClosed means that the transport was shut down before a response could be received. + ErrClosed = errors.New("transport closed") + // ErrInvalidHandshake is returned when no correct handshake could be established. ErrInvalidHandshake = errors.New("invalid handshake") - ErrNoGossip = errors.New("peer does not have a gossip service") + // ErrNoGossip means that the given peer does not support the gossip service. + ErrNoGossip = errors.New("peer does not have a gossip service") ) // connection timeouts @@ -27,9 +34,12 @@ const ( acceptTimeout = 500 * time.Millisecond handshakeTimeout = 100 * time.Millisecond connectionTimeout = acceptTimeout + handshakeTimeout + + maxHandshakePacketSize = 256 ) -type TransportTCP struct { +// TCP establishes verified incoming and outgoing TCP connections to other peers. +type TCP struct { local *peer.Local listener *net.TCPListener log *zap.SugaredLogger @@ -60,8 +70,9 @@ type accept struct { conn net.Conn // the actual network connection } -func Listen(local *peer.Local, log *zap.SugaredLogger) (*TransportTCP, error) { - t := &TransportTCP{ +// Listen creates the object and starts listening for incoming connections. +func Listen(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) { + t := &TCP{ local: local, log: log, addAcceptMatcher: make(chan *acceptMatcher), @@ -91,7 +102,7 @@ func Listen(local *peer.Local, log *zap.SugaredLogger) (*TransportTCP, error) { } // Close stops listening on the gossip address. -func (t *TransportTCP) Close() { +func (t *TCP) Close() { t.closeOnce.Do(func() { close(t.closing) if err := t.listener.Close(); err != nil { @@ -102,13 +113,13 @@ func (t *TransportTCP) Close() { } // LocalAddr returns the listener's network address, -func (t *TransportTCP) LocalAddr() net.Addr { +func (t *TCP) LocalAddr() net.Addr { return t.listener.Addr() } // DialPeer establishes a gossip connection to the given peer. // If the peer does not accept the connection or the handshake fails, an error is returned. -func (t *TransportTCP) DialPeer(p *peer.Peer) (*Connection, error) { +func (t *TCP) DialPeer(p *peer.Peer) (*Connection, error) { gossipAddr := p.Services().Get(service.GossipKey) if gossipAddr == nil { return nil, ErrNoGossip @@ -125,12 +136,12 @@ func (t *TransportTCP) DialPeer(p *peer.Peer) (*Connection, error) { } t.log.Debugw("connected", "id", p.ID(), "addr", conn.RemoteAddr(), "direction", "out") - return newConnection(p, conn), nil + return newConnection(conn, p), nil } // AcceptPeer awaits an incoming connection from the given peer. // If the peer does not establish the connection or the handshake fails, an error is returned. -func (t *TransportTCP) AcceptPeer(p *peer.Peer) (*Connection, error) { +func (t *TCP) AcceptPeer(p *peer.Peer) (*Connection, error) { if p.Services().Get(service.GossipKey) == nil { return nil, ErrNoGossip } @@ -139,11 +150,12 @@ func (t *TransportTCP) AcceptPeer(p *peer.Peer) (*Connection, error) { if connected.err != nil { return nil, connected.err } - t.log.Debugw("connected", "id", p.ID(), "addr", connected.c.conn.RemoteAddr(), "direction", "in") + + t.log.Debugw("connected", "id", p.ID(), "addr", connected.c.RemoteAddr(), "direction", "in") return connected.c, nil } -func (t *TransportTCP) acceptPeer(p *peer.Peer) <-chan connect { +func (t *TCP) acceptPeer(p *peer.Peer) <-chan connect { connected := make(chan connect, 1) // add the matcher select { @@ -154,18 +166,18 @@ func (t *TransportTCP) acceptPeer(p *peer.Peer) <-chan connect { return connected } -func (t *TransportTCP) closeConnection(c net.Conn) { +func (t *TCP) closeConnection(c net.Conn) { if err := c.Close(); err != nil { t.log.Warnw("close error", "err", err) } } -func (t *TransportTCP) run() { +func (t *TCP) run() { defer t.wg.Done() var ( - mlist = list.New() - timeout = time.NewTimer(0) + matcherList = list.New() + timeout = time.NewTimer(0) ) defer timeout.Stop() @@ -174,9 +186,9 @@ func (t *TransportTCP) run() { for { // Set the timer so that it fires when the next accept expires - if el := mlist.Front(); el != nil { + if e := matcherList.Front(); e != nil { // the first element always has the closest deadline - m := el.Value.(*acceptMatcher) + m := e.Value.(*acceptMatcher) timeout.Reset(time.Until(m.deadline)) } else { timeout.Stop() @@ -187,16 +199,16 @@ func (t *TransportTCP) run() { // add a new matcher to the list case m := <-t.addAcceptMatcher: m.deadline = time.Now().Add(connectionTimeout) - mlist.PushBack(m) + matcherList.PushBack(m) // on accept received, check all matchers for a fit case a := <-t.acceptReceived: matched := false - for el := mlist.Front(); el != nil; el = el.Next() { - m := el.Value.(*acceptMatcher) + for e := matcherList.Front(); e != nil; e = e.Next() { + m := e.Value.(*acceptMatcher) if m.peer.ID() == a.fromID { matched = true - mlist.Remove(el) + matcherList.Remove(e) // finish the handshake go t.matchAccept(m, a.req, a.conn) } @@ -212,18 +224,18 @@ func (t *TransportTCP) run() { now := time.Now() // notify and remove any expired matchers - for el := mlist.Front(); el != nil; el = el.Next() { - m := el.Value.(*acceptMatcher) + for e := matcherList.Front(); e != nil; e = e.Next() { + m := e.Value.(*acceptMatcher) if now.After(m.deadline) || now.Equal(m.deadline) { m.connected <- connect{nil, ErrTimeout} - mlist.Remove(el) + matcherList.Remove(e) } } // on close, notify all the matchers case <-t.closing: - for el := mlist.Front(); el != nil; el = el.Next() { - el.Value.(*acceptMatcher).connected <- connect{nil, ErrClosed} + for e := matcherList.Front(); e != nil; e = e.Next() { + e.Value.(*acceptMatcher).connected <- connect{nil, ErrClosed} } return @@ -231,7 +243,7 @@ func (t *TransportTCP) run() { } } -func (t *TransportTCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) { +func (t *TCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) { t.wg.Add(1) defer t.wg.Done() @@ -241,10 +253,10 @@ func (t *TransportTCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) t.closeConnection(conn) return } - m.connected <- connect{newConnection(m.peer, conn), nil} + m.connected <- connect{newConnection(conn, m.peer), nil} } -func (t *TransportTCP) listenLoop() { +func (t *TCP) listenLoop() { defer t.wg.Done() for { @@ -254,7 +266,10 @@ func (t *TransportTCP) listenLoop() { continue } else if err != nil { // return from the loop on all other errors - t.log.Warnw("read error", "err", err) + if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { + t.log.Warnw("listen error", "err", err) + } + t.log.Debug("listening stopped") return } @@ -278,7 +293,7 @@ func (t *TransportTCP) listenLoop() { } } -func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn net.Conn) error { +func (t *TCP) doHandshake(key peer.PublicKey, remoteAddr string, conn net.Conn) error { reqData, err := newHandshakeRequest(conn.LocalAddr().String(), remoteAddr) if err != nil { return err @@ -291,7 +306,10 @@ func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn n } b, err := proto.Marshal(pkt) if err != nil { - return err + return errors.Wrap(err, ErrInvalidHandshake.Error()) + } + if l := len(b); l > maxHandshakePacketSize { + return fmt.Errorf("handshake size too large: %d, max %d", l, maxHandshakePacketSize) } if err := conn.SetWriteDeadline(time.Now().Add(handshakeTimeout)); err != nil { @@ -305,7 +323,7 @@ func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn n if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil { return err } - b = make([]byte, MaxPacketSize) + b = make([]byte, maxHandshakePacketSize) n, err := conn.Read(b) if err != nil { return err @@ -313,17 +331,13 @@ func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn n pkt = new(pb.Packet) if err := proto.Unmarshal(b[:n], pkt); err != nil { - return err + return errors.Wrap(err, ErrInvalidHandshake.Error()) } signer, err := peer.RecoverKeyFromSignedData(pkt) - if err != nil { - return err - } - if !bytes.Equal(key, signer) { - return errors.New("invalid key") + if err != nil || !bytes.Equal(key, signer) { + return ErrInvalidHandshake } - if !t.validateHandshakeResponse(pkt.GetData(), reqData) { return ErrInvalidHandshake } @@ -331,14 +345,14 @@ func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn n return nil } -func (t *TransportTCP) readHandshakeRequest(conn net.Conn) (peer.PublicKey, []byte, error) { +func (t *TCP) readHandshakeRequest(conn net.Conn) (peer.PublicKey, []byte, error) { if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil { return nil, nil, err } - b := make([]byte, MaxPacketSize) + b := make([]byte, maxHandshakePacketSize) n, err := conn.Read(b) if err != nil { - return nil, nil, err + return nil, nil, errors.Wrap(err, ErrInvalidHandshake.Error()) } pkt := new(pb.Packet) @@ -358,7 +372,7 @@ func (t *TransportTCP) readHandshakeRequest(conn net.Conn) (peer.PublicKey, []by return key, pkt.GetData(), nil } -func (t *TransportTCP) writeHandshakeResponse(reqData []byte, conn net.Conn) error { +func (t *TCP) writeHandshakeResponse(reqData []byte, conn net.Conn) error { data, err := newHandshakeResponse(reqData) if err != nil { return err @@ -373,6 +387,9 @@ func (t *TransportTCP) writeHandshakeResponse(reqData []byte, conn net.Conn) err if err != nil { return err } + if l := len(b); l > maxHandshakePacketSize { + return fmt.Errorf("handshake size too large: %d, max %d", l, maxHandshakePacketSize) + } if err := conn.SetWriteDeadline(time.Now().Add(handshakeTimeout)); err != nil { return err diff --git a/packages/gossip/transport/transport_test.go b/packages/gossip/transport/transport_test.go index c0b670476d80b8e49a946474c08a22e7dd2a7ed6..51b64b8143e0495cbfc2af013cdee501bcb40b04 100644 --- a/packages/gossip/transport/transport_test.go +++ b/packages/gossip/transport/transport_test.go @@ -26,14 +26,14 @@ func init() { logger = l.Sugar() } -func newTest(t require.TestingT, name string) (*TransportTCP, func()) { +func newTest(t require.TestingT, name string) (*TCP, func()) { l := logger.Named(name) db := peer.NewMemoryDB(l.Named("db")) local, err := peer.NewLocal("peering", name, db) require.NoError(t, err) // enable TCP gossipping - require.NoError(t, local.UpdateService(service.GossipKey, "tcp", ":0")) + require.NoError(t, local.UpdateService(service.GossipKey, "tcp", "localhost:0")) trans, err := Listen(local, l) require.NoError(t, err) @@ -48,7 +48,7 @@ func newTest(t require.TestingT, name string) (*TransportTCP, func()) { return trans, teardown } -func getPeer(t *TransportTCP) *peer.Peer { +func getPeer(t *TCP) *peer.Peer { return &t.local.Peer } @@ -87,7 +87,7 @@ func TestUnansweredDial(t *testing.T) { // create peer with invalid gossip address services := getPeer(transA).Services().CreateRecord() - services.Update(service.GossipKey, "tcp", ":0") + services.Update(service.GossipKey, "tcp", "localhost:0") unreachablePeer := peer.NewPeer(getPeer(transA).PublicKey(), services) _, err := transA.DialPeer(unreachablePeer) @@ -99,12 +99,12 @@ func TestNoHandshakeResponse(t *testing.T) { defer closeA() // accept and read incoming connections - lis, err := net.Listen("tcp", ":0") + lis, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) go func() { conn, err := lis.Accept() require.NoError(t, err) - n, _ := conn.Read(make([]byte, MaxPacketSize)) + n, _ := conn.Read(make([]byte, maxHandshakePacketSize)) assert.NotZero(t, n) _ = conn.Close() _ = lis.Close() diff --git a/packages/transactionspammer/transactionspammer.go b/packages/transactionspammer/transactionspammer.go index f37a2fc3bfbaf9eb476a74e75f6be58947157a2d..e9fd616c3c31d932f229c88c771a4d41041b215c 100644 --- a/packages/transactionspammer/transactionspammer.go +++ b/packages/transactionspammer/transactionspammer.go @@ -53,7 +53,7 @@ func Start(tps uint) { mtx := &pb.Transaction{Body: tx.MetaTransaction.GetBytes()} b, _ := proto.Marshal(mtx) - gossip.Event.NewTransaction.Trigger(b) + gossip.Events.NewTransaction.Trigger(b) if sentCounter >= tps { duration := time.Since(start) diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/autopeering.go index af5b20f894cdb790b495574bbeead06bb19d0f15..f980e91c83cc05d234a2c94aef75475262f0cbbc 100644 --- a/plugins/autopeering/autopeering.go +++ b/plugins/autopeering/autopeering.go @@ -51,11 +51,15 @@ const defaultZLC = `{ } }` -func start() { - var ( - err error - ) +var ( + db peer.DB + trans *transport.TransportConn + zLogger *zap.SugaredLogger + handlers []server.Handler + conn *net.UDPConn +) +func configureAP() { host := parameter.NodeConfig.GetString(CFG_ADDRESS) localhost := host apPort := strconv.Itoa(parameter.NodeConfig.GetInt(CFG_PORT)) @@ -68,19 +72,16 @@ func start() { listenAddr := host + ":" + apPort gossipAddr := host + ":" + gossipPort - logger := logger.NewLogger(defaultZLC, debugLevel) - - defer func() { _ = logger.Sync() }() // ignore the returned error + zLogger = logger.NewLogger(defaultZLC, debugLevel) addr, err := net.ResolveUDPAddr("udp", localhost+":"+apPort) if err != nil { log.Fatalf("ResolveUDPAddr: %v", err) } - conn, err := net.ListenUDP("udp", addr) + conn, err = net.ListenUDP("udp", addr) if err != nil { log.Fatalf("ListenUDP: %v", err) } - defer conn.Close() masterPeers := []*peer.Peer{} master, err := parseEntryNodes() @@ -91,12 +92,11 @@ func start() { } // use the UDP connection for transport - trans := transport.Conn(conn, func(network, address string) (net.Addr, error) { return net.ResolveUDPAddr(network, address) }) - defer trans.Close() + trans = transport.Conn(conn, func(network, address string) (net.Addr, error) { return net.ResolveUDPAddr(network, address) }) // create a new local node - db := peer.NewPersistentDB(logger.Named("db")) - defer db.Close() + db = peer.NewPersistentDB(zLogger.Named("db")) + local.INSTANCE, err = peer.NewLocal("udp", listenAddr, db) if err != nil { log.Fatalf("ListenUDP: %v", err) @@ -108,14 +108,14 @@ func start() { } Discovery = discover.New(local.INSTANCE, discover.Config{ - Log: logger.Named("disc"), + Log: zLogger.Named("disc"), MasterPeers: masterPeers, }) - handlers := append([]server.Handler{}, Discovery) + handlers = append([]server.Handler{}, Discovery) if parameter.NodeConfig.GetBool(CFG_SELECTION) { Selection = selection.New(local.INSTANCE, Discovery, selection.Config{ - Log: logger.Named("sel"), + Log: zLogger.Named("sel"), Param: &selection.Parameters{ SaltLifetime: selection.DefaultSaltLifetime, RequiredService: []service.Key{service.GossipKey}, @@ -123,14 +123,14 @@ func start() { }) handlers = append(handlers, Selection) } +} +func start() { // start a server doing discovery and peering - srv = server.Listen(local.INSTANCE, trans, logger.Named("srv"), handlers...) - defer srv.Close() + srv = server.Listen(local.INSTANCE, trans, zLogger.Named("srv"), handlers...) // start the discovery on that connection Discovery.Start(srv) - defer Discovery.Close() // start the peering on that connection if parameter.NodeConfig.GetBool(CFG_SELECTION) { @@ -139,12 +139,22 @@ func start() { } id := base64.StdEncoding.EncodeToString(local.INSTANCE.PublicKey()) - logger.Info("Discovery protocol started: ID=" + id + ", address=" + srv.LocalAddr()) + zLogger.Info("Discovery protocol started: ID=" + id + ", address=" + srv.LocalAddr()) + + defer func() { + _ = zLogger.Sync() // ignore the returned error + trans.Close() + db.Close() + conn.Close() + srv.Close() + Discovery.Close() + }() + // Only for debug go func() { for t := range time.NewTicker(2 * time.Second).C { _ = t - printReport(logger) + printReport(zLogger) } }() diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index eefa8de3dec4fab5a49198b15903a99ac5a3b6bc..6190e497f489c99762467cb0e7606c601dd1946b 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -3,7 +3,6 @@ package autopeering import ( "github.com/iotaledger/autopeering-sim/discover" "github.com/iotaledger/goshimmer/packages/gossip" - "github.com/iotaledger/goshimmer/packages/gossip/neighbor" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" @@ -18,6 +17,7 @@ func configure(plugin *node.Plugin) { close <- struct{}{} })) + configureAP() configureLogging(plugin) } @@ -26,9 +26,10 @@ func run(plugin *node.Plugin) { } func configureLogging(plugin *node.Plugin) { - gossip.Event.DropNeighbor.Attach(events.NewClosure(func(peer *neighbor.Neighbor) { + gossip.Events.DropNeighbor.Attach(events.NewClosure(func(ev *gossip.DropNeighborEvent) { + log.Info("neighbor dropped: " + ev.Peer.Address() + " / " + ev.Peer.ID().String()) if Selection != nil { - Selection.DropPeer(peer.Peer) + Selection.DropPeer(ev.Peer) } })) diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index 89683109d7bcfb8c960be6764ee9559a6b6b7e23..00598e5fc4d7be1f7ace2b8a73c15952325cf6bb 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -1,28 +1,27 @@ package gossip import ( - - "github.com/iotaledger/goshimmer/packages/gossip/transport" - "github.com/iotaledger/goshimmer/packages/model/meta_transaction" + "github.com/golang/protobuf/proto" + "github.com/iotaledger/autopeering-sim/peer/service" + "github.com/iotaledger/autopeering-sim/selection" gp "github.com/iotaledger/goshimmer/packages/gossip" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" + "github.com/iotaledger/goshimmer/packages/gossip/transport" + "github.com/iotaledger/goshimmer/packages/model/meta_transaction" "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/autopeering-sim/peer/service" - "github.com/iotaledger/autopeering-sim/selection" "github.com/iotaledger/goshimmer/plugins/tangle" - "go.uber.org/zap" - "github.com/golang/protobuf/proto" "github.com/iotaledger/hive.go/events" + "go.uber.org/zap" ) var ( - zLogger *zap.SugaredLogger - mgr *gp.Manager - SendTransaction = mgr.Send + zLogger *zap.SugaredLogger + mgr *gp.Manager + SendTransaction = mgr.SendTransaction RequestTransaction = mgr.RequestTransaction - AddInbound = mgr.AddInbound - AddOutbound = mgr.AddOutbound - DropNeighbor = mgr.DropNeighbor + AddInbound = mgr.AddInbound + AddOutbound = mgr.AddOutbound + DropNeighbor = mgr.DropNeighbor ) func init() { @@ -46,43 +45,38 @@ func configureGossip() { trans, err := transport.Listen(local.INSTANCE, zLogger) if err != nil { - // TODO: handle error + log.Fatal(err) } mgr = gp.NewManager(trans, zLogger, getTransaction) + log.Info("Gossip started @", trans.LocalAddr().String()) } func configureEvents() { - + selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { - log.Debug("neighbor removed: " + ev.DroppedID.String()) - DropNeighbor(ev.DroppedID) + log.Info("neighbor removed: " + ev.DroppedID.String()) + mgr.DropNeighbor(ev.DroppedID) })) selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { gossipService := ev.Peer.Services().Get(service.GossipKey) if gossipService != nil { - log.Debug("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) //address, port, _ := net.SplitHostPort(ev.Peer.Services().Get(service.GossipKey).String()) - AddInbound(ev.Peer) + go mgr.AddInbound(ev.Peer) } })) selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { gossipService := ev.Peer.Services().Get(service.GossipKey) if gossipService != nil { - log.Debug("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) //address, port, _ := net.SplitHostPort(ev.Peer.Services().Get(service.GossipKey).String()) - AddOutbound(ev.Peer) + go mgr.AddOutbound(ev.Peer) } })) - // mgr.Events.NewTransaction.Attach(events.NewClosure(func(ev *gp.NewTransactionEvent) { - // tx := ev.Body - // metaTx := meta_transaction.FromBytes(tx) - // Events.NewTransaction.Trigger(metaTx) - // })) - tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *meta_transaction.MetaTransaction) { t := &pb.Transaction{ Body: tx.GetBytes(), @@ -91,6 +85,6 @@ func configureEvents() { if err != nil { return } - SendTransaction(b) + go SendTransaction(b) })) } diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 2e6a902f914edb05efaae8f051096b4d4d8ec1b7..3cfadd2416c3af6bce48797069c374cba6746f3d 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -1,10 +1,10 @@ package gossip import ( - "github.com/iotaledger/hive.go/logger" - "github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" ) var PLUGIN = node.NewPlugin("Gossip", node.Enabled, configure, run) diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go index b379f012f359f706a3050cd29c5fc7517e08abee..f51ac06b8914cac26bdf5a5fe8a46ab3e65a1ecd 100644 --- a/plugins/metrics/plugin.go +++ b/plugins/metrics/plugin.go @@ -14,7 +14,7 @@ var PLUGIN = node.NewPlugin("Metrics", node.Enabled, configure, run) func configure(plugin *node.Plugin) { // increase received TPS counter whenever we receive a new transaction - gossip.Event.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) { increaseReceivedTPSCounter() })) + gossip.Events.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) { increaseReceivedTPSCounter() })) } func run(plugin *node.Plugin) { diff --git a/plugins/statusscreen-tps/plugin.go b/plugins/statusscreen-tps/plugin.go index c05fedb2baf6bb9e34a32c5db7337d06c281361d..aac5ceff6e9c2221679a0d739a9a20064c86bf82 100644 --- a/plugins/statusscreen-tps/plugin.go +++ b/plugins/statusscreen-tps/plugin.go @@ -23,7 +23,7 @@ var receivedTps uint64 var solidTps uint64 var PLUGIN = node.NewPlugin("Statusscreen TPS", node.Enabled, func(plugin *node.Plugin) { - gossip.Event.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) { + gossip.Events.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) { atomic.AddUint64(&receivedTpsCounter, 1) })) diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index 899764fcfa8958a1e620cfccdc519f9a750d37d0..6e7b19912cff4b8197d599dad358241b026a773e 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -27,7 +27,7 @@ func configureSolidifier(plugin *node.Plugin) { task.Return(nil) }, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(10000)) - gossip.Event.NewTransaction.Attach(events.NewClosure(func(ev *gossip.NewTransactionEvent) { + gossip.Events.NewTransaction.Attach(events.NewClosure(func(ev *gossip.NewTransactionEvent) { tx := ev.Body metaTx := meta_transaction.FromBytes(tx) workerPool.Submit(metaTx) diff --git a/plugins/ui/ui.go b/plugins/ui/ui.go index 66ef3099417a8d4ecf659b74f06d9ca0a1600958..555271ecb32b436d6241bb01ad44c9ffb7d9cd37 100644 --- a/plugins/ui/ui.go +++ b/plugins/ui/ui.go @@ -34,7 +34,7 @@ func configure(plugin *node.Plugin) { return c.JSON(http.StatusOK, tpsQueue) }) - gossip.Event.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) { + gossip.Events.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) { atomic.AddUint64(&receivedTpsCounter, 1) })) tangle.Events.TransactionSolid.Attach(events.NewClosure(func(_ *value_transaction.ValueTransaction) {