Skip to content
Snippets Groups Projects
Commit a8c0a045 authored by capossele's avatar capossele
Browse files

:sparkles: adds new gossip

parent 821e9fb6
Branches
Tags
No related merge requests found
Showing
with 536 additions and 470 deletions
......@@ -3,8 +3,6 @@ module github.com/iotaledger/goshimmer
go 1.13
require (
github.com/StabbyCutyou/buffstreams v2.0.0+incompatible
github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414
github.com/dgraph-io/badger v1.6.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/gdamore/tcell v1.3.0
......@@ -12,8 +10,8 @@ require (
github.com/golang/protobuf v1.3.2
github.com/google/open-location-code/go v0.0.0-20190903173953-119bc96a3a51
github.com/gorilla/websocket v1.4.1
github.com/iotaledger/autopeering-sim v0.0.0-20191203092805-a1dd5954f3f6
github.com/iotaledger/hive.go v0.0.0-20191202111738-357cee7a1c37
github.com/iotaledger/autopeering-sim v0.0.0-20191206120939-725ee12834dd
github.com/iotaledger/hive.go v0.0.0-20191206003239-3231c4584e5c
github.com/iotaledger/iota.go v1.0.0-beta.10
github.com/labstack/echo v3.3.10+incompatible
github.com/lucasb-eyer/go-colorful v1.0.3 // indirect
......@@ -32,10 +30,10 @@ require (
go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933
golang.org/x/net v0.0.0-20191206103017-1ddd1de85cb0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9 // indirect
golang.org/x/tools v0.0.0-20191203134012-c197fd4bf371 // indirect
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e // indirect
golang.org/x/tools v0.0.0-20191205225056-3393d29bb9fe // indirect
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 // indirect
gopkg.in/yaml.v2 v2.2.7 // indirect
)
cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M=
......@@ -8,8 +9,6 @@ github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/StabbyCutyou/buffstreams v2.0.0+incompatible h1:7bDqOlL2Ipve3YjZVVrWz/TOVOa+IEHj6XagpAtLn2I=
github.com/StabbyCutyou/buffstreams v2.0.0+incompatible/go.mod h1:gP6wgxHoC0NrobhUBwx+Y6hx2QyKFOT+ho8619aJCDk=
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
......@@ -18,10 +17,6 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/capossele/gossip v0.0.0-20191204191545-36eddf08c1aa h1:46C1Ce+93zGZ+JJ4JUw3EuXHQXqKYzIX7+CRG0fweNk=
github.com/capossele/gossip v0.0.0-20191204191545-36eddf08c1aa/go.mod h1:P8rJEmorO5QpCL236F8vNhUFwFFjezt2d/+/LeRlELA=
github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414 h1:C9Q279xU15Qt5WVZNH6t/1L7exbTVYp1uMRS9NSmL/U=
github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414/go.mod h1:DnYLNZclq7cY6s2oA6wwhQ4tDB0j38enJHPrhpzOpJc=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
......@@ -99,18 +94,17 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/iotaledger/autopeering-sim v0.0.0-20191202124431-1705dc628175 h1:/RAxj+VEPTykp9cWRKTtydkfuBDaFemI7/bdNYCD7Nw=
github.com/iotaledger/autopeering-sim v0.0.0-20191202124431-1705dc628175/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI=
github.com/iotaledger/autopeering-sim v0.0.0-20191202192349-f8e7a238c2bb/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI=
github.com/iotaledger/autopeering-sim v0.0.0-20191202212322-a6353b992662 h1:al3bYvSaJnFyupe7bqgXYeBZ7W0fTNWrFd9WLLNlC6I=
github.com/iotaledger/autopeering-sim v0.0.0-20191202212322-a6353b992662/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI=
github.com/iotaledger/autopeering-sim v0.0.0-20191203092805-a1dd5954f3f6 h1:lcM/irmEr++tz/XQCHCTBsteqiBVZ3uTurLnnviCxLg=
github.com/iotaledger/autopeering-sim v0.0.0-20191203092805-a1dd5954f3f6/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI=
github.com/iotaledger/autopeering-sim v0.0.0-20191206120939-725ee12834dd h1:CTHnqb0UdC+EwHBn20TeGvYj5F44zoZTfPOX/vEmSzQ=
github.com/iotaledger/autopeering-sim v0.0.0-20191206120939-725ee12834dd/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI=
github.com/iotaledger/goshimmer v0.0.0-20191113134331-c2d1b2f9d533/go.mod h1:7vYiofXphp9+PkgVAEM0pvw3aoi4ksrZ7lrEgX50XHs=
github.com/iotaledger/hive.go v0.0.0-20191118130432-89eebe8fe8eb h1:nuS/LETRJ8obUyBIZeyxeei0ZPlyOMj8YPziOgSM4Og=
github.com/iotaledger/hive.go v0.0.0-20191118130432-89eebe8fe8eb/go.mod h1:1Thhlil4lHzuy53EVvmEbEvWBFY0Tasp4kCBfxBCPIk=
github.com/iotaledger/hive.go v0.0.0-20191202111738-357cee7a1c37 h1:Vex6W5Oae7xXvVmnCrl7J4o+PCx0FW3paMzXxQDr8H4=
github.com/iotaledger/hive.go v0.0.0-20191202111738-357cee7a1c37/go.mod h1:1Thhlil4lHzuy53EVvmEbEvWBFY0Tasp4kCBfxBCPIk=
github.com/iotaledger/hive.go v0.0.0-20191206003239-3231c4584e5c h1:kmx6rRpMkeO+5gqj8ngv1+0Z7MGxulV3SCP2SZn/mU4=
github.com/iotaledger/hive.go v0.0.0-20191206003239-3231c4584e5c/go.mod h1:7iqun29a1x0lymTrn0UJ3Z/yy0sUzUpoOZ1OYMrYN20=
github.com/iotaledger/iota.go v1.0.0-beta.7/go.mod h1:dMps6iMVU1pf5NDYNKIw4tRsPeC8W3ZWjOvYHOO1PMg=
github.com/iotaledger/iota.go v1.0.0-beta.9 h1:c654s9pkdhMBkABUvWg+6k91MEBbdtmZXP1xDfQpajg=
github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
......@@ -286,6 +280,8 @@ golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914 h1:MlY3mEfbnWGmUi4rtHOtNnnnN
golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933 h1:e6HwijUxhDe+hPNjZQQn9bA5PW3vNmnN64U2ZW759Lk=
golang.org/x/net v0.0.0-20191126235420-ef20fe5d7933/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191206103017-1ddd1de85cb0 h1:LxY/gQN/MrcW24/46nLyiip1GhN/Yi14QPbeNskTvQA=
golang.org/x/net v0.0.0-20191206103017-1ddd1de85cb0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
......@@ -315,8 +311,8 @@ golang.org/x/sys v0.0.0-20191018095205-727590c5006e h1:ZtoklVMHQy6BFRHkbG6JzK+S6
golang.org/x/sys v0.0.0-20191018095205-727590c5006e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e h1:N7DeIrjYszNmSW409R3frPPwglRwMkXSBzwVbkOjLLA=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9 h1:ZBzSG/7F4eNKz2L3GE9o300RX0Az1Bw5HF7PDraD+qU=
golang.org/x/sys v0.0.0-20191128015809-6d18c012aee9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e h1:9vRrk9YW2BTzLP0VCB9ZDjU4cPqkg+IDWL7XgxA1yxQ=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
......@@ -331,19 +327,19 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191121040551-947d4aa89328 h1:t3X42h9e6xdbrCD/gPyWqAXr2BEpdJqRd1brThaaxII=
golang.org/x/tools v0.0.0-20191121040551-947d4aa89328/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d h1:/iIZNFGxc/a7C3yWjGcnboV+Tkc7mxr+p6fDztwoxuM=
golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191202203127-2b6af5f9ace7 h1:I7bfRTrfnb7yQSesz6OhwGVh2imeNUcbbS8YtFYC8Ck=
golang.org/x/tools v0.0.0-20191202203127-2b6af5f9ace7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191203134012-c197fd4bf371 h1:Cjq6sG3gnKDchzWy7ouGQklhxMtWvh4AhSNJ0qGIeo4=
golang.org/x/tools v0.0.0-20191203134012-c197fd4bf371/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191205225056-3393d29bb9fe h1:BEVcKURC7E0EF+vD1l52Jb3LOM5Iwu7OI5FpdPuU50o=
golang.org/x/tools v0.0.0-20191205225056-3393d29bb9fe/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.21.0 h1:G+97AoqBnmZIT91cLG/EkCoK9NSelj64P8bOHHNmGn0=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
......
......@@ -23,6 +23,15 @@ import (
)
func main() {
// go func() {
// if err := profiler.Start(profiler.Config{
// Service: "race-service",
// ServiceVersion: "1.0",
// ProjectID: "premium-canyon-232915", // optional on GCP
// }); err != nil {
// log.Fatalf("Cannot start the profiler: %v", err)
// }
// }()
node.Run(
cli.PLUGIN,
autopeering.PLUGIN,
......
package gossip
import "github.com/pkg/errors"
var (
ErrClosed = errors.New("manager closed")
ErrDuplicateNeighbor = errors.New("peer already connected")
)
......@@ -6,9 +6,13 @@ import (
)
// Events contains all the events that are triggered during the gossip protocol.
type Events struct {
var Events = struct {
// A NewTransaction event is triggered when a new transaction is received by the gossip protocol.
NewTransaction *events.Event
DropNeighbor *events.Event
}{
NewTransaction: events.NewEvent(newTransaction),
DropNeighbor: events.NewEvent(dropNeighbor),
}
type NewTransactionEvent struct {
......
package gossip
import (
"io"
"net"
"strings"
"sync"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/goshimmer/packages/gossip/neighbor"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/transport"
"github.com/iotaledger/hive.go/events"
"github.com/pkg/errors"
"go.uber.org/zap"
)
const (
maxAttempts = 3
)
var (
Event Events
maxConnectionAttempts = 3
maxPacketSize = 2048
)
type GetTransaction func(txHash []byte) ([]byte, error)
type Manager struct {
neighborhood *neighbor.NeighborMap
trans *transport.TransportTCP
trans *transport.TCP
log *zap.SugaredLogger
getTransaction GetTransaction
Events Events
wg sync.WaitGroup
mu sync.RWMutex
neighbors map[peer.ID]*neighbor
running bool
}
type neighbor struct {
peer *peer.Peer
conn *transport.Connection
}
func NewManager(t *transport.TransportTCP, log *zap.SugaredLogger, f GetTransaction) *Manager {
mgr := &Manager{
neighborhood: neighbor.NewMap(),
func NewManager(t *transport.TCP, log *zap.SugaredLogger, f GetTransaction) *Manager {
m := &Manager{
trans: t,
log: log,
getTransaction: f,
Events: Events{
NewTransaction: events.NewEvent(newTransaction),
DropNeighbor: events.NewEvent(dropNeighbor)},
neighbors: make(map[peer.ID]*neighbor),
}
Event = mgr.Events
return mgr
m.running = true
return m
}
func (m *Manager) AddOutbound(p *peer.Peer) error {
......@@ -53,41 +57,78 @@ func (m *Manager) AddInbound(p *peer.Peer) error {
return m.addNeighbor(p, m.trans.AcceptPeer)
}
func (m *Manager) DropNeighbor(id peer.ID) {
m.deleteNeighbor(id)
func (m *Manager) DropNeighbor(p peer.ID) {
m.deleteNeighbor(p)
}
func (m *Manager) RequestTransaction(data []byte, to ...*neighbor.Neighbor) {
req := &pb.TransactionRequest{}
err := proto.Unmarshal(data, req)
if err != nil {
m.log.Warnw("Data to send is not a Transaction Request", "err", err)
func (m *Manager) Close() {
m.mu.Lock()
m.running = false
// close all connections
for _, n := range m.neighbors {
_ = n.conn.Close()
}
msg := marshal(req)
m.mu.Unlock()
m.send(msg, to...)
m.wg.Wait()
}
func (m *Manager) Send(data []byte, to ...*neighbor.Neighbor) {
tx := &pb.Transaction{}
err := proto.Unmarshal(data, tx)
if err != nil {
m.log.Warnw("Data to send is not a Transaction", "err", err)
func (m *Manager) getNeighbors(ids ...peer.ID) []*neighbor {
if len(ids) > 0 {
return m.getNeighborsById(ids)
}
return m.getAllNeighbors()
}
func (m *Manager) getAllNeighbors() []*neighbor {
m.mu.Lock()
result := make([]*neighbor, 0, len(m.neighbors))
for _, n := range m.neighbors {
result = append(result, n)
}
m.mu.Unlock()
return result
}
func (m *Manager) getNeighborsById(ids []peer.ID) []*neighbor {
result := make([]*neighbor, 0, len(ids))
m.mu.RLock()
for _, id := range ids {
if n, ok := m.neighbors[id]; ok {
result = append(result, n)
}
}
m.mu.RUnlock()
return result
}
func (m *Manager) RequestTransaction(txHash []byte, to ...peer.ID) {
req := &pb.TransactionRequest{
Hash: txHash,
}
m.send(marshal(req), to...)
}
msg := marshal(tx)
m.send(msg, to...)
// SendTransaction sends the transaction data to the given neighbors.
func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) {
tx := &pb.Transaction{
Body: txData,
}
m.send(marshal(tx), to...)
}
func (m *Manager) send(msg []byte, to ...*neighbor.Neighbor) {
neighbors := m.neighborhood.GetSlice()
if to != nil {
neighbors = to
func (m *Manager) send(msg []byte, to ...peer.ID) {
if l := len(msg); l > maxPacketSize {
m.log.Errorw("message too large", "len", l, "max", maxPacketSize)
}
neighbors := m.getNeighbors(to...)
for _, neighbor := range neighbors {
m.log.Debugw("Sending", "to", neighbor.Peer.ID().String(), "msg", msg)
err := neighbor.Conn.Write(msg)
for _, nbr := range neighbors {
m.log.Debugw("Sending", "to", nbr.peer.ID(), "msg", msg)
_, err := nbr.conn.Write(msg)
if err != nil {
m.log.Debugw("send error", "err", err)
}
......@@ -95,67 +136,89 @@ func (m *Manager) send(msg []byte, to ...*neighbor.Neighbor) {
}
func (m *Manager) addNeighbor(peer *peer.Peer, handshake func(*peer.Peer) (*transport.Connection, error)) error {
if _, ok := m.neighborhood.Load(peer.ID().String()); ok {
return errors.New("Neighbor already added")
}
var err error
var conn *transport.Connection
i := 0
for i = 0; i < maxAttempts; i++ {
var (
err error
conn *transport.Connection
)
for i := 0; i < maxConnectionAttempts; i++ {
conn, err = handshake(peer)
if err != nil {
m.log.Warnw("Connection attempt failed", "attempt", i+1)
} else {
if err == nil {
break
}
}
if i == maxAttempts {
m.log.Warnw("Connection failed to", "peer", peer.ID().String())
m.Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: peer})
// could not add neighbor
if err != nil {
m.log.Debugw("addNeighbor failed", "peer", peer.ID(), "err", err)
Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: peer})
return err
}
// add the new neighbor
neighbor := neighbor.New(peer, conn)
m.neighborhood.Store(peer.ID().String(), neighbor)
m.mu.Lock()
defer m.mu.Unlock()
if !m.running {
disconnect(conn)
return ErrClosed
}
if _, ok := m.neighbors[peer.ID()]; ok {
disconnect(conn)
return ErrDuplicateNeighbor
}
// start listener for the new neighbor
go m.readLoop(neighbor)
// add the neighbor
n := &neighbor{
peer: peer,
conn: conn,
}
m.neighbors[peer.ID()] = n
go m.readLoop(n)
return nil
}
func (m *Manager) deleteNeighbor(id peer.ID) {
m.log.Debugw("Deleting neighbor", "neighbor", id.String())
p, ok := m.neighborhood.Delete(id.String())
if ok {
m.Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: p.Peer})
func (m *Manager) deleteNeighbor(peer peer.ID) {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.neighbors[peer]; !ok {
return
}
n := m.neighbors[peer]
delete(m.neighbors, peer)
disconnect(n.conn)
}
func (m *Manager) readLoop(neighbor *neighbor.Neighbor) {
func (m *Manager) readLoop(nbr *neighbor) {
m.wg.Add(1)
defer m.wg.Done()
// create a buffer for the packages
b := make([]byte, maxPacketSize)
for {
data, err := neighbor.Conn.Read()
n, err := nbr.conn.Read(b)
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
// ignore temporary read errors.
//m.log.Debugw("temporary read error", "err", err)
m.log.Debugw("temporary read error", "err", err)
continue
} else if err != nil {
// return from the loop on all other errors
m.log.Debugw("reading stopped")
m.deleteNeighbor(neighbor.Peer.ID())
if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
m.log.Warnw("read error", "err", err)
}
_ = nbr.conn.Close() // just make sure that the connection is closed as fast as possible
m.deleteNeighbor(nbr.peer.ID())
m.log.Debug("reading stopped")
return
}
if err := m.handlePacket(data, neighbor); err != nil {
m.log.Warnw("failed to handle packet", "from", neighbor.Peer.ID().String(), "err", err)
if err := m.handlePacket(b[:n], nbr); err != nil {
m.log.Warnw("failed to handle packet", "id", nbr.peer.ID(), "err", err)
}
}
}
func (m *Manager) handlePacket(data []byte, neighbor *neighbor.Neighbor) error {
func (m *Manager) handlePacket(data []byte, n *neighbor) error {
switch pb.MType(data[0]) {
// Incoming Transaction
......@@ -165,7 +228,7 @@ func (m *Manager) handlePacket(data []byte, neighbor *neighbor.Neighbor) error {
return errors.Wrap(err, "invalid message")
}
m.log.Debugw("Received Transaction", "data", msg.GetBody())
m.Events.NewTransaction.Trigger(&NewTransactionEvent{Body: msg.GetBody(), Peer: neighbor.Peer})
Events.NewTransaction.Trigger(&NewTransactionEvent{Body: msg.GetBody(), Peer: n.peer})
// Incoming Transaction request
case pb.MTransactionRequest:
......@@ -180,11 +243,10 @@ func (m *Manager) handlePacket(data []byte, neighbor *neighbor.Neighbor) error {
m.log.Debugw("Tx not available", "tx", msg.GetHash())
} else {
m.log.Debugw("Tx found", "tx", tx)
m.Send(tx, neighbor)
m.SendTransaction(tx, n.peer.ID())
}
default:
return nil
}
return nil
......@@ -202,3 +264,8 @@ func marshal(msg pb.Message) []byte {
}
return append([]byte{byte(mType)}, data...)
}
func disconnect(conn *transport.Connection) {
_ = conn.Close()
Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: conn.Peer()})
}
......@@ -13,13 +13,32 @@ import (
"github.com/iotaledger/goshimmer/packages/gossip/transport"
"github.com/iotaledger/hive.go/events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
const graceTime = 5 * time.Millisecond
const graceTime = 10 * time.Millisecond
var logger *zap.SugaredLogger
var (
logger *zap.SugaredLogger
eventMock mock.Mock
testTxData = []byte("testTx")
)
func newTransactionEvent(ev *NewTransactionEvent) { eventMock.Called(ev) }
func dropNeighborEvent(ev *DropNeighborEvent) { eventMock.Called(ev) }
// assertEvents initializes the mock and asserts the expectations
func assertEvents(t *testing.T) func() {
eventMock = mock.Mock{}
return func() {
if !t.Failed() {
eventMock.AssertExpectations(t)
}
}
}
func init() {
l, err := zap.NewDevelopment()
......@@ -27,31 +46,33 @@ func init() {
log.Fatalf("cannot initialize logger: %v", err)
}
logger = l.Sugar()
// mock the events
Events.NewTransaction.Attach(events.NewClosure(newTransactionEvent))
Events.DropNeighbor.Attach(events.NewClosure(dropNeighborEvent))
}
func testGetTransaction([]byte) ([]byte, error) {
tx := &pb.TransactionRequest{
Hash: []byte("testTx"),
}
b, _ := proto.Marshal(tx)
return b, nil
func getTestTransaction([]byte) ([]byte, error) {
return testTxData, nil
}
func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) {
log := logger.Named(name)
db := peer.NewMemoryDB(log.Named("db"))
l := logger.Named(name)
db := peer.NewMemoryDB(l.Named("db"))
local, err := peer.NewLocal("peering", name, db)
require.NoError(t, err)
require.NoError(t, local.UpdateService(service.GossipKey, "tcp", "localhost:0"))
trans, err := transport.Listen(local, log)
trans, err := transport.Listen(local, l)
require.NoError(t, err)
mgr := NewManager(trans, log, testGetTransaction)
mgr := NewManager(trans, l, getTestTransaction)
// update the service with the actual address
require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String()))
teardown := func() {
mgr.Close()
trans.Close()
db.Close()
}
......@@ -59,11 +80,15 @@ func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) {
}
func TestClose(t *testing.T) {
defer assertEvents(t)()
_, teardown, _ := newTest(t, "A")
teardown()
}
func TestUnicast(t *testing.T) {
func TestClosedConnection(t *testing.T) {
defer assertEvents(t)()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
......@@ -72,6 +97,8 @@ func TestUnicast(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
// connect in the following way
// B -> A
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
......@@ -87,25 +114,100 @@ func TestUnicast(t *testing.T) {
// wait for the connections to establish
wg.Wait()
tx := &pb.Transaction{Body: []byte("Hello!")}
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
triggered := make(chan struct{}, 1)
mgrB.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) {
require.Empty(t, triggered) // only once
assert.Equal(t, tx.GetBody(), ev.Body)
assert.Equal(t, peerA, ev.Peer)
triggered <- struct{}{}
}))
// A drops B
mgrA.deleteNeighbor(peerB.ID())
time.Sleep(graceTime)
b, err := proto.Marshal(tx)
require.NoError(t, err)
mgrA.Send(b)
// the events should be there even before we close
eventMock.AssertExpectations(t)
}
func TestP2PSend(t *testing.T) {
defer assertEvents(t)()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
var wg sync.WaitGroup
wg.Add(2)
// connect in the following way
// B -> A
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
eventMock.On("newTransactionEvent", &NewTransactionEvent{
Body: testTxData,
Peer: peerA,
}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
mgrA.SendTransaction(testTxData)
time.Sleep(graceTime)
}
func TestP2PSendTwice(t *testing.T) {
defer assertEvents(t)()
// eventually the event should be triggered
assert.Eventually(t, func() bool { return len(triggered) >= 1 }, time.Second, 10*time.Millisecond)
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
var wg sync.WaitGroup
wg.Add(2)
// connect in the following way
// B -> A
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
eventMock.On("newTransactionEvent", &NewTransactionEvent{
Body: testTxData,
Peer: peerA,
}).Twice()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
mgrA.SendTransaction(testTxData)
time.Sleep(1 * time.Second) // wait a bit between the sends, to test timeouts
mgrA.SendTransaction(testTxData)
time.Sleep(graceTime)
}
func TestBroadcast(t *testing.T) {
defer assertEvents(t)()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
......@@ -116,6 +218,8 @@ func TestBroadcast(t *testing.T) {
var wg sync.WaitGroup
wg.Add(4)
// connect in the following way
// B -> A <- C
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
......@@ -141,56 +245,37 @@ func TestBroadcast(t *testing.T) {
// wait for the connections to establish
wg.Wait()
tx := &pb.Transaction{Body: []byte("Hello!")}
triggeredB := make(chan struct{}, 1)
mgrB.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) {
require.Empty(t, triggeredB) // only once
assert.Equal(t, tx.GetBody(), ev.Body)
assert.Equal(t, peerA, ev.Peer)
triggeredB <- struct{}{}
}))
triggeredC := make(chan struct{}, 1)
mgrC.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) {
require.Empty(t, triggeredC) // only once
assert.Equal(t, tx.GetBody(), ev.Body)
assert.Equal(t, peerA, ev.Peer)
triggeredC <- struct{}{}
}))
b, err := proto.Marshal(tx)
assert.NoError(t, err)
mgrA.Send(b)
eventMock.On("newTransactionEvent", &NewTransactionEvent{
Body: testTxData,
Peer: peerA,
}).Twice()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Twice()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerC}).Once()
// eventually the events should be triggered
success := func() bool {
return len(triggeredB) >= 1 && len(triggeredC) >= 1
}
assert.Eventually(t, success, time.Second, 10*time.Millisecond)
mgrA.SendTransaction(testTxData)
time.Sleep(graceTime)
}
func TestDropUnsuccessfulAccept(t *testing.T) {
defer assertEvents(t)()
mgrA, closeA, _ := newTest(t, "A")
defer closeA()
_, closeB, peerB := newTest(t, "B")
defer closeB()
triggered := make(chan struct{}, 1)
mgrA.Events.DropNeighbor.Attach(events.NewClosure(func(ev *DropNeighborEvent) {
require.Empty(t, triggered) // only once
assert.Equal(t, peerB, ev.Peer)
triggered <- struct{}{}
}))
eventMock.On("dropNeighborEvent", &DropNeighborEvent{
Peer: peerB,
}).Once()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
assert.Error(t, err)
// eventually the event should be triggered
assert.Eventually(t, func() bool { return len(triggered) >= 1 }, time.Second, 10*time.Millisecond)
}
func TestTxRequest(t *testing.T) {
defer assertEvents(t)()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
......@@ -199,48 +284,32 @@ func TestTxRequest(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
// connect in the following way
// B -> A
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
assert.NoError(t, err)
logger.Debugw("Len", "len", mgrA.neighborhood.Len())
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
assert.NoError(t, err)
logger.Debugw("Len", "len", mgrB.neighborhood.Len())
}()
// wait for the connections to establish
wg.Wait()
tx := &pb.TransactionRequest{
Hash: []byte("Hello!"),
}
b, err := proto.Marshal(tx)
assert.NoError(t, err)
sendChan := make(chan struct{})
sendSuccess := false
mgrA.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) {
logger.Debugw("New TX Event triggered", "data", ev.Body, "from", ev.Peer.ID().String())
assert.Equal(t, []byte("testTx"), ev.Body)
assert.Equal(t, peerB, ev.Peer)
sendChan <- struct{}{}
}))
eventMock.On("newTransactionEvent", &NewTransactionEvent{
Body: testTxData,
Peer: peerB,
}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
b, err := proto.Marshal(&pb.TransactionRequest{Hash: []byte("Hello!")})
require.NoError(t, err)
mgrA.RequestTransaction(b)
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case <-sendChan:
sendSuccess = true
case <-timer.C:
sendSuccess = false
}
assert.True(t, sendSuccess)
time.Sleep(graceTime)
}
package neighbor
import (
"sync"
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/goshimmer/packages/gossip/transport"
)
// Neighbor defines a neighbor
type Neighbor struct {
Peer *peer.Peer
Conn *transport.Connection
}
// NeighborMap implements a map of neighbors thread safe
type NeighborMap struct {
sync.RWMutex
internal map[string]*Neighbor
}
// NewMap returns a new NeighborMap
func NewMap() *NeighborMap {
return &NeighborMap{
internal: make(map[string]*Neighbor),
}
}
// New returns a new Neighbor
func New(peer *peer.Peer, conn *transport.Connection) *Neighbor {
return &Neighbor{
Peer: peer,
Conn: conn,
}
}
// Len returns the number of neighbors stored in a NeighborMap
func (nm *NeighborMap) Len() int {
nm.RLock()
defer nm.RUnlock()
return len(nm.internal)
}
// GetMap returns the content of the entire internal map
func (nm *NeighborMap) GetMap() map[string]*Neighbor {
newMap := make(map[string]*Neighbor)
nm.RLock()
defer nm.RUnlock()
for k, v := range nm.internal {
newMap[k] = v
}
return newMap
}
// GetSlice returns a slice of the content of the entire internal map
func (nm *NeighborMap) GetSlice() []*Neighbor {
newSlice := make([]*Neighbor, nm.Len())
nm.RLock()
defer nm.RUnlock()
i := 0
for _, v := range nm.internal {
newSlice[i] = v
i++
}
return newSlice
}
// Load returns the neighbor for a given key.
// It also return a bool to communicate the presence of the given
// neighbor into the internal map
func (nm *NeighborMap) Load(key string) (value *Neighbor, ok bool) {
nm.RLock()
defer nm.RUnlock()
result, ok := nm.internal[key]
return result, ok
}
// Delete removes the entire entry for a given key and return true if successful
func (nm *NeighborMap) Delete(key string) (deletedNeighbor *Neighbor, ok bool) {
deletedNeighbor, ok = nm.Load(key)
if !ok {
return nil, false
}
nm.Lock()
defer nm.Unlock()
if deletedNeighbor.Conn != nil {
deletedNeighbor.Conn.Close()
}
delete(nm.internal, key)
return deletedNeighbor, true
}
// Store adds a new neighbor to the NeighborMap
func (nm *NeighborMap) Store(key string, value *Neighbor) {
nm.Lock()
defer nm.Unlock()
nm.internal[key] = value
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: proto/message.proto
// source: packages/gossip/proto/message.proto
package proto
......@@ -32,7 +32,7 @@ func (m *Transaction) Reset() { *m = Transaction{} }
func (m *Transaction) String() string { return proto.CompactTextString(m) }
func (*Transaction) ProtoMessage() {}
func (*Transaction) Descriptor() ([]byte, []int) {
return fileDescriptor_33f3a5e1293a7bcd, []int{0}
return fileDescriptor_fcce9e84825f2fa5, []int{0}
}
func (m *Transaction) XXX_Unmarshal(b []byte) error {
......@@ -72,7 +72,7 @@ func (m *TransactionRequest) Reset() { *m = TransactionRequest{} }
func (m *TransactionRequest) String() string { return proto.CompactTextString(m) }
func (*TransactionRequest) ProtoMessage() {}
func (*TransactionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_33f3a5e1293a7bcd, []int{1}
return fileDescriptor_fcce9e84825f2fa5, []int{1}
}
func (m *TransactionRequest) XXX_Unmarshal(b []byte) error {
......@@ -105,17 +105,20 @@ func init() {
proto.RegisterType((*TransactionRequest)(nil), "proto.TransactionRequest")
}
func init() { proto.RegisterFile("proto/message.proto", fileDescriptor_33f3a5e1293a7bcd) }
var fileDescriptor_33f3a5e1293a7bcd = []byte{
// 139 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x28, 0xca, 0x2f,
0xc9, 0xd7, 0xcf, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0xd5, 0x03, 0xf3, 0x84, 0x58, 0xc1, 0x94,
0x92, 0x22, 0x17, 0x77, 0x48, 0x51, 0x62, 0x5e, 0x71, 0x62, 0x72, 0x49, 0x66, 0x7e, 0x9e, 0x90,
0x10, 0x17, 0x4b, 0x52, 0x7e, 0x4a, 0xa5, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad,
0xa4, 0xc1, 0x25, 0x84, 0xa4, 0x24, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x04, 0xa4, 0x32, 0x23,
0xb1, 0x38, 0x03, 0xa6, 0x12, 0xc4, 0x76, 0x52, 0x8e, 0x52, 0x4c, 0xcf, 0x2c, 0xc9, 0x28, 0x4d,
0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0x4e, 0x2c, 0xc8, 0x2f, 0x2e, 0x4e, 0xcd, 0x49, 0xd5, 0x4f,
0xcf, 0x2f, 0x2e, 0xce, 0x2c, 0xd0, 0x07, 0xdb, 0x98, 0xc4, 0x06, 0xa6, 0x8c, 0x01, 0x01, 0x00,
0x00, 0xff, 0xff, 0x34, 0x46, 0xa5, 0x0f, 0x96, 0x00, 0x00, 0x00,
func init() {
proto.RegisterFile("packages/gossip/proto/message.proto", fileDescriptor_fcce9e84825f2fa5)
}
var fileDescriptor_fcce9e84825f2fa5 = []byte{
// 157 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x8e, 0x31, 0x0b, 0xc2, 0x30,
0x10, 0x46, 0x29, 0xa8, 0x43, 0x74, 0xca, 0xe4, 0xa8, 0x75, 0xe9, 0xd4, 0x0c, 0x22, 0xee, 0xfe,
0x84, 0xe2, 0xe4, 0x76, 0x49, 0x8f, 0x24, 0x68, 0x7a, 0x31, 0x97, 0x0e, 0xfe, 0x7b, 0x49, 0x40,
0x70, 0xe8, 0xf4, 0xbd, 0x0f, 0xde, 0xf0, 0xc4, 0x29, 0x82, 0x79, 0x82, 0x45, 0x56, 0x96, 0x98,
0x7d, 0x54, 0x31, 0x51, 0x26, 0x15, 0x90, 0x19, 0x2c, 0xf6, 0xf5, 0xc9, 0x75, 0x9d, 0xf6, 0x28,
0xb6, 0xf7, 0x04, 0x13, 0x83, 0xc9, 0x9e, 0x26, 0x29, 0xc5, 0x4a, 0xd3, 0xf8, 0xd9, 0x37, 0x87,
0xa6, 0xdb, 0x0d, 0x95, 0xdb, 0x4e, 0xc8, 0x3f, 0x65, 0xc0, 0xf7, 0x8c, 0x9c, 0x8b, 0xe9, 0x80,
0xdd, 0xcf, 0x2c, 0x7c, 0xbb, 0x3e, 0x2e, 0xd6, 0x67, 0x37, 0xeb, 0xde, 0x50, 0x50, 0x9e, 0x32,
0xbc, 0x70, 0xb4, 0x98, 0x4a, 0x87, 0xf3, 0x21, 0x60, 0x52, 0x8b, 0x69, 0x7a, 0x53, 0xe7, 0xfc,
0x0d, 0x00, 0x00, 0xff, 0xff, 0xd7, 0x71, 0x33, 0x9a, 0xba, 0x00, 0x00, 0x00,
}
......@@ -2,43 +2,28 @@ package transport
import (
"net"
"time"
"github.com/iotaledger/autopeering-sim/peer"
)
const (
// MaxPacketSize specifies the maximum allowed size of packets.
// Packets larger than this will be cut and thus treated as invalid.
MaxPacketSize = 1280
)
// Connection represents a network connection to a neighbor peer.
type Connection struct {
net.Conn
peer *peer.Peer
conn net.Conn
}
func newConnection(p *peer.Peer, c net.Conn) *Connection {
func newConnection(c net.Conn, p *peer.Peer) *Connection {
// make sure the connection has no timeouts
_ = c.SetDeadline(time.Time{})
return &Connection{
Conn: c,
peer: p,
conn: c,
}
}
func (c *Connection) Close() {
c.conn.Close()
}
func (c *Connection) Read() ([]byte, error) {
b := make([]byte, MaxPacketSize)
n, err := c.conn.Read(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
func (c *Connection) Write(b []byte) error {
_, err := c.conn.Write(b)
return err
// Peer returns the peer associated with that connection.
func (c *Connection) Peer() *peer.Peer {
return c.peer
}
......@@ -10,18 +10,18 @@ import (
)
const (
HandshakeExpiration = 20 * time.Second
VersionNum = 0
versionNum = 0
handshakeExpiration = 20 * time.Second
)
// isExpired checks whether the given UNIX time stamp is too far in the past.
func isExpired(ts int64) bool {
return time.Since(time.Unix(ts, 0)) >= HandshakeExpiration
return time.Since(time.Unix(ts, 0)) >= handshakeExpiration
}
func newHandshakeRequest(fromAddr string, toAddr string) ([]byte, error) {
m := &pb.HandshakeRequest{
Version: VersionNum,
Version: versionNum,
From: fromAddr,
To: toAddr,
Timestamp: time.Now().Unix(),
......@@ -36,7 +36,7 @@ func newHandshakeResponse(reqData []byte) ([]byte, error) {
return proto.Marshal(m)
}
func (t *TransportTCP) validateHandshakeRequest(reqData []byte, fromAddr string) bool {
func (t *TCP) validateHandshakeRequest(reqData []byte, fromAddr string) bool {
m := new(pb.HandshakeRequest)
if err := proto.Unmarshal(reqData, m); err != nil {
t.log.Debugw("invalid handshake",
......@@ -44,7 +44,7 @@ func (t *TransportTCP) validateHandshakeRequest(reqData []byte, fromAddr string)
)
return false
}
if m.GetVersion() != VersionNum {
if m.GetVersion() != versionNum {
t.log.Debugw("invalid handshake",
"version", m.GetVersion(),
)
......@@ -71,7 +71,7 @@ func (t *TransportTCP) validateHandshakeRequest(reqData []byte, fromAddr string)
return true
}
func (t *TransportTCP) validateHandshakeResponse(resData []byte, reqData []byte) bool {
func (t *TCP) validateHandshakeResponse(resData []byte, reqData []byte) bool {
m := new(pb.HandshakeResponse)
if err := proto.Unmarshal(resData, m); err != nil {
t.log.Debugw("invalid handshake",
......
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: transport/proto/handshake.proto
// source: packages/gossip/transport/proto/handshake.proto
package proto
......@@ -38,7 +38,7 @@ func (m *HandshakeRequest) Reset() { *m = HandshakeRequest{} }
func (m *HandshakeRequest) String() string { return proto.CompactTextString(m) }
func (*HandshakeRequest) ProtoMessage() {}
func (*HandshakeRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_d7101ffe19b05443, []int{0}
return fileDescriptor_f9e96b60881ea276, []int{0}
}
func (m *HandshakeRequest) XXX_Unmarshal(b []byte) error {
......@@ -99,7 +99,7 @@ func (m *HandshakeResponse) Reset() { *m = HandshakeResponse{} }
func (m *HandshakeResponse) String() string { return proto.CompactTextString(m) }
func (*HandshakeResponse) ProtoMessage() {}
func (*HandshakeResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_d7101ffe19b05443, []int{1}
return fileDescriptor_f9e96b60881ea276, []int{1}
}
func (m *HandshakeResponse) XXX_Unmarshal(b []byte) error {
......@@ -132,21 +132,24 @@ func init() {
proto.RegisterType((*HandshakeResponse)(nil), "proto.HandshakeResponse")
}
func init() { proto.RegisterFile("transport/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) }
var fileDescriptor_d7101ffe19b05443 = []byte{
// 203 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x8f, 0x3f, 0x4b, 0x04, 0x31,
0x10, 0xc5, 0xb9, 0x3f, 0x7a, 0xde, 0xa0, 0xa2, 0xa9, 0x22, 0x08, 0xca, 0x55, 0x82, 0xb8, 0x29,
0xfc, 0x06, 0x56, 0x57, 0xa7, 0xb4, 0x91, 0xec, 0x3a, 0x6e, 0x82, 0x26, 0x93, 0xcd, 0x64, 0xfd,
0xfc, 0x0e, 0x81, 0x45, 0xb1, 0x7a, 0xef, 0xf7, 0x0b, 0xe4, 0x25, 0x70, 0x57, 0x8b, 0x4b, 0x9c,
0xa9, 0x54, 0x93, 0x0b, 0x55, 0x32, 0xde, 0xa5, 0x77, 0xf6, 0xee, 0x13, 0xbb, 0xc6, 0xea, 0xa4,
0xc5, 0x21, 0xc1, 0xd5, 0x71, 0x39, 0xb1, 0x38, 0xcd, 0xc8, 0x55, 0x69, 0xd8, 0x7d, 0x63, 0xe1,
0x40, 0x49, 0xaf, 0xee, 0x57, 0x0f, 0x17, 0x76, 0x41, 0xa5, 0x60, 0xfb, 0x51, 0x28, 0xea, 0xb5,
0xe8, 0xbd, 0x6d, 0x5d, 0x5d, 0xc2, 0xba, 0x92, 0xde, 0x34, 0x23, 0x4d, 0xdd, 0xc2, 0xbe, 0x86,
0x28, 0xf7, 0xb8, 0x98, 0xf5, 0x56, 0xf4, 0xc6, 0xfe, 0x8a, 0x43, 0x07, 0xd7, 0x7f, 0xf6, 0xe4,
0x81, 0x89, 0x51, 0xdd, 0xc0, 0x59, 0xc1, 0xe9, 0xcd, 0x3b, 0xf6, 0x6d, 0xf1, 0xdc, 0xee, 0x84,
0x8f, 0x82, 0x2f, 0x4f, 0xaf, 0x8f, 0x63, 0xa8, 0x7e, 0xee, 0xbb, 0x81, 0xa2, 0x19, 0x5c, 0x26,
0x66, 0xfc, 0x42, 0x33, 0x4a, 0x86, 0x6c, 0xfe, 0xfd, 0xb2, 0x3f, 0x6d, 0xf1, 0xfc, 0x13, 0x00,
0x00, 0xff, 0xff, 0x51, 0xe0, 0x08, 0xd0, 0xff, 0x00, 0x00, 0x00,
func init() {
proto.RegisterFile("packages/gossip/transport/proto/handshake.proto", fileDescriptor_f9e96b60881ea276)
}
var fileDescriptor_f9e96b60881ea276 = []byte{
// 219 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8f, 0x31, 0x4f, 0xc3, 0x30,
0x10, 0x85, 0x95, 0xb4, 0x50, 0x6a, 0x01, 0x02, 0x4f, 0x41, 0x62, 0x88, 0x3a, 0x65, 0x8a, 0x07,
0x7e, 0x00, 0x82, 0xa9, 0xb3, 0x47, 0x16, 0x74, 0x6d, 0x8f, 0xd8, 0x2a, 0xf6, 0x39, 0x77, 0x57,
0x7e, 0x3f, 0xc2, 0x52, 0x05, 0x1b, 0xd3, 0xbb, 0xf7, 0x6e, 0xf8, 0xf4, 0x19, 0x57, 0x60, 0x7f,
0x84, 0x09, 0xc5, 0x4d, 0x24, 0x12, 0x8b, 0x53, 0x86, 0x2c, 0x85, 0x58, 0x5d, 0x61, 0x52, 0x72,
0x01, 0xf2, 0x41, 0x02, 0x1c, 0x71, 0xac, 0xdd, 0x5e, 0xd4, 0xd8, 0x64, 0x73, 0xb7, 0x3d, 0x7f,
0x3c, 0xce, 0x27, 0x14, 0xb5, 0x9d, 0x59, 0x7d, 0x21, 0x4b, 0xa4, 0xdc, 0x35, 0x7d, 0x33, 0xdc,
0xf8, 0x73, 0xb5, 0xd6, 0x2c, 0x3f, 0x98, 0x52, 0xd7, 0xf6, 0xcd, 0xb0, 0xf6, 0xf5, 0xb6, 0xb7,
0xa6, 0x55, 0xea, 0x16, 0x75, 0x69, 0x95, 0xec, 0xa3, 0x59, 0x6b, 0x4c, 0x28, 0x0a, 0xa9, 0x74,
0xcb, 0xbe, 0x19, 0x16, 0xfe, 0x77, 0xd8, 0x8c, 0xe6, 0xfe, 0x0f, 0x4f, 0x0a, 0x65, 0x41, 0xfb,
0x60, 0xae, 0x18, 0xe7, 0xf7, 0x00, 0x12, 0x2a, 0xf1, 0xda, 0xaf, 0x18, 0xe7, 0x2d, 0x48, 0x78,
0x7d, 0x79, 0x7b, 0x9e, 0xa2, 0x86, 0xd3, 0x6e, 0xdc, 0x53, 0x72, 0x91, 0x14, 0x3e, 0xf1, 0x30,
0x21, 0xff, 0x68, 0x86, 0x98, 0x12, 0xf2, 0x7f, 0xe6, 0xbb, 0xcb, 0x1a, 0x4f, 0xdf, 0x01, 0x00,
0x00, 0xff, 0xff, 0x2a, 0x53, 0xc3, 0xbb, 0x23, 0x01, 0x00, 0x00,
}
......@@ -3,8 +3,10 @@ package transport
import (
"bytes"
"container/list"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
......@@ -12,13 +14,18 @@ import (
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/autopeering-sim/peer/service"
pb "github.com/iotaledger/autopeering-sim/server/proto"
"github.com/pkg/errors"
"go.uber.org/zap"
)
var (
// ErrTimeout is returned when an expected incoming connection was not received in time.
ErrTimeout = errors.New("accept timeout")
ErrClosed = errors.New("listener closed")
// ErrClosed means that the transport was shut down before a response could be received.
ErrClosed = errors.New("transport closed")
// ErrInvalidHandshake is returned when no correct handshake could be established.
ErrInvalidHandshake = errors.New("invalid handshake")
// ErrNoGossip means that the given peer does not support the gossip service.
ErrNoGossip = errors.New("peer does not have a gossip service")
)
......@@ -27,9 +34,12 @@ const (
acceptTimeout = 500 * time.Millisecond
handshakeTimeout = 100 * time.Millisecond
connectionTimeout = acceptTimeout + handshakeTimeout
maxHandshakePacketSize = 256
)
type TransportTCP struct {
// TCP establishes verified incoming and outgoing TCP connections to other peers.
type TCP struct {
local *peer.Local
listener *net.TCPListener
log *zap.SugaredLogger
......@@ -60,8 +70,9 @@ type accept struct {
conn net.Conn // the actual network connection
}
func Listen(local *peer.Local, log *zap.SugaredLogger) (*TransportTCP, error) {
t := &TransportTCP{
// Listen creates the object and starts listening for incoming connections.
func Listen(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) {
t := &TCP{
local: local,
log: log,
addAcceptMatcher: make(chan *acceptMatcher),
......@@ -91,7 +102,7 @@ func Listen(local *peer.Local, log *zap.SugaredLogger) (*TransportTCP, error) {
}
// Close stops listening on the gossip address.
func (t *TransportTCP) Close() {
func (t *TCP) Close() {
t.closeOnce.Do(func() {
close(t.closing)
if err := t.listener.Close(); err != nil {
......@@ -102,13 +113,13 @@ func (t *TransportTCP) Close() {
}
// LocalAddr returns the listener's network address,
func (t *TransportTCP) LocalAddr() net.Addr {
func (t *TCP) LocalAddr() net.Addr {
return t.listener.Addr()
}
// DialPeer establishes a gossip connection to the given peer.
// If the peer does not accept the connection or the handshake fails, an error is returned.
func (t *TransportTCP) DialPeer(p *peer.Peer) (*Connection, error) {
func (t *TCP) DialPeer(p *peer.Peer) (*Connection, error) {
gossipAddr := p.Services().Get(service.GossipKey)
if gossipAddr == nil {
return nil, ErrNoGossip
......@@ -125,12 +136,12 @@ func (t *TransportTCP) DialPeer(p *peer.Peer) (*Connection, error) {
}
t.log.Debugw("connected", "id", p.ID(), "addr", conn.RemoteAddr(), "direction", "out")
return newConnection(p, conn), nil
return newConnection(conn, p), nil
}
// AcceptPeer awaits an incoming connection from the given peer.
// If the peer does not establish the connection or the handshake fails, an error is returned.
func (t *TransportTCP) AcceptPeer(p *peer.Peer) (*Connection, error) {
func (t *TCP) AcceptPeer(p *peer.Peer) (*Connection, error) {
if p.Services().Get(service.GossipKey) == nil {
return nil, ErrNoGossip
}
......@@ -139,11 +150,12 @@ func (t *TransportTCP) AcceptPeer(p *peer.Peer) (*Connection, error) {
if connected.err != nil {
return nil, connected.err
}
t.log.Debugw("connected", "id", p.ID(), "addr", connected.c.conn.RemoteAddr(), "direction", "in")
t.log.Debugw("connected", "id", p.ID(), "addr", connected.c.RemoteAddr(), "direction", "in")
return connected.c, nil
}
func (t *TransportTCP) acceptPeer(p *peer.Peer) <-chan connect {
func (t *TCP) acceptPeer(p *peer.Peer) <-chan connect {
connected := make(chan connect, 1)
// add the matcher
select {
......@@ -154,17 +166,17 @@ func (t *TransportTCP) acceptPeer(p *peer.Peer) <-chan connect {
return connected
}
func (t *TransportTCP) closeConnection(c net.Conn) {
func (t *TCP) closeConnection(c net.Conn) {
if err := c.Close(); err != nil {
t.log.Warnw("close error", "err", err)
}
}
func (t *TransportTCP) run() {
func (t *TCP) run() {
defer t.wg.Done()
var (
mlist = list.New()
matcherList = list.New()
timeout = time.NewTimer(0)
)
defer timeout.Stop()
......@@ -174,9 +186,9 @@ func (t *TransportTCP) run() {
for {
// Set the timer so that it fires when the next accept expires
if el := mlist.Front(); el != nil {
if e := matcherList.Front(); e != nil {
// the first element always has the closest deadline
m := el.Value.(*acceptMatcher)
m := e.Value.(*acceptMatcher)
timeout.Reset(time.Until(m.deadline))
} else {
timeout.Stop()
......@@ -187,16 +199,16 @@ func (t *TransportTCP) run() {
// add a new matcher to the list
case m := <-t.addAcceptMatcher:
m.deadline = time.Now().Add(connectionTimeout)
mlist.PushBack(m)
matcherList.PushBack(m)
// on accept received, check all matchers for a fit
case a := <-t.acceptReceived:
matched := false
for el := mlist.Front(); el != nil; el = el.Next() {
m := el.Value.(*acceptMatcher)
for e := matcherList.Front(); e != nil; e = e.Next() {
m := e.Value.(*acceptMatcher)
if m.peer.ID() == a.fromID {
matched = true
mlist.Remove(el)
matcherList.Remove(e)
// finish the handshake
go t.matchAccept(m, a.req, a.conn)
}
......@@ -212,18 +224,18 @@ func (t *TransportTCP) run() {
now := time.Now()
// notify and remove any expired matchers
for el := mlist.Front(); el != nil; el = el.Next() {
m := el.Value.(*acceptMatcher)
for e := matcherList.Front(); e != nil; e = e.Next() {
m := e.Value.(*acceptMatcher)
if now.After(m.deadline) || now.Equal(m.deadline) {
m.connected <- connect{nil, ErrTimeout}
mlist.Remove(el)
matcherList.Remove(e)
}
}
// on close, notify all the matchers
case <-t.closing:
for el := mlist.Front(); el != nil; el = el.Next() {
el.Value.(*acceptMatcher).connected <- connect{nil, ErrClosed}
for e := matcherList.Front(); e != nil; e = e.Next() {
e.Value.(*acceptMatcher).connected <- connect{nil, ErrClosed}
}
return
......@@ -231,7 +243,7 @@ func (t *TransportTCP) run() {
}
}
func (t *TransportTCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) {
func (t *TCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) {
t.wg.Add(1)
defer t.wg.Done()
......@@ -241,10 +253,10 @@ func (t *TransportTCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn)
t.closeConnection(conn)
return
}
m.connected <- connect{newConnection(m.peer, conn), nil}
m.connected <- connect{newConnection(conn, m.peer), nil}
}
func (t *TransportTCP) listenLoop() {
func (t *TCP) listenLoop() {
defer t.wg.Done()
for {
......@@ -254,7 +266,10 @@ func (t *TransportTCP) listenLoop() {
continue
} else if err != nil {
// return from the loop on all other errors
t.log.Warnw("read error", "err", err)
if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
t.log.Warnw("listen error", "err", err)
}
t.log.Debug("listening stopped")
return
}
......@@ -278,7 +293,7 @@ func (t *TransportTCP) listenLoop() {
}
}
func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn net.Conn) error {
func (t *TCP) doHandshake(key peer.PublicKey, remoteAddr string, conn net.Conn) error {
reqData, err := newHandshakeRequest(conn.LocalAddr().String(), remoteAddr)
if err != nil {
return err
......@@ -291,7 +306,10 @@ func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn n
}
b, err := proto.Marshal(pkt)
if err != nil {
return err
return errors.Wrap(err, ErrInvalidHandshake.Error())
}
if l := len(b); l > maxHandshakePacketSize {
return fmt.Errorf("handshake size too large: %d, max %d", l, maxHandshakePacketSize)
}
if err := conn.SetWriteDeadline(time.Now().Add(handshakeTimeout)); err != nil {
......@@ -305,7 +323,7 @@ func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn n
if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
return err
}
b = make([]byte, MaxPacketSize)
b = make([]byte, maxHandshakePacketSize)
n, err := conn.Read(b)
if err != nil {
return err
......@@ -313,17 +331,13 @@ func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn n
pkt = new(pb.Packet)
if err := proto.Unmarshal(b[:n], pkt); err != nil {
return err
return errors.Wrap(err, ErrInvalidHandshake.Error())
}
signer, err := peer.RecoverKeyFromSignedData(pkt)
if err != nil {
return err
}
if !bytes.Equal(key, signer) {
return errors.New("invalid key")
if err != nil || !bytes.Equal(key, signer) {
return ErrInvalidHandshake
}
if !t.validateHandshakeResponse(pkt.GetData(), reqData) {
return ErrInvalidHandshake
}
......@@ -331,14 +345,14 @@ func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn n
return nil
}
func (t *TransportTCP) readHandshakeRequest(conn net.Conn) (peer.PublicKey, []byte, error) {
func (t *TCP) readHandshakeRequest(conn net.Conn) (peer.PublicKey, []byte, error) {
if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
return nil, nil, err
}
b := make([]byte, MaxPacketSize)
b := make([]byte, maxHandshakePacketSize)
n, err := conn.Read(b)
if err != nil {
return nil, nil, err
return nil, nil, errors.Wrap(err, ErrInvalidHandshake.Error())
}
pkt := new(pb.Packet)
......@@ -358,7 +372,7 @@ func (t *TransportTCP) readHandshakeRequest(conn net.Conn) (peer.PublicKey, []by
return key, pkt.GetData(), nil
}
func (t *TransportTCP) writeHandshakeResponse(reqData []byte, conn net.Conn) error {
func (t *TCP) writeHandshakeResponse(reqData []byte, conn net.Conn) error {
data, err := newHandshakeResponse(reqData)
if err != nil {
return err
......@@ -373,6 +387,9 @@ func (t *TransportTCP) writeHandshakeResponse(reqData []byte, conn net.Conn) err
if err != nil {
return err
}
if l := len(b); l > maxHandshakePacketSize {
return fmt.Errorf("handshake size too large: %d, max %d", l, maxHandshakePacketSize)
}
if err := conn.SetWriteDeadline(time.Now().Add(handshakeTimeout)); err != nil {
return err
......
......@@ -26,14 +26,14 @@ func init() {
logger = l.Sugar()
}
func newTest(t require.TestingT, name string) (*TransportTCP, func()) {
func newTest(t require.TestingT, name string) (*TCP, func()) {
l := logger.Named(name)
db := peer.NewMemoryDB(l.Named("db"))
local, err := peer.NewLocal("peering", name, db)
require.NoError(t, err)
// enable TCP gossipping
require.NoError(t, local.UpdateService(service.GossipKey, "tcp", ":0"))
require.NoError(t, local.UpdateService(service.GossipKey, "tcp", "localhost:0"))
trans, err := Listen(local, l)
require.NoError(t, err)
......@@ -48,7 +48,7 @@ func newTest(t require.TestingT, name string) (*TransportTCP, func()) {
return trans, teardown
}
func getPeer(t *TransportTCP) *peer.Peer {
func getPeer(t *TCP) *peer.Peer {
return &t.local.Peer
}
......@@ -87,7 +87,7 @@ func TestUnansweredDial(t *testing.T) {
// create peer with invalid gossip address
services := getPeer(transA).Services().CreateRecord()
services.Update(service.GossipKey, "tcp", ":0")
services.Update(service.GossipKey, "tcp", "localhost:0")
unreachablePeer := peer.NewPeer(getPeer(transA).PublicKey(), services)
_, err := transA.DialPeer(unreachablePeer)
......@@ -99,12 +99,12 @@ func TestNoHandshakeResponse(t *testing.T) {
defer closeA()
// accept and read incoming connections
lis, err := net.Listen("tcp", ":0")
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
go func() {
conn, err := lis.Accept()
require.NoError(t, err)
n, _ := conn.Read(make([]byte, MaxPacketSize))
n, _ := conn.Read(make([]byte, maxHandshakePacketSize))
assert.NotZero(t, n)
_ = conn.Close()
_ = lis.Close()
......
......@@ -53,7 +53,7 @@ func Start(tps uint) {
mtx := &pb.Transaction{Body: tx.MetaTransaction.GetBytes()}
b, _ := proto.Marshal(mtx)
gossip.Event.NewTransaction.Trigger(b)
gossip.Events.NewTransaction.Trigger(b)
if sentCounter >= tps {
duration := time.Since(start)
......
......@@ -51,11 +51,15 @@ const defaultZLC = `{
}
}`
func start() {
var (
err error
db peer.DB
trans *transport.TransportConn
zLogger *zap.SugaredLogger
handlers []server.Handler
conn *net.UDPConn
)
func configureAP() {
host := parameter.NodeConfig.GetString(CFG_ADDRESS)
localhost := host
apPort := strconv.Itoa(parameter.NodeConfig.GetInt(CFG_PORT))
......@@ -68,19 +72,16 @@ func start() {
listenAddr := host + ":" + apPort
gossipAddr := host + ":" + gossipPort
logger := logger.NewLogger(defaultZLC, debugLevel)
defer func() { _ = logger.Sync() }() // ignore the returned error
zLogger = logger.NewLogger(defaultZLC, debugLevel)
addr, err := net.ResolveUDPAddr("udp", localhost+":"+apPort)
if err != nil {
log.Fatalf("ResolveUDPAddr: %v", err)
}
conn, err := net.ListenUDP("udp", addr)
conn, err = net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("ListenUDP: %v", err)
}
defer conn.Close()
masterPeers := []*peer.Peer{}
master, err := parseEntryNodes()
......@@ -91,12 +92,11 @@ func start() {
}
// use the UDP connection for transport
trans := transport.Conn(conn, func(network, address string) (net.Addr, error) { return net.ResolveUDPAddr(network, address) })
defer trans.Close()
trans = transport.Conn(conn, func(network, address string) (net.Addr, error) { return net.ResolveUDPAddr(network, address) })
// create a new local node
db := peer.NewPersistentDB(logger.Named("db"))
defer db.Close()
db = peer.NewPersistentDB(zLogger.Named("db"))
local.INSTANCE, err = peer.NewLocal("udp", listenAddr, db)
if err != nil {
log.Fatalf("ListenUDP: %v", err)
......@@ -108,14 +108,14 @@ func start() {
}
Discovery = discover.New(local.INSTANCE, discover.Config{
Log: logger.Named("disc"),
Log: zLogger.Named("disc"),
MasterPeers: masterPeers,
})
handlers := append([]server.Handler{}, Discovery)
handlers = append([]server.Handler{}, Discovery)
if parameter.NodeConfig.GetBool(CFG_SELECTION) {
Selection = selection.New(local.INSTANCE, Discovery, selection.Config{
Log: logger.Named("sel"),
Log: zLogger.Named("sel"),
Param: &selection.Parameters{
SaltLifetime: selection.DefaultSaltLifetime,
RequiredService: []service.Key{service.GossipKey},
......@@ -123,14 +123,14 @@ func start() {
})
handlers = append(handlers, Selection)
}
}
func start() {
// start a server doing discovery and peering
srv = server.Listen(local.INSTANCE, trans, logger.Named("srv"), handlers...)
defer srv.Close()
srv = server.Listen(local.INSTANCE, trans, zLogger.Named("srv"), handlers...)
// start the discovery on that connection
Discovery.Start(srv)
defer Discovery.Close()
// start the peering on that connection
if parameter.NodeConfig.GetBool(CFG_SELECTION) {
......@@ -139,12 +139,22 @@ func start() {
}
id := base64.StdEncoding.EncodeToString(local.INSTANCE.PublicKey())
logger.Info("Discovery protocol started: ID=" + id + ", address=" + srv.LocalAddr())
zLogger.Info("Discovery protocol started: ID=" + id + ", address=" + srv.LocalAddr())
defer func() {
_ = zLogger.Sync() // ignore the returned error
trans.Close()
db.Close()
conn.Close()
srv.Close()
Discovery.Close()
}()
// Only for debug
go func() {
for t := range time.NewTicker(2 * time.Second).C {
_ = t
printReport(logger)
printReport(zLogger)
}
}()
......
......@@ -3,7 +3,6 @@ package autopeering
import (
"github.com/iotaledger/autopeering-sim/discover"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/gossip/neighbor"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
......@@ -18,6 +17,7 @@ func configure(plugin *node.Plugin) {
close <- struct{}{}
}))
configureAP()
configureLogging(plugin)
}
......@@ -26,9 +26,10 @@ func run(plugin *node.Plugin) {
}
func configureLogging(plugin *node.Plugin) {
gossip.Event.DropNeighbor.Attach(events.NewClosure(func(peer *neighbor.Neighbor) {
gossip.Events.DropNeighbor.Attach(events.NewClosure(func(ev *gossip.DropNeighborEvent) {
log.Info("neighbor dropped: " + ev.Peer.Address() + " / " + ev.Peer.ID().String())
if Selection != nil {
Selection.DropPeer(peer.Peer)
Selection.DropPeer(ev.Peer)
}
}))
......
package gossip
import (
"github.com/iotaledger/goshimmer/packages/gossip/transport"
"github.com/iotaledger/goshimmer/packages/model/meta_transaction"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/peer/service"
"github.com/iotaledger/autopeering-sim/selection"
gp "github.com/iotaledger/goshimmer/packages/gossip"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/transport"
"github.com/iotaledger/goshimmer/packages/model/meta_transaction"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/autopeering-sim/peer/service"
"github.com/iotaledger/autopeering-sim/selection"
"github.com/iotaledger/goshimmer/plugins/tangle"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/hive.go/events"
"go.uber.org/zap"
)
var (
zLogger *zap.SugaredLogger
mgr *gp.Manager
SendTransaction = mgr.Send
SendTransaction = mgr.SendTransaction
RequestTransaction = mgr.RequestTransaction
AddInbound = mgr.AddInbound
AddOutbound = mgr.AddOutbound
......@@ -46,43 +45,38 @@ func configureGossip() {
trans, err := transport.Listen(local.INSTANCE, zLogger)
if err != nil {
// TODO: handle error
log.Fatal(err)
}
mgr = gp.NewManager(trans, zLogger, getTransaction)
log.Info("Gossip started @", trans.LocalAddr().String())
}
func configureEvents() {
selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
log.Debug("neighbor removed: " + ev.DroppedID.String())
DropNeighbor(ev.DroppedID)
log.Info("neighbor removed: " + ev.DroppedID.String())
mgr.DropNeighbor(ev.DroppedID)
}))
selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
gossipService := ev.Peer.Services().Get(service.GossipKey)
if gossipService != nil {
log.Debug("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
//address, port, _ := net.SplitHostPort(ev.Peer.Services().Get(service.GossipKey).String())
AddInbound(ev.Peer)
go mgr.AddInbound(ev.Peer)
}
}))
selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
gossipService := ev.Peer.Services().Get(service.GossipKey)
if gossipService != nil {
log.Debug("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
//address, port, _ := net.SplitHostPort(ev.Peer.Services().Get(service.GossipKey).String())
AddOutbound(ev.Peer)
go mgr.AddOutbound(ev.Peer)
}
}))
// mgr.Events.NewTransaction.Attach(events.NewClosure(func(ev *gp.NewTransactionEvent) {
// tx := ev.Body
// metaTx := meta_transaction.FromBytes(tx)
// Events.NewTransaction.Trigger(metaTx)
// }))
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *meta_transaction.MetaTransaction) {
t := &pb.Transaction{
Body: tx.GetBytes(),
......@@ -91,6 +85,6 @@ func configureEvents() {
if err != nil {
return
}
SendTransaction(b)
go SendTransaction(b)
}))
}
package gossip
import (
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
var PLUGIN = node.NewPlugin("Gossip", node.Enabled, configure, run)
......
......@@ -14,7 +14,7 @@ var PLUGIN = node.NewPlugin("Metrics", node.Enabled, configure, run)
func configure(plugin *node.Plugin) {
// increase received TPS counter whenever we receive a new transaction
gossip.Event.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) { increaseReceivedTPSCounter() }))
gossip.Events.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) { increaseReceivedTPSCounter() }))
}
func run(plugin *node.Plugin) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment