Skip to content
Snippets Groups Projects
Commit b16c5630 authored by Wolfgang Welz's avatar Wolfgang Welz
Browse files

fix: Run gossip plugin as a daemon

parent 60efb2aa
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,7 @@ package gossip
import "github.com/pkg/errors"
var (
ErrNotStarted = errors.New("manager not started")
ErrClosed = errors.New("manager closed")
ErrNotANeighbor = errors.New("peer is not a neighbor")
ErrDuplicateNeighbor = errors.New("peer already connected")
......
......@@ -6,47 +6,57 @@ import (
"strings"
"sync"
"github.com/iotaledger/autopeering-sim/peer/service"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/peer"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/transport"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/pkg/errors"
"go.uber.org/zap"
)
const (
maxConnectionAttempts = 3
maxPacketSize = 2048
)
type GetTransaction func(txHash []byte) ([]byte, error)
type Manager struct {
trans *transport.TCP
log *zap.SugaredLogger
local *peer.Local
getTransaction GetTransaction
log *zap.SugaredLogger
wg sync.WaitGroup
mu sync.RWMutex
srv *server.TCP
neighbors map[peer.ID]*neighbor
running bool
}
type neighbor struct {
peer *peer.Peer
conn *transport.Connection
conn *server.Connection
}
func NewManager(t *transport.TCP, log *zap.SugaredLogger, f GetTransaction) *Manager {
m := &Manager{
trans: t,
log: log,
func NewManager(local *peer.Local, f GetTransaction, log *zap.SugaredLogger) *Manager {
return &Manager{
local: local,
getTransaction: f,
log: log,
srv: nil,
neighbors: make(map[peer.ID]*neighbor),
running: false,
}
}
func (m *Manager) Start(srv *server.TCP) {
m.mu.Lock()
defer m.mu.Unlock()
m.srv = srv
m.running = true
return m
}
// Close stops the manager and closes all established connections.
......@@ -62,14 +72,35 @@ func (m *Manager) Close() {
m.wg.Wait()
}
// LocalAddr returns the public address of the gossip service.
func (m *Manager) LocalAddr() net.Addr {
return m.local.Services().Get(service.GossipKey)
}
// AddOutbound tries to add a neighbor by connecting to that peer.
func (m *Manager) AddOutbound(p *peer.Peer) error {
return m.addNeighbor(p, m.trans.DialPeer)
var srv *server.TCP
m.mu.RLock()
if m.srv == nil {
return ErrNotStarted
}
srv = m.srv
m.mu.RUnlock()
return m.addNeighbor(p, srv.DialPeer)
}
// AddInbound tries to add a neighbor by accepting an incoming connection from that peer.
func (m *Manager) AddInbound(p *peer.Peer) error {
return m.addNeighbor(p, m.trans.AcceptPeer)
var srv *server.TCP
m.mu.RLock()
if m.srv == nil {
return ErrNotStarted
}
srv = m.srv
m.mu.RUnlock()
return m.addNeighbor(p, srv.AcceptPeer)
}
// NeighborDropped disconnects the neighbor with the given ID.
......@@ -151,19 +182,8 @@ func (m *Manager) send(msg []byte, to ...peer.ID) {
}
}
func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (*transport.Connection, error)) error {
var (
err error
conn *transport.Connection
)
for i := 0; i < maxConnectionAttempts; i++ {
conn, err = connectorFunc(peer)
if err == nil {
break
}
}
// could not add neighbor
func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (*server.Connection, error)) error {
conn, err := connectorFunc(peer)
if err != nil {
m.log.Debugw("addNeighbor failed", "peer", peer.ID(), "err", err)
Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: peer})
......@@ -273,7 +293,7 @@ func marshal(msg pb.Message) []byte {
return append([]byte{byte(mType)}, data...)
}
func disconnect(conn *transport.Connection) {
func disconnect(conn *server.Connection) {
_ = conn.Close()
Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: conn.Peer()})
}
......@@ -11,7 +11,7 @@ import (
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/autopeering-sim/peer/service"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/transport"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/hive.go/events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
......@@ -78,17 +78,20 @@ func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) {
// enable TCP gossipping
require.NoError(t, local.UpdateService(service.GossipKey, "tcp", getTCPAddress(t)))
trans, err := transport.Listen(local, l)
require.NoError(t, err)
mgr := NewManager(local, getTestTransaction, l)
mgr := NewManager(trans, l, getTestTransaction)
srv, err := server.ListenTCP(local, l)
require.NoError(t, err)
// update the service with the actual address
require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String()))
require.NoError(t, local.UpdateService(service.GossipKey, srv.LocalAddr().Network(), srv.LocalAddr().String()))
// start the actual gossipping
mgr.Start(srv)
teardown := func() {
mgr.Close()
trans.Close()
srv.Close()
db.Close()
}
return mgr, teardown, &local.Peer
......
package transport
package server
import (
"net"
......
package transport
package server
import (
"bytes"
......@@ -6,7 +6,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/server"
pb "github.com/iotaledger/goshimmer/packages/gossip/transport/proto"
pb "github.com/iotaledger/goshimmer/packages/gossip/server/proto"
)
const (
......
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: transport/proto/handshake.proto
// source: server/proto/handshake.proto
package proto
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
proto "github.com/golang/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.
......@@ -123,7 +124,7 @@ func init() {
proto.RegisterType((*HandshakeResponse)(nil), "proto.HandshakeResponse")
}
func init() { proto.RegisterFile("transport/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) }
func init() { proto.RegisterFile("server/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) }
var fileDescriptor_d7101ffe19b05443 = []byte{
// 206 bytes of a gzipped FileDescriptorProto
......
package transport
package server
import (
"bytes"
......@@ -21,8 +21,8 @@ import (
var (
// ErrTimeout is returned when an expected incoming connection was not received in time.
ErrTimeout = errors.New("accept timeout")
// ErrClosed means that the transport was shut down before a response could be received.
ErrClosed = errors.New("transport closed")
// ErrClosed means that the server was shut down before a response could be received.
ErrClosed = errors.New("server closed")
// ErrInvalidHandshake is returned when no correct handshake could be established.
ErrInvalidHandshake = errors.New("invalid handshake")
// ErrNoGossip means that the given peer does not support the gossip service.
......@@ -71,8 +71,8 @@ type accept struct {
conn net.Conn // the actual network connection
}
// Listen creates the object and starts listening for incoming connections.
func Listen(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) {
// ListenTCP creates the object and starts listening for incoming connections.
func ListenTCP(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) {
t := &TCP{
local: local,
log: log,
......
package transport
package server
import (
"log"
......@@ -47,7 +47,7 @@ func newTest(t require.TestingT, name string) (*TCP, func()) {
// enable TCP gossipping
require.NoError(t, local.UpdateService(service.GossipKey, "tcp", getTCPAddress(t)))
trans, err := Listen(local, l)
trans, err := ListenTCP(local, l)
require.NoError(t, err)
teardown := func() {
......
......@@ -51,7 +51,7 @@ const defaultZLC = `{
}
}`
var zLogger = logger.NewLogger(defaultZLC, "info")
var zLogger = logger.NewLogger(defaultZLC, logLevel)
func configureLocal() {
ip := net.ParseIP(parameter.NodeConfig.GetString(CFG_ADDRESS))
......
......@@ -9,12 +9,17 @@ import (
"github.com/iotaledger/hive.go/node"
)
const (
// NETWORK defines the network type used for the autopeering.
const NETWORK = "udp"
NETWORK = "udp"
var PLUGIN = node.NewPlugin("Autopeering", node.Enabled, configure, run)
name = "Autopeering" // name of the plugin
logLevel = "info"
)
var PLUGIN = node.NewPlugin(name, node.Enabled, configure, run)
var log = logger.NewLogger("Autopeering")
var log = logger.NewLogger(name)
func configure(*node.Plugin) {
configureEvents()
......@@ -23,7 +28,7 @@ func configure(*node.Plugin) {
}
func run(*node.Plugin) {
daemon.BackgroundWorker("Autopeering", start)
daemon.BackgroundWorker(name, start)
}
func configureEvents() {
......
package gossip
import (
"errors"
"fmt"
"github.com/golang/protobuf/proto"
zL "github.com/iotaledger/autopeering-sim/logger"
"github.com/iotaledger/autopeering-sim/peer/service"
"github.com/iotaledger/autopeering-sim/selection"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/autopeering-sim/logger"
"github.com/iotaledger/goshimmer/packages/errors"
gp "github.com/iotaledger/goshimmer/packages/gossip"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/transport"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/goshimmer/packages/typeutils"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/iotaledger/hive.go/events"
"go.uber.org/zap"
"github.com/iotaledger/hive.go/daemon"
)
var (
mgr *gp.Manager
)
const defaultZLC = `{
......@@ -39,80 +40,45 @@ const defaultZLC = `{
}
}`
var (
debugLevel = "info"
zLogger *zap.SugaredLogger
mgr *gp.Manager
)
var zLogger = logger.NewLogger(defaultZLC, logLevel)
func getTransaction(h []byte) ([]byte, error) {
log.Info("Retrieving tx:", string(h))
tx, err := tangle.GetTransaction(string(h))
if err != nil {
return []byte{}, err
}
if tx == nil {
return []byte{}, errors.New("Not found")
}
pTx := &pb.TransactionRequest{
Hash: tx.GetBytes(),
}
b, _ := proto.Marshal(pTx)
return b, nil
func configureGossip() {
mgr = gp.NewManager(local.INSTANCE, getTransaction, zLogger)
}
func configureGossip() {
zLogger = zL.NewLogger(defaultZLC, debugLevel)
func start() {
defer log.Info("Stopping Gossip ... done")
trans, err := transport.Listen(local.INSTANCE, zLogger)
srv, err := server.ListenTCP(local.INSTANCE, zLogger)
if err != nil {
log.Fatal(err)
log.Fatalf("ListenTCP: %v", err)
}
defer srv.Close()
mgr = gp.NewManager(trans, zLogger, getTransaction)
mgr.Start(srv)
defer mgr.Close()
log.Info("Gossip started @", trans.LocalAddr().String())
}
log.Infof("Gossip started: address=%v", mgr.LocalAddr())
func configureEvents() {
<-daemon.ShutdownSignal
log.Info("Stopping Gossip ...")
}
selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
log.Info("neighbor removed: " + ev.DroppedID.String())
go mgr.DropNeighbor(ev.DroppedID)
}))
func getTransaction(hash []byte) ([]byte, error) {
log.Infof("Retrieving tx: hash=%s", hash)
selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
gossipService := ev.Peer.Services().Get(service.GossipKey)
if gossipService != nil {
log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
go mgr.AddInbound(ev.Peer)
tx, err := tangle.GetTransaction(typeutils.BytesToString(hash))
if err != nil {
return nil, errors.Wrap(err, "could not get transaction")
}
}))
selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
gossipService := ev.Peer.Services().Get(service.GossipKey)
if gossipService != nil {
log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
go mgr.AddOutbound(ev.Peer)
if tx == nil {
return nil, fmt.Errorf("transaction not found: hash=%s", hash)
}
}))
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
log.Info("gossip solid tx", tx.MetaTransaction.GetHash())
t := &pb.Transaction{
Body: tx.MetaTransaction.GetBytes(),
}
b, err := proto.Marshal(t)
if err != nil {
return
pTx := &pb.TransactionRequest{
Hash: tx.GetBytes(),
}
go mgr.SendTransaction(b)
}))
b, _ := proto.Marshal(pTx)
gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) {
pTx := &pb.TransactionRequest{}
proto.Unmarshal(ev.Hash, pTx)
log.Info("Tx Requested:", string(pTx.Hash))
go mgr.RequestTransaction(pTx.Hash)
}))
return b, nil
}
package gossip
import (
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/peer/service"
"github.com/iotaledger/autopeering-sim/selection"
"github.com/iotaledger/goshimmer/packages/gossip"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
var PLUGIN = node.NewPlugin("Gossip", node.Enabled, configure, run)
var log = logger.NewLogger("Gossip")
var (
close = make(chan struct{}, 1)
const (
name = "Gossip" // name of the plugin
logLevel = "info"
)
func configure(plugin *node.Plugin) {
daemon.Events.Shutdown.Attach(events.NewClosure(func() {
close <- struct{}{}
}))
var PLUGIN = node.NewPlugin(name, node.Enabled, configure, run)
var log = logger.NewLogger(name)
func configure(*node.Plugin) {
configureGossip()
configureEvents()
}
func run(plugin *node.Plugin) {
func run(*node.Plugin) {
daemon.BackgroundWorker(name, start)
}
func configureEvents() {
selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
log.Info("neighbor removed: " + ev.DroppedID.String())
go mgr.DropNeighbor(ev.DroppedID)
}))
selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
gossipService := ev.Peer.Services().Get(service.GossipKey)
if gossipService != nil {
log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
go mgr.AddInbound(ev.Peer)
}
}))
selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
gossipService := ev.Peer.Services().Get(service.GossipKey)
if gossipService != nil {
log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
go mgr.AddOutbound(ev.Peer)
}
}))
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
log.Info("gossip solid tx", tx.MetaTransaction.GetHash())
t := &pb.Transaction{
Body: tx.MetaTransaction.GetBytes(),
}
b, err := proto.Marshal(t)
if err != nil {
return
}
go mgr.SendTransaction(b)
}))
gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) {
pTx := &pb.TransactionRequest{}
proto.Unmarshal(ev.Hash, pTx)
log.Info("Tx Requested:", string(pTx.Hash))
go mgr.RequestTransaction(pTx.Hash)
}))
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment