diff --git a/packages/gossip/errors.go b/packages/gossip/errors.go index 7be5d0fc553b260d367c5b63df37756e7c7fb70b..160ed5513dc18c8b571f0860cac92806192cd5bb 100644 --- a/packages/gossip/errors.go +++ b/packages/gossip/errors.go @@ -3,6 +3,7 @@ package gossip import "github.com/pkg/errors" var ( + ErrNotStarted = errors.New("manager not started") ErrClosed = errors.New("manager closed") ErrNotANeighbor = errors.New("peer is not a neighbor") ErrDuplicateNeighbor = errors.New("peer already connected") diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 8c11efba42eec7b7628f6387d8e31a03094387f7..897f1c50c9bd3c9dac9991d074f76d30aa5f4cfc 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -6,47 +6,57 @@ import ( "strings" "sync" + "github.com/iotaledger/autopeering-sim/peer/service" + "github.com/golang/protobuf/proto" "github.com/iotaledger/autopeering-sim/peer" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" - "github.com/iotaledger/goshimmer/packages/gossip/transport" + "github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/pkg/errors" "go.uber.org/zap" ) const ( - maxConnectionAttempts = 3 - maxPacketSize = 2048 + maxPacketSize = 2048 ) type GetTransaction func(txHash []byte) ([]byte, error) type Manager struct { - trans *transport.TCP - log *zap.SugaredLogger + local *peer.Local getTransaction GetTransaction + log *zap.SugaredLogger wg sync.WaitGroup mu sync.RWMutex + srv *server.TCP neighbors map[peer.ID]*neighbor running bool } type neighbor struct { peer *peer.Peer - conn *transport.Connection + conn *server.Connection } -func NewManager(t *transport.TCP, log *zap.SugaredLogger, f GetTransaction) *Manager { - m := &Manager{ - trans: t, - log: log, +func NewManager(local *peer.Local, f GetTransaction, log *zap.SugaredLogger) *Manager { + return &Manager{ + local: local, getTransaction: f, + log: log, + srv: nil, neighbors: make(map[peer.ID]*neighbor), + running: false, } +} + +func (m *Manager) Start(srv *server.TCP) { + m.mu.Lock() + defer m.mu.Unlock() + + m.srv = srv m.running = true - return m } // Close stops the manager and closes all established connections. @@ -62,14 +72,35 @@ func (m *Manager) Close() { m.wg.Wait() } +// LocalAddr returns the public address of the gossip service. +func (m *Manager) LocalAddr() net.Addr { + return m.local.Services().Get(service.GossipKey) +} + // AddOutbound tries to add a neighbor by connecting to that peer. func (m *Manager) AddOutbound(p *peer.Peer) error { - return m.addNeighbor(p, m.trans.DialPeer) + var srv *server.TCP + m.mu.RLock() + if m.srv == nil { + return ErrNotStarted + } + srv = m.srv + m.mu.RUnlock() + + return m.addNeighbor(p, srv.DialPeer) } // AddInbound tries to add a neighbor by accepting an incoming connection from that peer. func (m *Manager) AddInbound(p *peer.Peer) error { - return m.addNeighbor(p, m.trans.AcceptPeer) + var srv *server.TCP + m.mu.RLock() + if m.srv == nil { + return ErrNotStarted + } + srv = m.srv + m.mu.RUnlock() + + return m.addNeighbor(p, srv.AcceptPeer) } // NeighborDropped disconnects the neighbor with the given ID. @@ -151,19 +182,8 @@ func (m *Manager) send(msg []byte, to ...peer.ID) { } } -func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (*transport.Connection, error)) error { - var ( - err error - conn *transport.Connection - ) - for i := 0; i < maxConnectionAttempts; i++ { - conn, err = connectorFunc(peer) - if err == nil { - break - } - } - - // could not add neighbor +func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (*server.Connection, error)) error { + conn, err := connectorFunc(peer) if err != nil { m.log.Debugw("addNeighbor failed", "peer", peer.ID(), "err", err) Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: peer}) @@ -273,7 +293,7 @@ func marshal(msg pb.Message) []byte { return append([]byte{byte(mType)}, data...) } -func disconnect(conn *transport.Connection) { +func disconnect(conn *server.Connection) { _ = conn.Close() Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: conn.Peer()}) } diff --git a/packages/gossip/manager_test.go b/packages/gossip/manager_test.go index df9e83337b423403d46afe1b4ae445d4d19a45f6..789b86992f48e3fcd2d609e2cf12cc1cf773959f 100644 --- a/packages/gossip/manager_test.go +++ b/packages/gossip/manager_test.go @@ -11,7 +11,7 @@ import ( "github.com/iotaledger/autopeering-sim/peer" "github.com/iotaledger/autopeering-sim/peer/service" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" - "github.com/iotaledger/goshimmer/packages/gossip/transport" + "github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/iotaledger/hive.go/events" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -78,17 +78,20 @@ func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) { // enable TCP gossipping require.NoError(t, local.UpdateService(service.GossipKey, "tcp", getTCPAddress(t))) - trans, err := transport.Listen(local, l) - require.NoError(t, err) + mgr := NewManager(local, getTestTransaction, l) - mgr := NewManager(trans, l, getTestTransaction) + srv, err := server.ListenTCP(local, l) + require.NoError(t, err) // update the service with the actual address - require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String())) + require.NoError(t, local.UpdateService(service.GossipKey, srv.LocalAddr().Network(), srv.LocalAddr().String())) + + // start the actual gossipping + mgr.Start(srv) teardown := func() { mgr.Close() - trans.Close() + srv.Close() db.Close() } return mgr, teardown, &local.Peer diff --git a/packages/gossip/transport/connection.go b/packages/gossip/server/connection.go similarity index 96% rename from packages/gossip/transport/connection.go rename to packages/gossip/server/connection.go index d677040f0002b1fc40a04469751fbd24cee96097..076068f14050a18a74fde368297e72fa1e6262f4 100644 --- a/packages/gossip/transport/connection.go +++ b/packages/gossip/server/connection.go @@ -1,4 +1,4 @@ -package transport +package server import ( "net" diff --git a/packages/gossip/transport/handshake.go b/packages/gossip/server/handshake.go similarity index 95% rename from packages/gossip/transport/handshake.go rename to packages/gossip/server/handshake.go index 3489a9d66ade615fdeadb7549df19cf143c57dee..ef5bb60ec79d3e4897da828e2e802412ba9fc960 100644 --- a/packages/gossip/transport/handshake.go +++ b/packages/gossip/server/handshake.go @@ -1,4 +1,4 @@ -package transport +package server import ( "bytes" @@ -6,7 +6,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/iotaledger/autopeering-sim/server" - pb "github.com/iotaledger/goshimmer/packages/gossip/transport/proto" + pb "github.com/iotaledger/goshimmer/packages/gossip/server/proto" ) const ( diff --git a/packages/gossip/transport/proto/handshake.pb.go b/packages/gossip/server/proto/handshake.pb.go similarity index 97% rename from packages/gossip/transport/proto/handshake.pb.go rename to packages/gossip/server/proto/handshake.pb.go index da559c819039c5d8b60fdb6ff74e779de9c4fc20..c0e7307267baba8270206e24f17f7eebf41e85df 100644 --- a/packages/gossip/transport/proto/handshake.pb.go +++ b/packages/gossip/server/proto/handshake.pb.go @@ -1,12 +1,13 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: transport/proto/handshake.proto +// source: server/proto/handshake.proto package proto import ( fmt "fmt" - proto "github.com/golang/protobuf/proto" math "math" + + proto "github.com/golang/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. @@ -123,7 +124,7 @@ func init() { proto.RegisterType((*HandshakeResponse)(nil), "proto.HandshakeResponse") } -func init() { proto.RegisterFile("transport/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) } +func init() { proto.RegisterFile("server/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) } var fileDescriptor_d7101ffe19b05443 = []byte{ // 206 bytes of a gzipped FileDescriptorProto diff --git a/packages/gossip/transport/proto/handshake.proto b/packages/gossip/server/proto/handshake.proto similarity index 100% rename from packages/gossip/transport/proto/handshake.proto rename to packages/gossip/server/proto/handshake.proto diff --git a/packages/gossip/transport/transport.go b/packages/gossip/server/server.go similarity index 97% rename from packages/gossip/transport/transport.go rename to packages/gossip/server/server.go index 39d6c32b280c86b5fa9c30a82fafa352c21bb633..658f6be6dfc5a7e802b871639164ff7ffc72bfb4 100644 --- a/packages/gossip/transport/transport.go +++ b/packages/gossip/server/server.go @@ -1,4 +1,4 @@ -package transport +package server import ( "bytes" @@ -21,8 +21,8 @@ import ( var ( // ErrTimeout is returned when an expected incoming connection was not received in time. ErrTimeout = errors.New("accept timeout") - // ErrClosed means that the transport was shut down before a response could be received. - ErrClosed = errors.New("transport closed") + // ErrClosed means that the server was shut down before a response could be received. + ErrClosed = errors.New("server closed") // ErrInvalidHandshake is returned when no correct handshake could be established. ErrInvalidHandshake = errors.New("invalid handshake") // ErrNoGossip means that the given peer does not support the gossip service. @@ -71,8 +71,8 @@ type accept struct { conn net.Conn // the actual network connection } -// Listen creates the object and starts listening for incoming connections. -func Listen(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) { +// ListenTCP creates the object and starts listening for incoming connections. +func ListenTCP(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) { t := &TCP{ local: local, log: log, diff --git a/packages/gossip/transport/transport_test.go b/packages/gossip/server/server_test.go similarity index 98% rename from packages/gossip/transport/transport_test.go rename to packages/gossip/server/server_test.go index 7c1fecf632dd0479aa7627ef27fa01c667a4dc11..e4f0607032d0d7474560c84c739f1daced66a917 100644 --- a/packages/gossip/transport/transport_test.go +++ b/packages/gossip/server/server_test.go @@ -1,4 +1,4 @@ -package transport +package server import ( "log" @@ -47,7 +47,7 @@ func newTest(t require.TestingT, name string) (*TCP, func()) { // enable TCP gossipping require.NoError(t, local.UpdateService(service.GossipKey, "tcp", getTCPAddress(t))) - trans, err := Listen(local, l) + trans, err := ListenTCP(local, l) require.NoError(t, err) teardown := func() { diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/autopeering.go index ea761dbfb725675f4e5905cf4867df1a8fbe7aba..a7759994b54a07a499ef123eee6766cc9ab534a6 100644 --- a/plugins/autopeering/autopeering.go +++ b/plugins/autopeering/autopeering.go @@ -51,7 +51,7 @@ const defaultZLC = `{ } }` -var zLogger = logger.NewLogger(defaultZLC, "info") +var zLogger = logger.NewLogger(defaultZLC, logLevel) func configureLocal() { ip := net.ParseIP(parameter.NodeConfig.GetString(CFG_ADDRESS)) diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index 904a19ffb54705697f281f54f588458c549d8c78..3a71eae9c0fe46d823c23dc8de2140cb62e40e53 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -9,12 +9,17 @@ import ( "github.com/iotaledger/hive.go/node" ) -// NETWORK defines the network type used for the autopeering. -const NETWORK = "udp" +const ( + // NETWORK defines the network type used for the autopeering. + NETWORK = "udp" -var PLUGIN = node.NewPlugin("Autopeering", node.Enabled, configure, run) + name = "Autopeering" // name of the plugin + logLevel = "info" +) + +var PLUGIN = node.NewPlugin(name, node.Enabled, configure, run) -var log = logger.NewLogger("Autopeering") +var log = logger.NewLogger(name) func configure(*node.Plugin) { configureEvents() @@ -23,7 +28,7 @@ func configure(*node.Plugin) { } func run(*node.Plugin) { - daemon.BackgroundWorker("Autopeering", start) + daemon.BackgroundWorker(name, start) } func configureEvents() { diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index efbb650e226574bbcf0141674aaf2126a6c29757..3ce2dc711f894be784577734fcd2e5d5fd982d7c 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -1,21 +1,22 @@ package gossip import ( - "errors" + "fmt" "github.com/golang/protobuf/proto" - zL "github.com/iotaledger/autopeering-sim/logger" - "github.com/iotaledger/autopeering-sim/peer/service" - "github.com/iotaledger/autopeering-sim/selection" - "github.com/iotaledger/goshimmer/packages/gossip" + "github.com/iotaledger/autopeering-sim/logger" + "github.com/iotaledger/goshimmer/packages/errors" gp "github.com/iotaledger/goshimmer/packages/gossip" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" - "github.com/iotaledger/goshimmer/packages/gossip/transport" - "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/gossip/server" + "github.com/iotaledger/goshimmer/packages/typeutils" "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/tangle" - "github.com/iotaledger/hive.go/events" - "go.uber.org/zap" + "github.com/iotaledger/hive.go/daemon" +) + +var ( + mgr *gp.Manager ) const defaultZLC = `{ @@ -39,80 +40,45 @@ const defaultZLC = `{ } }` -var ( - debugLevel = "info" - zLogger *zap.SugaredLogger - mgr *gp.Manager -) +var zLogger = logger.NewLogger(defaultZLC, logLevel) -func getTransaction(h []byte) ([]byte, error) { - log.Info("Retrieving tx:", string(h)) - tx, err := tangle.GetTransaction(string(h)) - if err != nil { - return []byte{}, err - } - if tx == nil { - return []byte{}, errors.New("Not found") - } - pTx := &pb.TransactionRequest{ - Hash: tx.GetBytes(), - } - b, _ := proto.Marshal(pTx) - return b, nil +func configureGossip() { + mgr = gp.NewManager(local.INSTANCE, getTransaction, zLogger) } -func configureGossip() { - zLogger = zL.NewLogger(defaultZLC, debugLevel) +func start() { + defer log.Info("Stopping Gossip ... done") - trans, err := transport.Listen(local.INSTANCE, zLogger) + srv, err := server.ListenTCP(local.INSTANCE, zLogger) if err != nil { - log.Fatal(err) + log.Fatalf("ListenTCP: %v", err) } + defer srv.Close() - mgr = gp.NewManager(trans, zLogger, getTransaction) - - log.Info("Gossip started @", trans.LocalAddr().String()) -} + mgr.Start(srv) + defer mgr.Close() -func configureEvents() { + log.Infof("Gossip started: address=%v", mgr.LocalAddr()) - selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { - log.Info("neighbor removed: " + ev.DroppedID.String()) - go mgr.DropNeighbor(ev.DroppedID) - })) + <-daemon.ShutdownSignal + log.Info("Stopping Gossip ...") +} - selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { - gossipService := ev.Peer.Services().Get(service.GossipKey) - if gossipService != nil { - log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) - go mgr.AddInbound(ev.Peer) - } - })) +func getTransaction(hash []byte) ([]byte, error) { + log.Infof("Retrieving tx: hash=%s", hash) - selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { - gossipService := ev.Peer.Services().Get(service.GossipKey) - if gossipService != nil { - log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) - go mgr.AddOutbound(ev.Peer) - } - })) + tx, err := tangle.GetTransaction(typeutils.BytesToString(hash)) + if err != nil { + return nil, errors.Wrap(err, "could not get transaction") + } + if tx == nil { + return nil, fmt.Errorf("transaction not found: hash=%s", hash) + } - tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { - log.Info("gossip solid tx", tx.MetaTransaction.GetHash()) - t := &pb.Transaction{ - Body: tx.MetaTransaction.GetBytes(), - } - b, err := proto.Marshal(t) - if err != nil { - return - } - go mgr.SendTransaction(b) - })) + pTx := &pb.TransactionRequest{ + Hash: tx.GetBytes(), + } + b, _ := proto.Marshal(pTx) - gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) { - pTx := &pb.TransactionRequest{} - proto.Unmarshal(ev.Hash, pTx) - log.Info("Tx Requested:", string(pTx.Hash)) - go mgr.RequestTransaction(pTx.Hash) - })) + return b, nil } diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 5bf6cfca6e3b40a2d30b44bbe7f0c50da67ad96b..963a830e4d7de4e89ca2e273309cfd53de0e2e8f 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -1,26 +1,75 @@ package gossip import ( + "github.com/golang/protobuf/proto" + "github.com/iotaledger/autopeering-sim/peer/service" + "github.com/iotaledger/autopeering-sim/selection" + "github.com/iotaledger/goshimmer/packages/gossip" + pb "github.com/iotaledger/goshimmer/packages/gossip/proto" + "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" ) -var PLUGIN = node.NewPlugin("Gossip", node.Enabled, configure, run) -var log = logger.NewLogger("Gossip") - -var ( - close = make(chan struct{}, 1) +const ( + name = "Gossip" // name of the plugin + logLevel = "info" ) -func configure(plugin *node.Plugin) { - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - close <- struct{}{} - })) +var PLUGIN = node.NewPlugin(name, node.Enabled, configure, run) + +var log = logger.NewLogger(name) + +func configure(*node.Plugin) { configureGossip() configureEvents() } -func run(plugin *node.Plugin) { +func run(*node.Plugin) { + daemon.BackgroundWorker(name, start) +} + +func configureEvents() { + selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { + log.Info("neighbor removed: " + ev.DroppedID.String()) + go mgr.DropNeighbor(ev.DroppedID) + })) + + selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { + gossipService := ev.Peer.Services().Get(service.GossipKey) + if gossipService != nil { + log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + go mgr.AddInbound(ev.Peer) + } + })) + + selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { + gossipService := ev.Peer.Services().Get(service.GossipKey) + if gossipService != nil { + log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + go mgr.AddOutbound(ev.Peer) + } + })) + + tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { + log.Info("gossip solid tx", tx.MetaTransaction.GetHash()) + t := &pb.Transaction{ + Body: tx.MetaTransaction.GetBytes(), + } + b, err := proto.Marshal(t) + if err != nil { + return + } + go mgr.SendTransaction(b) + })) + + gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) { + pTx := &pb.TransactionRequest{} + proto.Unmarshal(ev.Hash, pTx) + log.Info("Tx Requested:", string(pTx.Hash)) + go mgr.RequestTransaction(pTx.Hash) + })) }