From b16c56308c0391006acc5ddeba7e8271237d264d Mon Sep 17 00:00:00 2001 From: Wolfgang Welz <welzwo@gmail.com> Date: Sat, 4 Jan 2020 13:15:41 +0100 Subject: [PATCH] fix: Run gossip plugin as a daemon --- packages/gossip/errors.go | 1 + packages/gossip/manager.go | 74 +++++++----- packages/gossip/manager_test.go | 15 ++- .../{transport => server}/connection.go | 2 +- .../gossip/{transport => server}/handshake.go | 4 +- .../proto/handshake.pb.go | 7 +- .../proto/handshake.proto | 0 .../transport.go => server/server.go} | 10 +- .../server_test.go} | 4 +- plugins/autopeering/autopeering.go | 2 +- plugins/autopeering/plugin.go | 15 ++- plugins/gossip/gossip.go | 110 ++++++------------ plugins/gossip/plugin.go | 69 +++++++++-- 13 files changed, 179 insertions(+), 134 deletions(-) rename packages/gossip/{transport => server}/connection.go (96%) rename packages/gossip/{transport => server}/handshake.go (95%) rename packages/gossip/{transport => server}/proto/handshake.pb.go (97%) rename packages/gossip/{transport => server}/proto/handshake.proto (100%) rename packages/gossip/{transport/transport.go => server/server.go} (97%) rename packages/gossip/{transport/transport_test.go => server/server_test.go} (98%) diff --git a/packages/gossip/errors.go b/packages/gossip/errors.go index 7be5d0fc..160ed551 100644 --- a/packages/gossip/errors.go +++ b/packages/gossip/errors.go @@ -3,6 +3,7 @@ package gossip import "github.com/pkg/errors" var ( + ErrNotStarted = errors.New("manager not started") ErrClosed = errors.New("manager closed") ErrNotANeighbor = errors.New("peer is not a neighbor") ErrDuplicateNeighbor = errors.New("peer already connected") diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go index 8c11efba..897f1c50 100644 --- a/packages/gossip/manager.go +++ b/packages/gossip/manager.go @@ -6,47 +6,57 @@ import ( "strings" "sync" + "github.com/iotaledger/autopeering-sim/peer/service" + "github.com/golang/protobuf/proto" "github.com/iotaledger/autopeering-sim/peer" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" - "github.com/iotaledger/goshimmer/packages/gossip/transport" + "github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/pkg/errors" "go.uber.org/zap" ) const ( - maxConnectionAttempts = 3 - maxPacketSize = 2048 + maxPacketSize = 2048 ) type GetTransaction func(txHash []byte) ([]byte, error) type Manager struct { - trans *transport.TCP - log *zap.SugaredLogger + local *peer.Local getTransaction GetTransaction + log *zap.SugaredLogger wg sync.WaitGroup mu sync.RWMutex + srv *server.TCP neighbors map[peer.ID]*neighbor running bool } type neighbor struct { peer *peer.Peer - conn *transport.Connection + conn *server.Connection } -func NewManager(t *transport.TCP, log *zap.SugaredLogger, f GetTransaction) *Manager { - m := &Manager{ - trans: t, - log: log, +func NewManager(local *peer.Local, f GetTransaction, log *zap.SugaredLogger) *Manager { + return &Manager{ + local: local, getTransaction: f, + log: log, + srv: nil, neighbors: make(map[peer.ID]*neighbor), + running: false, } +} + +func (m *Manager) Start(srv *server.TCP) { + m.mu.Lock() + defer m.mu.Unlock() + + m.srv = srv m.running = true - return m } // Close stops the manager and closes all established connections. @@ -62,14 +72,35 @@ func (m *Manager) Close() { m.wg.Wait() } +// LocalAddr returns the public address of the gossip service. +func (m *Manager) LocalAddr() net.Addr { + return m.local.Services().Get(service.GossipKey) +} + // AddOutbound tries to add a neighbor by connecting to that peer. func (m *Manager) AddOutbound(p *peer.Peer) error { - return m.addNeighbor(p, m.trans.DialPeer) + var srv *server.TCP + m.mu.RLock() + if m.srv == nil { + return ErrNotStarted + } + srv = m.srv + m.mu.RUnlock() + + return m.addNeighbor(p, srv.DialPeer) } // AddInbound tries to add a neighbor by accepting an incoming connection from that peer. func (m *Manager) AddInbound(p *peer.Peer) error { - return m.addNeighbor(p, m.trans.AcceptPeer) + var srv *server.TCP + m.mu.RLock() + if m.srv == nil { + return ErrNotStarted + } + srv = m.srv + m.mu.RUnlock() + + return m.addNeighbor(p, srv.AcceptPeer) } // NeighborDropped disconnects the neighbor with the given ID. @@ -151,19 +182,8 @@ func (m *Manager) send(msg []byte, to ...peer.ID) { } } -func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (*transport.Connection, error)) error { - var ( - err error - conn *transport.Connection - ) - for i := 0; i < maxConnectionAttempts; i++ { - conn, err = connectorFunc(peer) - if err == nil { - break - } - } - - // could not add neighbor +func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (*server.Connection, error)) error { + conn, err := connectorFunc(peer) if err != nil { m.log.Debugw("addNeighbor failed", "peer", peer.ID(), "err", err) Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: peer}) @@ -273,7 +293,7 @@ func marshal(msg pb.Message) []byte { return append([]byte{byte(mType)}, data...) } -func disconnect(conn *transport.Connection) { +func disconnect(conn *server.Connection) { _ = conn.Close() Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: conn.Peer()}) } diff --git a/packages/gossip/manager_test.go b/packages/gossip/manager_test.go index df9e8333..789b8699 100644 --- a/packages/gossip/manager_test.go +++ b/packages/gossip/manager_test.go @@ -11,7 +11,7 @@ import ( "github.com/iotaledger/autopeering-sim/peer" "github.com/iotaledger/autopeering-sim/peer/service" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" - "github.com/iotaledger/goshimmer/packages/gossip/transport" + "github.com/iotaledger/goshimmer/packages/gossip/server" "github.com/iotaledger/hive.go/events" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -78,17 +78,20 @@ func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) { // enable TCP gossipping require.NoError(t, local.UpdateService(service.GossipKey, "tcp", getTCPAddress(t))) - trans, err := transport.Listen(local, l) - require.NoError(t, err) + mgr := NewManager(local, getTestTransaction, l) - mgr := NewManager(trans, l, getTestTransaction) + srv, err := server.ListenTCP(local, l) + require.NoError(t, err) // update the service with the actual address - require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String())) + require.NoError(t, local.UpdateService(service.GossipKey, srv.LocalAddr().Network(), srv.LocalAddr().String())) + + // start the actual gossipping + mgr.Start(srv) teardown := func() { mgr.Close() - trans.Close() + srv.Close() db.Close() } return mgr, teardown, &local.Peer diff --git a/packages/gossip/transport/connection.go b/packages/gossip/server/connection.go similarity index 96% rename from packages/gossip/transport/connection.go rename to packages/gossip/server/connection.go index d677040f..076068f1 100644 --- a/packages/gossip/transport/connection.go +++ b/packages/gossip/server/connection.go @@ -1,4 +1,4 @@ -package transport +package server import ( "net" diff --git a/packages/gossip/transport/handshake.go b/packages/gossip/server/handshake.go similarity index 95% rename from packages/gossip/transport/handshake.go rename to packages/gossip/server/handshake.go index 3489a9d6..ef5bb60e 100644 --- a/packages/gossip/transport/handshake.go +++ b/packages/gossip/server/handshake.go @@ -1,4 +1,4 @@ -package transport +package server import ( "bytes" @@ -6,7 +6,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/iotaledger/autopeering-sim/server" - pb "github.com/iotaledger/goshimmer/packages/gossip/transport/proto" + pb "github.com/iotaledger/goshimmer/packages/gossip/server/proto" ) const ( diff --git a/packages/gossip/transport/proto/handshake.pb.go b/packages/gossip/server/proto/handshake.pb.go similarity index 97% rename from packages/gossip/transport/proto/handshake.pb.go rename to packages/gossip/server/proto/handshake.pb.go index da559c81..c0e73072 100644 --- a/packages/gossip/transport/proto/handshake.pb.go +++ b/packages/gossip/server/proto/handshake.pb.go @@ -1,12 +1,13 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: transport/proto/handshake.proto +// source: server/proto/handshake.proto package proto import ( fmt "fmt" - proto "github.com/golang/protobuf/proto" math "math" + + proto "github.com/golang/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. @@ -123,7 +124,7 @@ func init() { proto.RegisterType((*HandshakeResponse)(nil), "proto.HandshakeResponse") } -func init() { proto.RegisterFile("transport/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) } +func init() { proto.RegisterFile("server/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) } var fileDescriptor_d7101ffe19b05443 = []byte{ // 206 bytes of a gzipped FileDescriptorProto diff --git a/packages/gossip/transport/proto/handshake.proto b/packages/gossip/server/proto/handshake.proto similarity index 100% rename from packages/gossip/transport/proto/handshake.proto rename to packages/gossip/server/proto/handshake.proto diff --git a/packages/gossip/transport/transport.go b/packages/gossip/server/server.go similarity index 97% rename from packages/gossip/transport/transport.go rename to packages/gossip/server/server.go index 39d6c32b..658f6be6 100644 --- a/packages/gossip/transport/transport.go +++ b/packages/gossip/server/server.go @@ -1,4 +1,4 @@ -package transport +package server import ( "bytes" @@ -21,8 +21,8 @@ import ( var ( // ErrTimeout is returned when an expected incoming connection was not received in time. ErrTimeout = errors.New("accept timeout") - // ErrClosed means that the transport was shut down before a response could be received. - ErrClosed = errors.New("transport closed") + // ErrClosed means that the server was shut down before a response could be received. + ErrClosed = errors.New("server closed") // ErrInvalidHandshake is returned when no correct handshake could be established. ErrInvalidHandshake = errors.New("invalid handshake") // ErrNoGossip means that the given peer does not support the gossip service. @@ -71,8 +71,8 @@ type accept struct { conn net.Conn // the actual network connection } -// Listen creates the object and starts listening for incoming connections. -func Listen(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) { +// ListenTCP creates the object and starts listening for incoming connections. +func ListenTCP(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) { t := &TCP{ local: local, log: log, diff --git a/packages/gossip/transport/transport_test.go b/packages/gossip/server/server_test.go similarity index 98% rename from packages/gossip/transport/transport_test.go rename to packages/gossip/server/server_test.go index 7c1fecf6..e4f06070 100644 --- a/packages/gossip/transport/transport_test.go +++ b/packages/gossip/server/server_test.go @@ -1,4 +1,4 @@ -package transport +package server import ( "log" @@ -47,7 +47,7 @@ func newTest(t require.TestingT, name string) (*TCP, func()) { // enable TCP gossipping require.NoError(t, local.UpdateService(service.GossipKey, "tcp", getTCPAddress(t))) - trans, err := Listen(local, l) + trans, err := ListenTCP(local, l) require.NoError(t, err) teardown := func() { diff --git a/plugins/autopeering/autopeering.go b/plugins/autopeering/autopeering.go index ea761dbf..a7759994 100644 --- a/plugins/autopeering/autopeering.go +++ b/plugins/autopeering/autopeering.go @@ -51,7 +51,7 @@ const defaultZLC = `{ } }` -var zLogger = logger.NewLogger(defaultZLC, "info") +var zLogger = logger.NewLogger(defaultZLC, logLevel) func configureLocal() { ip := net.ParseIP(parameter.NodeConfig.GetString(CFG_ADDRESS)) diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index 904a19ff..3a71eae9 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -9,12 +9,17 @@ import ( "github.com/iotaledger/hive.go/node" ) -// NETWORK defines the network type used for the autopeering. -const NETWORK = "udp" +const ( + // NETWORK defines the network type used for the autopeering. + NETWORK = "udp" -var PLUGIN = node.NewPlugin("Autopeering", node.Enabled, configure, run) + name = "Autopeering" // name of the plugin + logLevel = "info" +) + +var PLUGIN = node.NewPlugin(name, node.Enabled, configure, run) -var log = logger.NewLogger("Autopeering") +var log = logger.NewLogger(name) func configure(*node.Plugin) { configureEvents() @@ -23,7 +28,7 @@ func configure(*node.Plugin) { } func run(*node.Plugin) { - daemon.BackgroundWorker("Autopeering", start) + daemon.BackgroundWorker(name, start) } func configureEvents() { diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index efbb650e..3ce2dc71 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -1,21 +1,22 @@ package gossip import ( - "errors" + "fmt" "github.com/golang/protobuf/proto" - zL "github.com/iotaledger/autopeering-sim/logger" - "github.com/iotaledger/autopeering-sim/peer/service" - "github.com/iotaledger/autopeering-sim/selection" - "github.com/iotaledger/goshimmer/packages/gossip" + "github.com/iotaledger/autopeering-sim/logger" + "github.com/iotaledger/goshimmer/packages/errors" gp "github.com/iotaledger/goshimmer/packages/gossip" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" - "github.com/iotaledger/goshimmer/packages/gossip/transport" - "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/gossip/server" + "github.com/iotaledger/goshimmer/packages/typeutils" "github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/tangle" - "github.com/iotaledger/hive.go/events" - "go.uber.org/zap" + "github.com/iotaledger/hive.go/daemon" +) + +var ( + mgr *gp.Manager ) const defaultZLC = `{ @@ -39,80 +40,45 @@ const defaultZLC = `{ } }` -var ( - debugLevel = "info" - zLogger *zap.SugaredLogger - mgr *gp.Manager -) +var zLogger = logger.NewLogger(defaultZLC, logLevel) -func getTransaction(h []byte) ([]byte, error) { - log.Info("Retrieving tx:", string(h)) - tx, err := tangle.GetTransaction(string(h)) - if err != nil { - return []byte{}, err - } - if tx == nil { - return []byte{}, errors.New("Not found") - } - pTx := &pb.TransactionRequest{ - Hash: tx.GetBytes(), - } - b, _ := proto.Marshal(pTx) - return b, nil +func configureGossip() { + mgr = gp.NewManager(local.INSTANCE, getTransaction, zLogger) } -func configureGossip() { - zLogger = zL.NewLogger(defaultZLC, debugLevel) +func start() { + defer log.Info("Stopping Gossip ... done") - trans, err := transport.Listen(local.INSTANCE, zLogger) + srv, err := server.ListenTCP(local.INSTANCE, zLogger) if err != nil { - log.Fatal(err) + log.Fatalf("ListenTCP: %v", err) } + defer srv.Close() - mgr = gp.NewManager(trans, zLogger, getTransaction) - - log.Info("Gossip started @", trans.LocalAddr().String()) -} + mgr.Start(srv) + defer mgr.Close() -func configureEvents() { + log.Infof("Gossip started: address=%v", mgr.LocalAddr()) - selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { - log.Info("neighbor removed: " + ev.DroppedID.String()) - go mgr.DropNeighbor(ev.DroppedID) - })) + <-daemon.ShutdownSignal + log.Info("Stopping Gossip ...") +} - selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { - gossipService := ev.Peer.Services().Get(service.GossipKey) - if gossipService != nil { - log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) - go mgr.AddInbound(ev.Peer) - } - })) +func getTransaction(hash []byte) ([]byte, error) { + log.Infof("Retrieving tx: hash=%s", hash) - selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { - gossipService := ev.Peer.Services().Get(service.GossipKey) - if gossipService != nil { - log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) - go mgr.AddOutbound(ev.Peer) - } - })) + tx, err := tangle.GetTransaction(typeutils.BytesToString(hash)) + if err != nil { + return nil, errors.Wrap(err, "could not get transaction") + } + if tx == nil { + return nil, fmt.Errorf("transaction not found: hash=%s", hash) + } - tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { - log.Info("gossip solid tx", tx.MetaTransaction.GetHash()) - t := &pb.Transaction{ - Body: tx.MetaTransaction.GetBytes(), - } - b, err := proto.Marshal(t) - if err != nil { - return - } - go mgr.SendTransaction(b) - })) + pTx := &pb.TransactionRequest{ + Hash: tx.GetBytes(), + } + b, _ := proto.Marshal(pTx) - gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) { - pTx := &pb.TransactionRequest{} - proto.Unmarshal(ev.Hash, pTx) - log.Info("Tx Requested:", string(pTx.Hash)) - go mgr.RequestTransaction(pTx.Hash) - })) + return b, nil } diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 5bf6cfca..963a830e 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -1,26 +1,75 @@ package gossip import ( + "github.com/golang/protobuf/proto" + "github.com/iotaledger/autopeering-sim/peer/service" + "github.com/iotaledger/autopeering-sim/selection" + "github.com/iotaledger/goshimmer/packages/gossip" + pb "github.com/iotaledger/goshimmer/packages/gossip/proto" + "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/plugins/tangle" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" ) -var PLUGIN = node.NewPlugin("Gossip", node.Enabled, configure, run) -var log = logger.NewLogger("Gossip") - -var ( - close = make(chan struct{}, 1) +const ( + name = "Gossip" // name of the plugin + logLevel = "info" ) -func configure(plugin *node.Plugin) { - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - close <- struct{}{} - })) +var PLUGIN = node.NewPlugin(name, node.Enabled, configure, run) + +var log = logger.NewLogger(name) + +func configure(*node.Plugin) { configureGossip() configureEvents() } -func run(plugin *node.Plugin) { +func run(*node.Plugin) { + daemon.BackgroundWorker(name, start) +} + +func configureEvents() { + selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { + log.Info("neighbor removed: " + ev.DroppedID.String()) + go mgr.DropNeighbor(ev.DroppedID) + })) + + selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { + gossipService := ev.Peer.Services().Get(service.GossipKey) + if gossipService != nil { + log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + go mgr.AddInbound(ev.Peer) + } + })) + + selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { + gossipService := ev.Peer.Services().Get(service.GossipKey) + if gossipService != nil { + log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + go mgr.AddOutbound(ev.Peer) + } + })) + + tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { + log.Info("gossip solid tx", tx.MetaTransaction.GetHash()) + t := &pb.Transaction{ + Body: tx.MetaTransaction.GetBytes(), + } + b, err := proto.Marshal(t) + if err != nil { + return + } + go mgr.SendTransaction(b) + })) + + gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) { + pTx := &pb.TransactionRequest{} + proto.Unmarshal(ev.Hash, pTx) + log.Info("Tx Requested:", string(pTx.Hash)) + go mgr.RequestTransaction(pTx.Hash) + })) } -- GitLab