diff --git a/go.mod b/go.mod index 41d778526d789d8bb4185064801a829843d101d0..1a51bdbc97445688814a11e36ba4f4d10a58af79 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/StabbyCutyou/buffstreams v2.0.0+incompatible + github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414 github.com/dgraph-io/badger v1.6.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/gdamore/tcell v1.3.0 @@ -11,7 +12,7 @@ require ( github.com/golang/protobuf v1.3.2 github.com/google/open-location-code/go v0.0.0-20190903173953-119bc96a3a51 github.com/gorilla/websocket v1.4.1 - github.com/iotaledger/autopeering-sim v0.0.0-20191201144404-58a6f3b1a56d + github.com/iotaledger/autopeering-sim v0.0.0-20191202192349-f8e7a238c2bb github.com/iotaledger/hive.go v0.0.0-20191125115115-f88d4ecab6dd github.com/iotaledger/iota.go v1.0.0-beta.10 github.com/labstack/echo v3.3.10+incompatible diff --git a/go.sum b/go.sum index 756f72664efab54e5926e14a3a1f6ea4ba15f119..55861d764132295ed1c10fe92c1adc7c7e4b1024 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,10 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/capossele/gossip v0.0.0-20191204191545-36eddf08c1aa h1:46C1Ce+93zGZ+JJ4JUw3EuXHQXqKYzIX7+CRG0fweNk= +github.com/capossele/gossip v0.0.0-20191204191545-36eddf08c1aa/go.mod h1:P8rJEmorO5QpCL236F8vNhUFwFFjezt2d/+/LeRlELA= +github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414 h1:C9Q279xU15Qt5WVZNH6t/1L7exbTVYp1uMRS9NSmL/U= +github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414/go.mod h1:DnYLNZclq7cY6s2oA6wwhQ4tDB0j38enJHPrhpzOpJc= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -113,6 +117,8 @@ github.com/iotaledger/autopeering-sim v0.0.0-20191127100001-7ff75c77f051 h1:6JWv github.com/iotaledger/autopeering-sim v0.0.0-20191127100001-7ff75c77f051/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI= github.com/iotaledger/autopeering-sim v0.0.0-20191201144404-58a6f3b1a56d h1:wZoNQRwLj4PqhKfruVQ1Yeci6kaSXnE9nSNCFtJWZ9s= github.com/iotaledger/autopeering-sim v0.0.0-20191201144404-58a6f3b1a56d/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI= +github.com/iotaledger/autopeering-sim v0.0.0-20191202192349-f8e7a238c2bb h1:nfe6HpDhLjvnliVwaz8sO2E/fpD4BEnI/FwfrU+iDRU= +github.com/iotaledger/autopeering-sim v0.0.0-20191202192349-f8e7a238c2bb/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI= github.com/iotaledger/goshimmer v0.0.0-20191113134331-c2d1b2f9d533/go.mod h1:7vYiofXphp9+PkgVAEM0pvw3aoi4ksrZ7lrEgX50XHs= github.com/iotaledger/hive.go v0.0.0-20191118130432-89eebe8fe8eb h1:nuS/LETRJ8obUyBIZeyxeei0ZPlyOMj8YPziOgSM4Og= github.com/iotaledger/hive.go v0.0.0-20191118130432-89eebe8fe8eb/go.mod h1:1Thhlil4lHzuy53EVvmEbEvWBFY0Tasp4kCBfxBCPIk= diff --git a/main.go b/main.go index b43e2240c7a1ec6824837a9eba4fa2faaf9d3277..2d313d68467dd7608bca855ec349d279762c30f1 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ import ( "github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/dashboard" "github.com/iotaledger/goshimmer/plugins/gossip" - gossip_on_solidification "github.com/iotaledger/goshimmer/plugins/gossip-on-solidification" "github.com/iotaledger/goshimmer/plugins/gracefulshutdown" "github.com/iotaledger/goshimmer/plugins/metrics" "github.com/iotaledger/goshimmer/plugins/statusscreen" @@ -28,7 +27,7 @@ func main() { cli.PLUGIN, autopeering.PLUGIN, gossip.PLUGIN, - gossip_on_solidification.PLUGIN, + //gossip_on_solidification.PLUGIN, tangle.PLUGIN, bundleprocessor.PLUGIN, analysis.PLUGIN, diff --git a/packages/gossip/events.go b/packages/gossip/events.go new file mode 100644 index 0000000000000000000000000000000000000000..4c4e34a0675f6632d1a52c60f0626cbec28ed8a7 --- /dev/null +++ b/packages/gossip/events.go @@ -0,0 +1,28 @@ +package gossip + +import ( + "github.com/iotaledger/autopeering-sim/peer" + "github.com/iotaledger/hive.go/events" +) + +// Events contains all the events that are triggered during the gossip protocol. +type Events struct { + NewTransaction *events.Event + DropNeighbor *events.Event +} + +type NewTransactionEvent struct { + Body []byte + Peer *peer.Peer +} +type DropNeighborEvent struct { + Peer *peer.Peer +} + +func newTransaction(handler interface{}, params ...interface{}) { + handler.(func(*NewTransactionEvent))(params[0].(*NewTransactionEvent)) +} + +func dropNeighbor(handler interface{}, params ...interface{}) { + handler.(func(*DropNeighborEvent))(params[0].(*DropNeighborEvent)) +} diff --git a/packages/gossip/manager.go b/packages/gossip/manager.go new file mode 100644 index 0000000000000000000000000000000000000000..6c889e36fd6771f4d40c6b6e0d8d5c905da6de02 --- /dev/null +++ b/packages/gossip/manager.go @@ -0,0 +1,204 @@ +package gossip + +import ( + "net" + + "github.com/capossele/gossip/neighbor" + pb "github.com/capossele/gossip/proto" + "github.com/capossele/gossip/transport" + "github.com/golang/protobuf/proto" + "github.com/iotaledger/autopeering-sim/peer" + "github.com/iotaledger/hive.go/events" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +const ( + maxAttempts = 3 +) + +var ( + Event Events +) + +type GetTransaction func(txHash []byte) ([]byte, error) + +type Manager struct { + neighborhood *neighbor.NeighborMap + trans *transport.TransportTCP + log *zap.SugaredLogger + getTransaction GetTransaction + Events Events +} + +func NewManager(t *transport.TransportTCP, log *zap.SugaredLogger, f GetTransaction) *Manager { + mgr := &Manager{ + neighborhood: neighbor.NewMap(), + trans: t, + log: log, + getTransaction: f, + Events: Events{ + NewTransaction: events.NewEvent(newTransaction), + DropNeighbor: events.NewEvent(dropNeighbor)}, + } + Event = mgr.Events + return mgr +} + +func (m *Manager) AddOutbound(p *peer.Peer) error { + return m.addNeighbor(p, m.trans.DialPeer) +} + +func (m *Manager) AddInbound(p *peer.Peer) error { + return m.addNeighbor(p, m.trans.AcceptPeer) +} + +func (m *Manager) DropNeighbor(id peer.ID) { + m.deleteNeighbor(id) +} + +func (m *Manager) RequestTransaction(data []byte, to ...*neighbor.Neighbor) { + req := &pb.TransactionRequest{} + err := proto.Unmarshal(data, req) + if err != nil { + m.log.Warnw("Data to send is not a Transaction Request", "err", err) + } + msg := marshal(req) + + m.send(msg, to...) +} + +func (m *Manager) Send(data []byte, to ...*neighbor.Neighbor) { + tx := &pb.Transaction{} + err := proto.Unmarshal(data, tx) + if err != nil { + m.log.Warnw("Data to send is not a Transaction", "err", err) + } + msg := marshal(tx) + + m.send(msg, to...) +} + +func (m *Manager) send(msg []byte, to ...*neighbor.Neighbor) { + neighbors := m.neighborhood.GetSlice() + if to != nil { + neighbors = to + } + + for _, neighbor := range neighbors { + m.log.Debugw("Sending", "to", neighbor.Peer.ID().String(), "msg", msg) + err := neighbor.Conn.Write(msg) + if err != nil { + m.log.Debugw("send error", "err", err) + } + } +} + +func (m *Manager) addNeighbor(peer *peer.Peer, handshake func(*peer.Peer) (*transport.Connection, error)) error { + if _, ok := m.neighborhood.Load(peer.ID().String()); ok { + return errors.New("Neighbor already added") + } + + var err error + var conn *transport.Connection + i := 0 + for i = 0; i < maxAttempts; i++ { + conn, err = handshake(peer) + if err != nil { + m.log.Warnw("Connection attempt failed", "attempt", i+1) + } else { + break + } + } + if i == maxAttempts { + m.log.Warnw("Connection failed to", "peer", peer.ID().String()) + m.Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: peer}) + return err + } + + // add the new neighbor + neighbor := neighbor.New(peer, conn) + m.neighborhood.Store(peer.ID().String(), neighbor) + + // start listener for the new neighbor + go m.readLoop(neighbor) + + return nil +} + +func (m *Manager) deleteNeighbor(id peer.ID) { + m.log.Debugw("Deleting neighbor", "neighbor", id.String()) + + p, ok := m.neighborhood.Delete(id.String()) + if ok { + m.Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: p.Peer}) + } +} + +func (m *Manager) readLoop(neighbor *neighbor.Neighbor) { + for { + data, err := neighbor.Conn.Read() + if nerr, ok := err.(net.Error); ok && nerr.Temporary() { + // ignore temporary read errors. + //m.log.Debugw("temporary read error", "err", err) + continue + } else if err != nil { + // return from the loop on all other errors + m.log.Debugw("reading stopped") + m.deleteNeighbor(neighbor.Peer.ID()) + + return + } + if err := m.handlePacket(data, neighbor); err != nil { + m.log.Warnw("failed to handle packet", "from", neighbor.Peer.ID().String(), "err", err) + } + } +} + +func (m *Manager) handlePacket(data []byte, neighbor *neighbor.Neighbor) error { + switch pb.MType(data[0]) { + + // Incoming Transaction + case pb.MTransaction: + msg := new(pb.Transaction) + if err := proto.Unmarshal(data[1:], msg); err != nil { + return errors.Wrap(err, "invalid message") + } + m.log.Debugw("Received Transaction", "data", msg.GetBody()) + m.Events.NewTransaction.Trigger(&NewTransactionEvent{Body: msg.GetBody(), Peer: neighbor.Peer}) + + // Incoming Transaction request + case pb.MTransactionRequest: + msg := new(pb.TransactionRequest) + if err := proto.Unmarshal(data[1:], msg); err != nil { + return errors.Wrap(err, "invalid message") + } + m.log.Debugw("Received Tx Req", "data", msg.GetHash()) + // do something + tx, err := m.getTransaction(msg.GetHash()) + if err != nil { + m.log.Debugw("Tx not available", "tx", msg.GetHash()) + } else { + m.log.Debugw("Tx found", "tx", tx) + m.Send(tx, neighbor) + } + + default: + return nil + } + + return nil +} + +func marshal(msg pb.Message) []byte { + mType := msg.Type() + if mType > 0xFF { + panic("invalid message") + } + + data, err := proto.Marshal(msg) + if err != nil { + panic("invalid message") + } + return append([]byte{byte(mType)}, data...) +} diff --git a/packages/gossip/manager_test.go b/packages/gossip/manager_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e037c22d9fd1abf1d370f4de081d6e41db5d69ec --- /dev/null +++ b/packages/gossip/manager_test.go @@ -0,0 +1,246 @@ +package gossip + +import ( + "log" + "sync" + "testing" + "time" + + pb "github.com/capossele/gossip/proto" + "github.com/capossele/gossip/transport" + "github.com/golang/protobuf/proto" + "github.com/iotaledger/autopeering-sim/peer" + "github.com/iotaledger/autopeering-sim/peer/service" + "github.com/iotaledger/hive.go/events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +const graceTime = 5 * time.Millisecond + +var logger *zap.SugaredLogger + +func init() { + l, err := zap.NewDevelopment() + if err != nil { + log.Fatalf("cannot initialize logger: %v", err) + } + logger = l.Sugar() +} +func testGetTransaction([]byte) ([]byte, error) { + tx := &pb.TransactionRequest{ + Hash: []byte("testTx"), + } + b, _ := proto.Marshal(tx) + return b, nil +} + +func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) { + log := logger.Named(name) + db := peer.NewMemoryDB(log.Named("db")) + local, err := peer.NewLocal("peering", name, db) + require.NoError(t, err) + require.NoError(t, local.UpdateService(service.GossipKey, "tcp", "localhost:0")) + + trans, err := transport.Listen(local, log) + require.NoError(t, err) + + mgr := NewManager(trans, log, testGetTransaction) + + // update the service with the actual address + require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String())) + + teardown := func() { + trans.Close() + db.Close() + } + return mgr, teardown, &local.Peer +} + +func TestClose(t *testing.T) { + _, teardown, _ := newTest(t, "A") + teardown() +} + +func TestUnicast(t *testing.T) { + mgrA, closeA, peerA := newTest(t, "A") + defer closeA() + mgrB, closeB, peerB := newTest(t, "B") + defer closeB() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) + assert.NoError(t, err) + }() + time.Sleep(graceTime) + go func() { + defer wg.Done() + err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer) + assert.NoError(t, err) + }() + + // wait for the connections to establish + wg.Wait() + + tx := &pb.Transaction{Body: []byte("Hello!")} + + triggered := make(chan struct{}, 1) + mgrB.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) { + require.Empty(t, triggered) // only once + assert.Equal(t, tx.GetBody(), ev.Body) + assert.Equal(t, peerA, ev.Peer) + triggered <- struct{}{} + })) + + b, err := proto.Marshal(tx) + require.NoError(t, err) + mgrA.Send(b) + + // eventually the event should be triggered + assert.Eventually(t, func() bool { return len(triggered) >= 1 }, time.Second, 10*time.Millisecond) +} + +func TestBroadcast(t *testing.T) { + mgrA, closeA, peerA := newTest(t, "A") + defer closeA() + mgrB, closeB, peerB := newTest(t, "B") + defer closeB() + mgrC, closeC, peerC := newTest(t, "C") + defer closeC() + + var wg sync.WaitGroup + wg.Add(4) + + go func() { + defer wg.Done() + err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) + assert.NoError(t, err) + }() + go func() { + defer wg.Done() + err := mgrA.addNeighbor(peerC, mgrA.trans.AcceptPeer) + assert.NoError(t, err) + }() + time.Sleep(graceTime) + go func() { + defer wg.Done() + err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer) + assert.NoError(t, err) + }() + go func() { + defer wg.Done() + err := mgrC.addNeighbor(peerA, mgrC.trans.DialPeer) + assert.NoError(t, err) + }() + + // wait for the connections to establish + wg.Wait() + + tx := &pb.Transaction{Body: []byte("Hello!")} + + triggeredB := make(chan struct{}, 1) + mgrB.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) { + require.Empty(t, triggeredB) // only once + assert.Equal(t, tx.GetBody(), ev.Body) + assert.Equal(t, peerA, ev.Peer) + triggeredB <- struct{}{} + })) + + triggeredC := make(chan struct{}, 1) + mgrC.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) { + require.Empty(t, triggeredC) // only once + assert.Equal(t, tx.GetBody(), ev.Body) + assert.Equal(t, peerA, ev.Peer) + triggeredC <- struct{}{} + })) + + b, err := proto.Marshal(tx) + assert.NoError(t, err) + mgrA.Send(b) + + // eventually the events should be triggered + success := func() bool { + return len(triggeredB) >= 1 && len(triggeredC) >= 1 + } + assert.Eventually(t, success, time.Second, 10*time.Millisecond) +} + +func TestDropUnsuccessfulAccept(t *testing.T) { + mgrA, closeA, _ := newTest(t, "A") + defer closeA() + _, closeB, peerB := newTest(t, "B") + defer closeB() + + triggered := make(chan struct{}, 1) + mgrA.Events.DropNeighbor.Attach(events.NewClosure(func(ev *DropNeighborEvent) { + require.Empty(t, triggered) // only once + assert.Equal(t, peerB, ev.Peer) + triggered <- struct{}{} + })) + + err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) + assert.Error(t, err) + + // eventually the event should be triggered + assert.Eventually(t, func() bool { return len(triggered) >= 1 }, time.Second, 10*time.Millisecond) +} + +func TestTxRequest(t *testing.T) { + mgrA, closeA, peerA := newTest(t, "A") + defer closeA() + mgrB, closeB, peerB := newTest(t, "B") + defer closeB() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer) + assert.NoError(t, err) + logger.Debugw("Len", "len", mgrA.neighborhood.Len()) + }() + go func() { + defer wg.Done() + err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer) + assert.NoError(t, err) + logger.Debugw("Len", "len", mgrB.neighborhood.Len()) + }() + + wg.Wait() + + tx := &pb.TransactionRequest{ + Hash: []byte("Hello!"), + } + b, err := proto.Marshal(tx) + assert.NoError(t, err) + + sendChan := make(chan struct{}) + sendSuccess := false + + mgrA.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) { + logger.Debugw("New TX Event triggered", "data", ev.Body, "from", ev.Peer.ID().String()) + assert.Equal(t, []byte("testTx"), ev.Body) + assert.Equal(t, peerB, ev.Peer) + sendChan <- struct{}{} + })) + + mgrA.RequestTransaction(b) + + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + select { + case <-sendChan: + sendSuccess = true + case <-timer.C: + sendSuccess = false + } + + assert.True(t, sendSuccess) +} diff --git a/plugins/gossip/neighborMap.go b/packages/gossip/neighbor/neighbor.go similarity index 56% rename from plugins/gossip/neighborMap.go rename to packages/gossip/neighbor/neighbor.go index 9823c3550190aa57da702cc7a5e91caaf86be4af..41d2d25fbc3241a926299699761cda5f045b84f2 100644 --- a/plugins/gossip/neighborMap.go +++ b/packages/gossip/neighbor/neighbor.go @@ -1,24 +1,40 @@ -package gossip +package neighbor import ( "sync" + + "github.com/capossele/gossip/transport" + "github.com/iotaledger/autopeering-sim/peer" ) -// NeighborMap is the mapping of neighbor identifier and their neighbor struct -// It uses a mutex to handle concurrent access to its internal map +// Neighbor defines a neighbor +type Neighbor struct { + Peer *peer.Peer + Conn *transport.Connection +} + +// NeighborMap implements a map of neighbors thread safe type NeighborMap struct { sync.RWMutex internal map[string]*Neighbor } -// NewPeerMap returns a new PeerMap -func NewNeighborMap() *NeighborMap { +// NewMap returns a new NeighborMap +func NewMap() *NeighborMap { return &NeighborMap{ internal: make(map[string]*Neighbor), } } -// Len returns the number of peers stored in a PeerMap +// New returns a new Neighbor +func New(peer *peer.Peer, conn *transport.Connection) *Neighbor { + return &Neighbor{ + Peer: peer, + Conn: conn, + } +} + +// Len returns the number of neighbors stored in a NeighborMap func (nm *NeighborMap) Len() int { nm.RLock() defer nm.RUnlock() @@ -36,7 +52,7 @@ func (nm *NeighborMap) GetMap() map[string]*Neighbor { return newMap } -// GetMap returns the content of the entire internal map +// GetSlice returns a slice of the content of the entire internal map func (nm *NeighborMap) GetSlice() []*Neighbor { newSlice := make([]*Neighbor, nm.Len()) nm.RLock() @@ -49,9 +65,9 @@ func (nm *NeighborMap) GetSlice() []*Neighbor { return newSlice } -// Load returns the peer for a given key. +// Load returns the neighbor for a given key. // It also return a bool to communicate the presence of the given -// peer into the internal map +// neighbor into the internal map func (nm *NeighborMap) Load(key string) (value *Neighbor, ok bool) { nm.RLock() defer nm.RUnlock() @@ -60,18 +76,21 @@ func (nm *NeighborMap) Load(key string) (value *Neighbor, ok bool) { } // Delete removes the entire entry for a given key and return true if successful -func (nm *NeighborMap) Delete(key string) (deletedPeer *Neighbor, ok bool) { - deletedPeer, ok = nm.Load(key) +func (nm *NeighborMap) Delete(key string) (deletedNeighbor *Neighbor, ok bool) { + deletedNeighbor, ok = nm.Load(key) if !ok { return nil, false } nm.Lock() defer nm.Unlock() + if deletedNeighbor.Conn != nil { + deletedNeighbor.Conn.Close() + } delete(nm.internal, key) - return deletedPeer, true + return deletedNeighbor, true } -// Store adds a new peer to the PeerMap +// Store adds a new neighbor to the NeighborMap func (nm *NeighborMap) Store(key string, value *Neighbor) { nm.Lock() defer nm.Unlock() diff --git a/packages/gossip/proto/message.go b/packages/gossip/proto/message.go new file mode 100644 index 0000000000000000000000000000000000000000..f9404c1ebbdae75067d3e94615dcbab1ac57e20e --- /dev/null +++ b/packages/gossip/proto/message.go @@ -0,0 +1,30 @@ +package proto + +import ( + "github.com/golang/protobuf/proto" +) + +// MType is the type of message type enum. +type MType uint + +// An enum for the different message types. +const ( + MTransaction MType = 20 + iota + MTransactionRequest +) + +// Message extends the proto.Message interface with additional util functions. +type Message interface { + proto.Message + + // Name returns the name of the corresponding message type for debugging. + Name() string + // Type returns the type of the corresponding message as an enum. + Type() MType +} + +func (m *Transaction) Name() string { return "TRANSACTION" } +func (m *Transaction) Type() MType { return MTransaction } + +func (m *TransactionRequest) Name() string { return "TRANSACTION_REQUEST" } +func (m *TransactionRequest) Type() MType { return MTransactionRequest } diff --git a/packages/gossip/proto/message.pb.go b/packages/gossip/proto/message.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..1e67ece10c370d5957078761427d411d983a93b6 --- /dev/null +++ b/packages/gossip/proto/message.pb.go @@ -0,0 +1,121 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: proto/message.proto + +package proto + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Transaction struct { + // body of the tx + Body []byte `protobuf:"bytes,1,opt,name=body,proto3" json:"body,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Transaction) Reset() { *m = Transaction{} } +func (m *Transaction) String() string { return proto.CompactTextString(m) } +func (*Transaction) ProtoMessage() {} +func (*Transaction) Descriptor() ([]byte, []int) { + return fileDescriptor_33f3a5e1293a7bcd, []int{0} +} + +func (m *Transaction) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Transaction.Unmarshal(m, b) +} +func (m *Transaction) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Transaction.Marshal(b, m, deterministic) +} +func (m *Transaction) XXX_Merge(src proto.Message) { + xxx_messageInfo_Transaction.Merge(m, src) +} +func (m *Transaction) XXX_Size() int { + return xxx_messageInfo_Transaction.Size(m) +} +func (m *Transaction) XXX_DiscardUnknown() { + xxx_messageInfo_Transaction.DiscardUnknown(m) +} + +var xxx_messageInfo_Transaction proto.InternalMessageInfo + +func (m *Transaction) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +type TransactionRequest struct { + // transaction hash + Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *TransactionRequest) Reset() { *m = TransactionRequest{} } +func (m *TransactionRequest) String() string { return proto.CompactTextString(m) } +func (*TransactionRequest) ProtoMessage() {} +func (*TransactionRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_33f3a5e1293a7bcd, []int{1} +} + +func (m *TransactionRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TransactionRequest.Unmarshal(m, b) +} +func (m *TransactionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TransactionRequest.Marshal(b, m, deterministic) +} +func (m *TransactionRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_TransactionRequest.Merge(m, src) +} +func (m *TransactionRequest) XXX_Size() int { + return xxx_messageInfo_TransactionRequest.Size(m) +} +func (m *TransactionRequest) XXX_DiscardUnknown() { + xxx_messageInfo_TransactionRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_TransactionRequest proto.InternalMessageInfo + +func (m *TransactionRequest) GetHash() []byte { + if m != nil { + return m.Hash + } + return nil +} + +func init() { + proto.RegisterType((*Transaction)(nil), "proto.Transaction") + proto.RegisterType((*TransactionRequest)(nil), "proto.TransactionRequest") +} + +func init() { proto.RegisterFile("proto/message.proto", fileDescriptor_33f3a5e1293a7bcd) } + +var fileDescriptor_33f3a5e1293a7bcd = []byte{ + // 139 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x28, 0xca, 0x2f, + 0xc9, 0xd7, 0xcf, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0xd5, 0x03, 0xf3, 0x84, 0x58, 0xc1, 0x94, + 0x92, 0x22, 0x17, 0x77, 0x48, 0x51, 0x62, 0x5e, 0x71, 0x62, 0x72, 0x49, 0x66, 0x7e, 0x9e, 0x90, + 0x10, 0x17, 0x4b, 0x52, 0x7e, 0x4a, 0xa5, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad, + 0xa4, 0xc1, 0x25, 0x84, 0xa4, 0x24, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x04, 0xa4, 0x32, 0x23, + 0xb1, 0x38, 0x03, 0xa6, 0x12, 0xc4, 0x76, 0x52, 0x8e, 0x52, 0x4c, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, + 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0x4e, 0x2c, 0xc8, 0x2f, 0x2e, 0x4e, 0xcd, 0x49, 0xd5, 0x4f, + 0xcf, 0x2f, 0x2e, 0xce, 0x2c, 0xd0, 0x07, 0xdb, 0x98, 0xc4, 0x06, 0xa6, 0x8c, 0x01, 0x01, 0x00, + 0x00, 0xff, 0xff, 0x34, 0x46, 0xa5, 0x0f, 0x96, 0x00, 0x00, 0x00, +} diff --git a/packages/gossip/proto/message.proto b/packages/gossip/proto/message.proto new file mode 100644 index 0000000000000000000000000000000000000000..b01b93680ce540aea64f26535cb269bd1435df90 --- /dev/null +++ b/packages/gossip/proto/message.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +option go_package = "github.com/capossele/gossip/proto"; + +package proto; + +message Transaction { + // body of the tx + bytes body = 1; +} + +message TransactionRequest { + // transaction hash + bytes hash = 1; +} \ No newline at end of file diff --git a/packages/gossip/transport/connection.go b/packages/gossip/transport/connection.go new file mode 100644 index 0000000000000000000000000000000000000000..f6886129735517fe451df9c6cf11d34fddeb3e94 --- /dev/null +++ b/packages/gossip/transport/connection.go @@ -0,0 +1,44 @@ +package transport + +import ( + "net" + + "github.com/iotaledger/autopeering-sim/peer" +) + +const ( + // MaxPacketSize specifies the maximum allowed size of packets. + // Packets larger than this will be cut and thus treated as invalid. + MaxPacketSize = 1280 +) + +type Connection struct { + peer *peer.Peer + conn net.Conn +} + +func newConnection(p *peer.Peer, c net.Conn) *Connection { + return &Connection{ + peer: p, + conn: c, + } +} + +func (c *Connection) Close() { + c.conn.Close() +} + +func (c *Connection) Read() ([]byte, error) { + b := make([]byte, MaxPacketSize) + n, err := c.conn.Read(b) + if err != nil { + return nil, err + } + + return b[:n], nil +} + +func (c *Connection) Write(b []byte) error { + _, err := c.conn.Write(b) + return err +} diff --git a/packages/gossip/transport/handshake.go b/packages/gossip/transport/handshake.go new file mode 100644 index 0000000000000000000000000000000000000000..c6e253ffda9b0a4a39f1637d854d6214c240a526 --- /dev/null +++ b/packages/gossip/transport/handshake.go @@ -0,0 +1,90 @@ +package transport + +import ( + "bytes" + "time" + + pb "github.com/capossele/gossip/transport/proto" + "github.com/golang/protobuf/proto" + "github.com/iotaledger/autopeering-sim/server" +) + +const ( + HandshakeExpiration = 20 * time.Second + VersionNum = 0 +) + +// isExpired checks whether the given UNIX time stamp is too far in the past. +func isExpired(ts int64) bool { + return time.Since(time.Unix(ts, 0)) >= HandshakeExpiration +} + +func newHandshakeRequest(fromAddr string, toAddr string) ([]byte, error) { + m := &pb.HandshakeRequest{ + Version: VersionNum, + From: fromAddr, + To: toAddr, + Timestamp: time.Now().Unix(), + } + return proto.Marshal(m) +} + +func newHandshakeResponse(reqData []byte) ([]byte, error) { + m := &pb.HandshakeResponse{ + ReqHash: server.PacketHash(reqData), + } + return proto.Marshal(m) +} + +func (t *TransportTCP) validateHandshakeRequest(reqData []byte, fromAddr string) bool { + m := new(pb.HandshakeRequest) + if err := proto.Unmarshal(reqData, m); err != nil { + t.log.Debugw("invalid handshake", + "err", err, + ) + return false + } + if m.GetVersion() != VersionNum { + t.log.Debugw("invalid handshake", + "version", m.GetVersion(), + ) + return false + } + if m.GetFrom() != fromAddr { + t.log.Debugw("invalid handshake", + "from", m.GetFrom(), + ) + return false + } + if m.GetTo() != t.LocalAddr().String() { + t.log.Debugw("invalid handshake", + "to", m.GetTo(), + ) + return false + } + if isExpired(m.GetTimestamp()) { + t.log.Debugw("invalid handshake", + "timestamp", time.Unix(m.GetTimestamp(), 0), + ) + } + + return true +} + +func (t *TransportTCP) validateHandshakeResponse(resData []byte, reqData []byte) bool { + m := new(pb.HandshakeResponse) + if err := proto.Unmarshal(resData, m); err != nil { + t.log.Debugw("invalid handshake", + "err", err, + ) + return false + } + if !bytes.Equal(m.GetReqHash(), server.PacketHash(reqData)) { + t.log.Debugw("invalid handshake", + "hash", m.GetReqHash(), + ) + return false + } + + return true +} diff --git a/packages/gossip/transport/proto/handshake.pb.go b/packages/gossip/transport/proto/handshake.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..c7f368873432c4456bb4f7cd8376f023905c9bf1 --- /dev/null +++ b/packages/gossip/transport/proto/handshake.pb.go @@ -0,0 +1,152 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: transport/proto/handshake.proto + +package proto + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type HandshakeRequest struct { + // protocol version number + Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` + // string form of the sender address (e.g. "192.0.2.1:25", "[2001:db8::1]:80") + From string `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"` + // string form of the recipient address + To string `protobuf:"bytes,3,opt,name=to,proto3" json:"to,omitempty"` + // unix time + Timestamp int64 `protobuf:"varint,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HandshakeRequest) Reset() { *m = HandshakeRequest{} } +func (m *HandshakeRequest) String() string { return proto.CompactTextString(m) } +func (*HandshakeRequest) ProtoMessage() {} +func (*HandshakeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_d7101ffe19b05443, []int{0} +} + +func (m *HandshakeRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HandshakeRequest.Unmarshal(m, b) +} +func (m *HandshakeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HandshakeRequest.Marshal(b, m, deterministic) +} +func (m *HandshakeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_HandshakeRequest.Merge(m, src) +} +func (m *HandshakeRequest) XXX_Size() int { + return xxx_messageInfo_HandshakeRequest.Size(m) +} +func (m *HandshakeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_HandshakeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_HandshakeRequest proto.InternalMessageInfo + +func (m *HandshakeRequest) GetVersion() uint32 { + if m != nil { + return m.Version + } + return 0 +} + +func (m *HandshakeRequest) GetFrom() string { + if m != nil { + return m.From + } + return "" +} + +func (m *HandshakeRequest) GetTo() string { + if m != nil { + return m.To + } + return "" +} + +func (m *HandshakeRequest) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +type HandshakeResponse struct { + // hash of the ping packet + ReqHash []byte `protobuf:"bytes,1,opt,name=req_hash,json=reqHash,proto3" json:"req_hash,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *HandshakeResponse) Reset() { *m = HandshakeResponse{} } +func (m *HandshakeResponse) String() string { return proto.CompactTextString(m) } +func (*HandshakeResponse) ProtoMessage() {} +func (*HandshakeResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_d7101ffe19b05443, []int{1} +} + +func (m *HandshakeResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_HandshakeResponse.Unmarshal(m, b) +} +func (m *HandshakeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_HandshakeResponse.Marshal(b, m, deterministic) +} +func (m *HandshakeResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_HandshakeResponse.Merge(m, src) +} +func (m *HandshakeResponse) XXX_Size() int { + return xxx_messageInfo_HandshakeResponse.Size(m) +} +func (m *HandshakeResponse) XXX_DiscardUnknown() { + xxx_messageInfo_HandshakeResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_HandshakeResponse proto.InternalMessageInfo + +func (m *HandshakeResponse) GetReqHash() []byte { + if m != nil { + return m.ReqHash + } + return nil +} + +func init() { + proto.RegisterType((*HandshakeRequest)(nil), "proto.HandshakeRequest") + proto.RegisterType((*HandshakeResponse)(nil), "proto.HandshakeResponse") +} + +func init() { proto.RegisterFile("transport/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) } + +var fileDescriptor_d7101ffe19b05443 = []byte{ + // 203 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x8f, 0x3f, 0x4b, 0x04, 0x31, + 0x10, 0xc5, 0xb9, 0x3f, 0x7a, 0xde, 0xa0, 0xa2, 0xa9, 0x22, 0x08, 0xca, 0x55, 0x82, 0xb8, 0x29, + 0xfc, 0x06, 0x56, 0x57, 0xa7, 0xb4, 0x91, 0xec, 0x3a, 0x6e, 0x82, 0x26, 0x93, 0xcd, 0x64, 0xfd, + 0xfc, 0x0e, 0x81, 0x45, 0xb1, 0x7a, 0xef, 0xf7, 0x0b, 0xe4, 0x25, 0x70, 0x57, 0x8b, 0x4b, 0x9c, + 0xa9, 0x54, 0x93, 0x0b, 0x55, 0x32, 0xde, 0xa5, 0x77, 0xf6, 0xee, 0x13, 0xbb, 0xc6, 0xea, 0xa4, + 0xc5, 0x21, 0xc1, 0xd5, 0x71, 0x39, 0xb1, 0x38, 0xcd, 0xc8, 0x55, 0x69, 0xd8, 0x7d, 0x63, 0xe1, + 0x40, 0x49, 0xaf, 0xee, 0x57, 0x0f, 0x17, 0x76, 0x41, 0xa5, 0x60, 0xfb, 0x51, 0x28, 0xea, 0xb5, + 0xe8, 0xbd, 0x6d, 0x5d, 0x5d, 0xc2, 0xba, 0x92, 0xde, 0x34, 0x23, 0x4d, 0xdd, 0xc2, 0xbe, 0x86, + 0x28, 0xf7, 0xb8, 0x98, 0xf5, 0x56, 0xf4, 0xc6, 0xfe, 0x8a, 0x43, 0x07, 0xd7, 0x7f, 0xf6, 0xe4, + 0x81, 0x89, 0x51, 0xdd, 0xc0, 0x59, 0xc1, 0xe9, 0xcd, 0x3b, 0xf6, 0x6d, 0xf1, 0xdc, 0xee, 0x84, + 0x8f, 0x82, 0x2f, 0x4f, 0xaf, 0x8f, 0x63, 0xa8, 0x7e, 0xee, 0xbb, 0x81, 0xa2, 0x19, 0x5c, 0x26, + 0x66, 0xfc, 0x42, 0x33, 0x4a, 0x86, 0x6c, 0xfe, 0xfd, 0xb2, 0x3f, 0x6d, 0xf1, 0xfc, 0x13, 0x00, + 0x00, 0xff, 0xff, 0x51, 0xe0, 0x08, 0xd0, 0xff, 0x00, 0x00, 0x00, +} diff --git a/packages/gossip/transport/proto/handshake.proto b/packages/gossip/transport/proto/handshake.proto new file mode 100644 index 0000000000000000000000000000000000000000..2f9c628169cb57daa324e5ff16fa9510d6990950 --- /dev/null +++ b/packages/gossip/transport/proto/handshake.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +option go_package = "github.com/capossele/gossip/transport/proto"; + +package proto; + +message HandshakeRequest { + // protocol version number + uint32 version = 1; + // string form of the sender address (e.g. "192.0.2.1:25", "[2001:db8::1]:80") + string from = 2; + // string form of the recipient address + string to = 3; + // unix time + int64 timestamp = 4; +} + +message HandshakeResponse { + // hash of the ping packet + bytes req_hash = 1; +} \ No newline at end of file diff --git a/packages/gossip/transport/transport.go b/packages/gossip/transport/transport.go new file mode 100644 index 0000000000000000000000000000000000000000..f9ae8101dbb42d4f8f77ee559a7100dade21e0e1 --- /dev/null +++ b/packages/gossip/transport/transport.go @@ -0,0 +1,386 @@ +package transport + +import ( + "bytes" + "container/list" + "errors" + "net" + "sync" + "time" + + "github.com/golang/protobuf/proto" + "github.com/iotaledger/autopeering-sim/peer" + "github.com/iotaledger/autopeering-sim/peer/service" + pb "github.com/iotaledger/autopeering-sim/server/proto" + "go.uber.org/zap" +) + +var ( + ErrTimeout = errors.New("accept timeout") + ErrClosed = errors.New("listener closed") + ErrInvalidHandshake = errors.New("invalid handshake") + ErrNoGossip = errors.New("peer does not have a gossip service") +) + +// connection timeouts +const ( + acceptTimeout = 500 * time.Millisecond + handshakeTimeout = 100 * time.Millisecond + connectionTimeout = acceptTimeout + handshakeTimeout +) + +type TransportTCP struct { + local *peer.Local + listener *net.TCPListener + log *zap.SugaredLogger + + addAcceptMatcher chan *acceptMatcher + acceptReceived chan accept + + closeOnce sync.Once + wg sync.WaitGroup + closing chan struct{} // if this channel gets closed all pending waits should terminate +} + +// connect contains the result of an incoming connection. +type connect struct { + c *Connection + err error +} + +type acceptMatcher struct { + peer *peer.Peer // connecting peer + deadline time.Time // deadline for the incoming call + connected chan connect // result of the connection is signaled here +} + +type accept struct { + fromID peer.ID // ID of the connecting peer + req []byte // raw data of the handshake request + conn net.Conn // the actual network connection +} + +func Listen(local *peer.Local, log *zap.SugaredLogger) (*TransportTCP, error) { + t := &TransportTCP{ + local: local, + log: log, + addAcceptMatcher: make(chan *acceptMatcher), + acceptReceived: make(chan accept), + closing: make(chan struct{}), + } + + gossipAddr := local.Services().Get(service.GossipKey) + if gossipAddr == nil { + return nil, ErrNoGossip + } + tcpAddr, err := net.ResolveTCPAddr(gossipAddr.Network(), gossipAddr.String()) + if err != nil { + return nil, err + } + listener, err := net.ListenTCP(gossipAddr.Network(), tcpAddr) + if err != nil { + return nil, err + } + t.listener = listener + + t.wg.Add(2) + go t.run() + go t.listenLoop() + + return t, nil +} + +// Close stops listening on the gossip address. +func (t *TransportTCP) Close() { + t.closeOnce.Do(func() { + close(t.closing) + if err := t.listener.Close(); err != nil { + t.log.Warnw("close error", "err", err) + } + t.wg.Wait() + }) +} + +// LocalAddr returns the listener's network address, +func (t *TransportTCP) LocalAddr() net.Addr { + return t.listener.Addr() +} + +// DialPeer establishes a gossip connection to the given peer. +// If the peer does not accept the connection or the handshake fails, an error is returned. +func (t *TransportTCP) DialPeer(p *peer.Peer) (*Connection, error) { + gossipAddr := p.Services().Get(service.GossipKey) + if gossipAddr == nil { + return nil, ErrNoGossip + } + + conn, err := net.DialTimeout(gossipAddr.Network(), gossipAddr.String(), acceptTimeout) + if err != nil { + return nil, err + } + + err = t.doHandshake(p.PublicKey(), gossipAddr.String(), conn) + if err != nil { + return nil, err + } + + t.log.Debugw("connected", "id", p.ID(), "addr", conn.RemoteAddr(), "direction", "out") + return newConnection(p, conn), nil +} + +// AcceptPeer awaits an incoming connection from the given peer. +// If the peer does not establish the connection or the handshake fails, an error is returned. +func (t *TransportTCP) AcceptPeer(p *peer.Peer) (*Connection, error) { + if p.Services().Get(service.GossipKey) == nil { + return nil, ErrNoGossip + } + // wait for the connection + connected := <-t.acceptPeer(p) + if connected.err != nil { + return nil, connected.err + } + t.log.Debugw("connected", "id", p.ID(), "addr", connected.c.conn.RemoteAddr(), "direction", "in") + return connected.c, nil +} + +func (t *TransportTCP) acceptPeer(p *peer.Peer) <-chan connect { + connected := make(chan connect, 1) + // add the matcher + select { + case t.addAcceptMatcher <- &acceptMatcher{peer: p, connected: connected}: + case <-t.closing: + connected <- connect{nil, ErrClosed} + } + return connected +} + +func (t *TransportTCP) closeConnection(c net.Conn) { + if err := c.Close(); err != nil { + t.log.Warnw("close error", "err", err) + } +} + +func (t *TransportTCP) run() { + defer t.wg.Done() + + var ( + mlist = list.New() + timeout = time.NewTimer(0) + ) + defer timeout.Stop() + + <-timeout.C // ignore first timeout + + for { + + // Set the timer so that it fires when the next accept expires + if el := mlist.Front(); el != nil { + // the first element always has the closest deadline + m := el.Value.(*acceptMatcher) + timeout.Reset(time.Until(m.deadline)) + } else { + timeout.Stop() + } + + select { + + // add a new matcher to the list + case m := <-t.addAcceptMatcher: + m.deadline = time.Now().Add(connectionTimeout) + mlist.PushBack(m) + + // on accept received, check all matchers for a fit + case a := <-t.acceptReceived: + matched := false + for el := mlist.Front(); el != nil; el = el.Next() { + m := el.Value.(*acceptMatcher) + if m.peer.ID() == a.fromID { + matched = true + mlist.Remove(el) + // finish the handshake + go t.matchAccept(m, a.req, a.conn) + } + } + // close the connection if not matched + if !matched { + t.log.Debugw("unexpected connection", "id", a.fromID, "addr", a.conn.RemoteAddr()) + t.closeConnection(a.conn) + } + + // on timeout, check for expired matchers + case <-timeout.C: + now := time.Now() + + // notify and remove any expired matchers + for el := mlist.Front(); el != nil; el = el.Next() { + m := el.Value.(*acceptMatcher) + if now.After(m.deadline) || now.Equal(m.deadline) { + m.connected <- connect{nil, ErrTimeout} + mlist.Remove(el) + } + } + + // on close, notify all the matchers + case <-t.closing: + for el := mlist.Front(); el != nil; el = el.Next() { + el.Value.(*acceptMatcher).connected <- connect{nil, ErrClosed} + } + return + + } + } +} + +func (t *TransportTCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) { + t.wg.Add(1) + defer t.wg.Done() + + if err := t.writeHandshakeResponse(req, conn); err != nil { + t.log.Warnw("failed handshake", "addr", conn.RemoteAddr(), "err", err) + m.connected <- connect{nil, err} + t.closeConnection(conn) + return + } + m.connected <- connect{newConnection(m.peer, conn), nil} +} + +func (t *TransportTCP) listenLoop() { + defer t.wg.Done() + + for { + conn, err := t.listener.AcceptTCP() + if err, ok := err.(net.Error); ok && err.Temporary() { + t.log.Debugw("temporary read error", "err", err) + continue + } else if err != nil { + // return from the loop on all other errors + t.log.Warnw("read error", "err", err) + return + } + + key, req, err := t.readHandshakeRequest(conn) + if err != nil { + t.log.Warnw("failed handshake", "addr", conn.RemoteAddr(), "err", err) + t.closeConnection(conn) + continue + } + + select { + case t.acceptReceived <- accept{ + fromID: key.ID(), + req: req, + conn: conn, + }: + case <-t.closing: + t.closeConnection(conn) + return + } + } +} + +func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn net.Conn) error { + reqData, err := newHandshakeRequest(conn.LocalAddr().String(), remoteAddr) + if err != nil { + return err + } + + pkt := &pb.Packet{ + PublicKey: t.local.PublicKey(), + Signature: t.local.Sign(reqData), + Data: reqData, + } + b, err := proto.Marshal(pkt) + if err != nil { + return err + } + + if err := conn.SetWriteDeadline(time.Now().Add(handshakeTimeout)); err != nil { + return err + } + _, err = conn.Write(b) + if err != nil { + return err + } + + if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil { + return err + } + b = make([]byte, MaxPacketSize) + n, err := conn.Read(b) + if err != nil { + return err + } + + pkt = new(pb.Packet) + if err := proto.Unmarshal(b[:n], pkt); err != nil { + return err + } + + signer, err := peer.RecoverKeyFromSignedData(pkt) + if err != nil { + return err + } + if !bytes.Equal(key, signer) { + return errors.New("invalid key") + } + + if !t.validateHandshakeResponse(pkt.GetData(), reqData) { + return ErrInvalidHandshake + } + + return nil +} + +func (t *TransportTCP) readHandshakeRequest(conn net.Conn) (peer.PublicKey, []byte, error) { + if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil { + return nil, nil, err + } + b := make([]byte, MaxPacketSize) + n, err := conn.Read(b) + if err != nil { + return nil, nil, err + } + + pkt := new(pb.Packet) + if err := proto.Unmarshal(b[:n], pkt); err != nil { + return nil, nil, err + } + + key, err := peer.RecoverKeyFromSignedData(pkt) + if err != nil { + return nil, nil, err + } + + if !t.validateHandshakeRequest(pkt.GetData(), conn.RemoteAddr().String()) { + return nil, nil, ErrInvalidHandshake + } + + return key, pkt.GetData(), nil +} + +func (t *TransportTCP) writeHandshakeResponse(reqData []byte, conn net.Conn) error { + data, err := newHandshakeResponse(reqData) + if err != nil { + return err + } + + pkt := &pb.Packet{ + PublicKey: t.local.PublicKey(), + Signature: t.local.Sign(data), + Data: data, + } + b, err := proto.Marshal(pkt) + if err != nil { + return err + } + + if err := conn.SetWriteDeadline(time.Now().Add(handshakeTimeout)); err != nil { + return err + } + _, err = conn.Write(b) + if err != nil { + return err + } + + return nil +} diff --git a/packages/gossip/transport/transport_test.go b/packages/gossip/transport/transport_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c0b670476d80b8e49a946474c08a22e7dd2a7ed6 --- /dev/null +++ b/packages/gossip/transport/transport_test.go @@ -0,0 +1,197 @@ +package transport + +import ( + "log" + "net" + "sync" + "testing" + "time" + + "github.com/iotaledger/autopeering-sim/peer" + "github.com/iotaledger/autopeering-sim/peer/service" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +const graceTime = 5 * time.Millisecond + +var logger *zap.SugaredLogger + +func init() { + l, err := zap.NewDevelopment() + if err != nil { + log.Fatalf("cannot initialize logger: %v", err) + } + logger = l.Sugar() +} + +func newTest(t require.TestingT, name string) (*TransportTCP, func()) { + l := logger.Named(name) + db := peer.NewMemoryDB(l.Named("db")) + local, err := peer.NewLocal("peering", name, db) + require.NoError(t, err) + + // enable TCP gossipping + require.NoError(t, local.UpdateService(service.GossipKey, "tcp", ":0")) + + trans, err := Listen(local, l) + require.NoError(t, err) + + // update the service with the actual address + require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String())) + + teardown := func() { + trans.Close() + db.Close() + } + return trans, teardown +} + +func getPeer(t *TransportTCP) *peer.Peer { + return &t.local.Peer +} + +func TestClose(t *testing.T) { + _, teardown := newTest(t, "A") + teardown() +} + +func TestUnansweredAccept(t *testing.T) { + transA, closeA := newTest(t, "A") + defer closeA() + + _, err := transA.AcceptPeer(getPeer(transA)) + assert.Error(t, err) +} + +func TestCloseWhileAccepting(t *testing.T) { + transA, closeA := newTest(t, "A") + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := transA.AcceptPeer(getPeer(transA)) + assert.Error(t, err) + }() + time.Sleep(graceTime) + + closeA() + wg.Wait() +} + +func TestUnansweredDial(t *testing.T) { + transA, closeA := newTest(t, "A") + defer closeA() + + // create peer with invalid gossip address + services := getPeer(transA).Services().CreateRecord() + services.Update(service.GossipKey, "tcp", ":0") + unreachablePeer := peer.NewPeer(getPeer(transA).PublicKey(), services) + + _, err := transA.DialPeer(unreachablePeer) + assert.Error(t, err) +} + +func TestNoHandshakeResponse(t *testing.T) { + transA, closeA := newTest(t, "A") + defer closeA() + + // accept and read incoming connections + lis, err := net.Listen("tcp", ":0") + require.NoError(t, err) + go func() { + conn, err := lis.Accept() + require.NoError(t, err) + n, _ := conn.Read(make([]byte, MaxPacketSize)) + assert.NotZero(t, n) + _ = conn.Close() + _ = lis.Close() + }() + + // create peer for the listener + services := getPeer(transA).Services().CreateRecord() + services.Update(service.GossipKey, lis.Addr().Network(), lis.Addr().String()) + p := peer.NewPeer(getPeer(transA).PublicKey(), services) + + _, err = transA.DialPeer(p) + assert.Error(t, err) +} + +func TestNoHandshakeRequest(t *testing.T) { + transA, closeA := newTest(t, "A") + defer closeA() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := transA.AcceptPeer(getPeer(transA)) + assert.Error(t, err) + }() + time.Sleep(graceTime) + + conn, err := net.Dial(transA.LocalAddr().Network(), transA.LocalAddr().String()) + require.NoError(t, err) + time.Sleep(handshakeTimeout) + _ = conn.Close() + + wg.Wait() +} + +func TestConnect(t *testing.T) { + transA, closeA := newTest(t, "A") + defer closeA() + transB, closeB := newTest(t, "B") + defer closeB() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + c, err := transA.AcceptPeer(getPeer(transB)) + assert.NoError(t, err) + if assert.NotNil(t, c) { + c.Close() + } + }() + time.Sleep(graceTime) + go func() { + defer wg.Done() + c, err := transB.DialPeer(getPeer(transA)) + assert.NoError(t, err) + if assert.NotNil(t, c) { + c.Close() + } + }() + + wg.Wait() +} + +func TestWrongConnect(t *testing.T) { + transA, closeA := newTest(t, "A") + defer closeA() + transB, closeB := newTest(t, "B") + defer closeB() + transC, closeC := newTest(t, "C") + defer closeC() + + var wg sync.WaitGroup + wg.Add(2) + + // a expects connection from B, but C is connecting + go func() { + defer wg.Done() + _, err := transA.AcceptPeer(getPeer(transB)) + assert.Error(t, err) + }() + go func() { + defer wg.Done() + _, err := transC.DialPeer(getPeer(transA)) + assert.Error(t, err) + }() + + wg.Wait() +} diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index 3ebd4a00045d09297372d39ba435f4c5cff61316..f406622fed6da867fc84aa7a20d6b50cfb379f2c 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -1,10 +1,8 @@ package autopeering import ( - "net" - "github.com/iotaledger/autopeering-sim/discover" - "github.com/iotaledger/autopeering-sim/selection" + "github.com/iotaledger/goshimmer/packages/gossip/neighbor" "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" @@ -28,26 +26,26 @@ func run(plugin *node.Plugin) { } func configureLogging(plugin *node.Plugin) { - gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Neighbor) { + gossip.Events.DropNeighbor.Attach(events.NewClosure(func(peer *neighbor.Neighbor) { Selection.DropPeer(peer.Peer) })) - selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { - log.Debug("neighbor removed: " + ev.DroppedID.String()) - gossip.RemoveNeighbor(ev.DroppedID.String()) - })) - - selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { - log.Debug("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) - address, port, _ := net.SplitHostPort(ev.Services["gossip"].Address) - gossip.AddNeighbor(gossip.NewNeighbor(ev.Peer, address, port)) - })) - - selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { - log.Debug("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) - address, port, _ := net.SplitHostPort(ev.Services["gossip"].Address) - gossip.AddNeighbor(gossip.NewNeighbor(ev.Peer, address, port)) - })) + // selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { + // log.Debug("neighbor removed: " + ev.DroppedID.String()) + // gossip.RemoveNeighbor(ev.DroppedID.String()) + // })) + + // selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { + // log.Debug("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + // address, port, _ := net.SplitHostPort(ev.Services["gossip"].Address) + // gossip.AddNeighbor(gossip.NewNeighbor(ev.Peer, address, port)) + // })) + + // selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { + // log.Debug("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + // address, port, _ := net.SplitHostPort(ev.Services["gossip"].Address) + // gossip.AddNeighbor(gossip.NewNeighbor(ev.Peer, address, port)) + // })) discover.Events.PeerDiscovered.Attach(events.NewClosure(func(ev *discover.DiscoveredEvent) { log.Info("new peer discovered: " + ev.Peer.Address() + " / " + ev.Peer.ID().String()) diff --git a/plugins/gossip-on-solidification/plugin.go b/plugins/gossip-on-solidification/plugin.go deleted file mode 100644 index 42b09256badcd7c2abf4e7430c743c095a2f6c2d..0000000000000000000000000000000000000000 --- a/plugins/gossip-on-solidification/plugin.go +++ /dev/null @@ -1,15 +0,0 @@ -package gossip_on_solidification - -import ( - "github.com/iotaledger/goshimmer/packages/model/value_transaction" - "github.com/iotaledger/goshimmer/plugins/gossip" - "github.com/iotaledger/goshimmer/plugins/tangle" - "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/hive.go/node" -) - -var PLUGIN = node.NewPlugin("Gossip On Solidification", node.Enabled, func(plugin *node.Plugin) { - tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { - gossip.SendTransaction(tx.MetaTransaction) - })) -}) diff --git a/plugins/gossip/errors.go b/plugins/gossip/errors.go deleted file mode 100644 index 8da04cfa7c762c9c7697152c44abcb46bd2f9947..0000000000000000000000000000000000000000 --- a/plugins/gossip/errors.go +++ /dev/null @@ -1,12 +0,0 @@ -package gossip - -import "github.com/iotaledger/goshimmer/packages/errors" - -var ( - ErrConnectionFailed = errors.Wrap(errors.New("connection error"), "could not connect to neighbor") - ErrInvalidAuthenticationMessage = errors.Wrap(errors.New("protocol error"), "invalid authentication message") - ErrInvalidIdentity = errors.Wrap(errors.New("protocol error"), "invalid identity message") - ErrInvalidStateTransition = errors.New("protocol error: invalid state transition message") - ErrSendFailed = errors.Wrap(errors.New("protocol error"), "failed to send message") - ErrInvalidSendParam = errors.New("invalid parameter passed to send") -) diff --git a/plugins/gossip/events.go b/plugins/gossip/events.go deleted file mode 100644 index 4bcb484a1bcdcfc158222e798e5e00fd3ed85223..0000000000000000000000000000000000000000 --- a/plugins/gossip/events.go +++ /dev/null @@ -1,97 +0,0 @@ -package gossip - -import ( - "github.com/iotaledger/goshimmer/packages/errors" - "github.com/iotaledger/goshimmer/packages/identity" - "github.com/iotaledger/goshimmer/packages/model/meta_transaction" - "github.com/iotaledger/goshimmer/packages/network" - "github.com/iotaledger/hive.go/events" -) - -var Events = pluginEvents{ - // neighbor events - AddNeighbor: events.NewEvent(neighborCaller), - UpdateNeighbor: events.NewEvent(neighborCaller), - RemoveNeighbor: events.NewEvent(neighborCaller), - - // low level network events - IncomingConnection: events.NewEvent(connectionCaller), - - // high level protocol events - DropNeighbor: events.NewEvent(neighborCaller), - SendTransaction: events.NewEvent(transactionCaller), - SendTransactionRequest: events.NewEvent(transactionCaller), // TODO - ReceiveTransaction: events.NewEvent(transactionCaller), - ReceiveTransactionRequest: events.NewEvent(transactionCaller), // TODO - ProtocolError: events.NewEvent(transactionCaller), // TODO - - // generic events - Error: events.NewEvent(errorCaller), -} - -type pluginEvents struct { - // neighbor events - AddNeighbor *events.Event - UpdateNeighbor *events.Event - RemoveNeighbor *events.Event - - // low level network events - IncomingConnection *events.Event - - // high level protocol events - DropNeighbor *events.Event - SendTransaction *events.Event - SendTransactionRequest *events.Event - ReceiveTransaction *events.Event - ReceiveTransactionRequest *events.Event - ProtocolError *events.Event - - // generic events - Error *events.Event -} - -type protocolEvents struct { - ReceiveVersion *events.Event - ReceiveIdentification *events.Event - ReceiveConnectionAccepted *events.Event - ReceiveConnectionRejected *events.Event - ReceiveDropConnection *events.Event - ReceiveTransactionData *events.Event - ReceiveRequestData *events.Event - HandshakeCompleted *events.Event - Error *events.Event -} - -type neighborEvents struct { - ProtocolConnectionEstablished *events.Event -} - -func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) } - -func identityCaller(handler interface{}, params ...interface{}) { - handler.(func(*identity.Identity))(params[0].(*identity.Identity)) -} - -func connectionCaller(handler interface{}, params ...interface{}) { - handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) -} - -func protocolCaller(handler interface{}, params ...interface{}) { - handler.(func(*protocol))(params[0].(*protocol)) -} - -func neighborCaller(handler interface{}, params ...interface{}) { - handler.(func(*Neighbor))(params[0].(*Neighbor)) -} - -func errorCaller(handler interface{}, params ...interface{}) { - handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError)) -} - -func dataCaller(handler interface{}, params ...interface{}) { - handler.(func([]byte))(params[0].([]byte)) -} - -func transactionCaller(handler interface{}, params ...interface{}) { - handler.(func(*meta_transaction.MetaTransaction))(params[0].(*meta_transaction.MetaTransaction)) -} diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go new file mode 100644 index 0000000000000000000000000000000000000000..0ca491a83656bce9d8084bcdb5226d6c53a13a66 --- /dev/null +++ b/plugins/gossip/gossip.go @@ -0,0 +1,86 @@ +package gossip + +import ( + "github.com/iotaledger/goshimmer/packages/gossip/transport" + "github.com/iotaledger/goshimmer/packages/model/meta_transaction" + gp "github.com/iotaledger/goshimmer/packages/gossip" + pb "github.com/iotaledger/goshimmer/packages/gossip/proto" + "github.com/iotaledger/goshimmer/plugins/autopeering/local" + "github.com/iotaledger/autopeering-sim/selection" + "github.com/iotaledger/goshimmer/plugins/tangle" + "go.uber.org/zap" + "github.com/golang/protobuf/proto" + "github.com/iotaledger/hive.go/events" +) + +var ( + zLogger *zap.SugaredLogger + mgr *gp.Manager + SendTransaction = mgr.Send + RequestTransaction = mgr.RequestTransaction + AddInbound = mgr.AddInbound + AddOutbound = mgr.AddOutbound + DropNeighbor = mgr.DropNeighbor +) + +func init() { + l, err := zap.NewDevelopment() + if err != nil { + log.Fatalf("cannot initialize logger: %v", err) + } + zLogger = l.Sugar() +} + +func getTransaction(h []byte) ([]byte, error) { + tx := &pb.TransactionRequest{ + Hash: []byte("testTx"), + } + b, _ := proto.Marshal(tx) + return b, nil +} + +func configureGossip() { + defer func() { _ = zLogger.Sync() }() // ignore the returned error + + trans, err := transport.Listen(local.INSTANCE, zLogger) + if err != nil { + // TODO: handle error + } + + mgr = gp.NewManager(trans, zLogger, getTransaction) +} + +func configureEvents() { + + selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) { + log.Debug("neighbor removed: " + ev.DroppedID.String()) + DropNeighbor(ev.DroppedID) + })) + + selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { + log.Debug("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + AddInbound(ev.Peer) + })) + + selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) { + log.Debug("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String()) + AddOutbound(ev.Peer) + })) + + // mgr.Events.NewTransaction.Attach(events.NewClosure(func(ev *gp.NewTransactionEvent) { + // tx := ev.Body + // metaTx := meta_transaction.FromBytes(tx) + // Events.NewTransaction.Trigger(metaTx) + // })) + + tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *meta_transaction.MetaTransaction) { + t := &pb.Transaction{ + Body: tx.GetBytes(), + } + b, err := proto.Marshal(t) + if err != nil { + return + } + SendTransaction(b) + })) +} diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go deleted file mode 100644 index 2acecf294e50fc7c31af4904a0c62c875f371e92..0000000000000000000000000000000000000000 --- a/plugins/gossip/neighbors.go +++ /dev/null @@ -1,286 +0,0 @@ -package gossip - -import ( - "math" - "net" - "sync" - "time" - - "github.com/iotaledger/autopeering-sim/peer" - "github.com/iotaledger/goshimmer/packages/errors" - "github.com/iotaledger/goshimmer/packages/identity" - "github.com/iotaledger/goshimmer/packages/network" - "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/hive.go/node" -) - -func configureNeighbors(plugin *node.Plugin) { - Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) { - log.Info("new neighbor added " + neighbor.GetIdentity().StringIdentifier + "@" + neighbor.GetAddress().String() + ":" + neighbor.GetPort()) - //plugin.LogSuccess("new neighbor added " + hex.EncodeToString(neighbor.Peer.ID().Bytes()) + "@" + neighbor.GetAddress().String() + ":" + neighbor.GetPort()) - })) - - Events.UpdateNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) { - log.Info("existing neighbor updated " + neighbor.GetIdentity().StringIdentifier + "@" + neighbor.GetAddress().String() + ":" + neighbor.GetPort()) - })) - - Events.RemoveNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) { - log.Info("existing neighbor removed " + neighbor.GetIdentity().StringIdentifier + "@" + neighbor.GetAddress().String() + ":" + neighbor.GetPort()) - })) -} - -func runNeighbors(plugin *node.Plugin) { - log.Info("Starting Neighbor Connection Manager ...") - - neighborLock.RLock() - for _, neighbor := range neighbors.GetMap() { - manageConnection(plugin, neighbor) - } - neighborLock.RUnlock() - - Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) { - manageConnection(plugin, neighbor) - })) - - log.Info("Starting Neighbor Connection Manager ... done") -} - -func manageConnection(plugin *node.Plugin, neighbor *Neighbor) { - daemon.BackgroundWorker("Connection Manager ("+neighbor.GetIdentity().StringIdentifier+")", func() { - failedConnectionAttempts := 0 - - for _, exists := neighbors.Load(neighbor.GetIdentity().StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; { - protocol, dialed, err := neighbor.Connect() - if err != nil { - failedConnectionAttempts++ - - log.Errorf("connection attempt [%d / %d] %s", failedConnectionAttempts, CONNECTION_MAX_ATTEMPTS, err.Error()) - - if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS { - select { - case <-daemon.ShutdownSignal: - return - - case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT): - continue - } - } - } - - failedConnectionAttempts = 0 - - disconnectSignal := make(chan int, 1) - protocol.Conn.Events.Close.Attach(events.NewClosure(func() { - close(disconnectSignal) - })) - - if dialed { - go protocol.Init() - } - - // wait for shutdown or - select { - case <-daemon.ShutdownSignal: - return - - case <-disconnectSignal: - continue - } - } - - RemoveNeighbor(neighbor.GetIdentity().StringIdentifier) - }) -} - -type Neighbor struct { - identity *identity.Identity - identityMutex sync.RWMutex - address net.IP - addressMutex sync.RWMutex - port string - portMutex sync.RWMutex - initiatedProtocol *protocol - initiatedProtocolMutex sync.RWMutex - acceptedProtocol *protocol - Events neighborEvents - acceptedProtocolMutex sync.RWMutex - Peer *peer.Peer -} - -func NewNeighbor(peer *peer.Peer, address, port string) *Neighbor { - return &Neighbor{ - identity: identity.NewPublicIdentity(peer.ToProto().GetPublicKey()), - address: net.ParseIP(address), - port: port, - Peer: peer, - Events: neighborEvents{ - ProtocolConnectionEstablished: events.NewEvent(protocolCaller), - }, - } -} - -func (neighbor *Neighbor) GetIdentity() (result *identity.Identity) { - neighbor.identityMutex.RLock() - result = neighbor.identity - neighbor.identityMutex.RUnlock() - - return result -} - -func (neighbor *Neighbor) SetIdentity(identity *identity.Identity) { - neighbor.identityMutex.Lock() - neighbor.identity = identity - neighbor.identityMutex.Unlock() -} - -func (neighbor *Neighbor) GetAddress() (result net.IP) { - neighbor.addressMutex.RLock() - result = neighbor.address - neighbor.addressMutex.RUnlock() - - return result -} - -func (neighbor *Neighbor) SetAddress(address net.IP) { - neighbor.addressMutex.Lock() - neighbor.address = address - neighbor.addressMutex.Unlock() -} - -func (neighbor *Neighbor) GetPort() (result string) { - neighbor.portMutex.RLock() - result = neighbor.port - neighbor.portMutex.RUnlock() - - return result -} - -func (neighbor *Neighbor) SetPort(port string) { - neighbor.portMutex.Lock() - neighbor.port = port - neighbor.portMutex.Unlock() -} - -func (neighbor *Neighbor) GetInitiatedProtocol() (result *protocol) { - neighbor.initiatedProtocolMutex.RLock() - result = neighbor.initiatedProtocol - neighbor.initiatedProtocolMutex.RUnlock() - - return result -} - -func (neighbor *Neighbor) SetInitiatedProtocol(p *protocol) { - neighbor.initiatedProtocolMutex.Lock() - neighbor.initiatedProtocol = p - neighbor.initiatedProtocolMutex.Unlock() -} - -func (neighbor *Neighbor) GetAcceptedProtocol() (result *protocol) { - neighbor.acceptedProtocolMutex.RLock() - result = neighbor.acceptedProtocol - neighbor.acceptedProtocolMutex.RUnlock() - - return result -} - -func (neighbor *Neighbor) SetAcceptedProtocol(p *protocol) { - neighbor.acceptedProtocolMutex.Lock() - neighbor.acceptedProtocol = p - neighbor.acceptedProtocolMutex.Unlock() -} - -func UnmarshalPeer(data []byte) (*Neighbor, error) { - return &Neighbor{}, nil -} - -func (neighbor *Neighbor) Connect() (*protocol, bool, errors.IdentifiableError) { - // return existing connections first - if neighbor.GetInitiatedProtocol() != nil { - return neighbor.GetInitiatedProtocol(), false, nil - } - - // if we already have an accepted connection -> use it instead - if neighbor.GetAcceptedProtocol() != nil { - return neighbor.GetAcceptedProtocol(), false, nil - } - - // otherwise try to dial - conn, err := net.Dial("tcp", neighbor.GetAddress().String()+":"+neighbor.GetPort()) - if err != nil { - return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+ - neighbor.GetIdentity().StringIdentifier+"@"+neighbor.GetAddress().String()+":"+neighbor.GetPort()) - } - - neighbor.SetInitiatedProtocol(newProtocol(network.NewManagedConnection(conn))) - - neighbor.GetInitiatedProtocol().Conn.Events.Close.Attach(events.NewClosure(func() { - neighbor.SetInitiatedProtocol(nil) - })) - - // drop the "secondary" connection upon successful handshake - neighbor.GetInitiatedProtocol().Events.HandshakeCompleted.Attach(events.NewClosure(func() { - if local.INSTANCE.ID().String() <= neighbor.Peer.ID().String() { - var acceptedProtocolConn *network.ManagedConnection - if neighbor.GetAcceptedProtocol() != nil { - acceptedProtocolConn = neighbor.GetAcceptedProtocol().Conn - } - - if acceptedProtocolConn != nil { - _ = acceptedProtocolConn.Close() - } - } - - neighbor.Events.ProtocolConnectionEstablished.Trigger(neighbor.GetInitiatedProtocol()) - })) - - return neighbor.GetInitiatedProtocol(), true, nil -} - -func (neighbor *Neighbor) Marshal() []byte { - return nil -} - -func (neighbor *Neighbor) Equals(other *Neighbor) bool { - return neighbor.GetIdentity().StringIdentifier == other.GetIdentity().StringIdentifier && - neighbor.GetPort() == other.GetPort() && neighbor.GetAddress().String() == other.GetAddress().String() -} - -func AddNeighbor(newNeighbor *Neighbor) { - if neighbor, exists := neighbors.Load(newNeighbor.GetIdentity().StringIdentifier); !exists { - neighbors.Store(newNeighbor.GetIdentity().StringIdentifier, newNeighbor) - Events.AddNeighbor.Trigger(newNeighbor) - } else { - if !neighbor.Equals(newNeighbor) { - neighbor.SetIdentity(newNeighbor.GetIdentity()) - neighbor.SetPort(newNeighbor.GetPort()) - neighbor.SetAddress(newNeighbor.GetAddress()) - - Events.UpdateNeighbor.Trigger(neighbor) - } - } -} - -func RemoveNeighbor(identifier string) { - if neighbor, exists := neighbors.Delete(identifier); exists { - Events.RemoveNeighbor.Trigger(neighbor) - } -} - -func GetNeighbor(identifier string) (*Neighbor, bool) { - return neighbors.Load(identifier) -} - -func GetNeighbors() map[string]*Neighbor { - return neighbors.GetMap() -} - -const ( - CONNECTION_MAX_ATTEMPTS = 5 - CONNECTION_BASE_TIMEOUT = 10 * time.Second -) - -var neighbors = NewNeighborMap() - -var neighborLock sync.RWMutex diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index c3d7ffef9ff81162a7f0ce80d53a71a23dfd68eb..2e6a902f914edb05efaae8f051096b4d4d8ec1b7 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -3,19 +3,25 @@ package gossip import ( "github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/node" + "github.com/iotaledger/hive.go/daemon" + "github.com/iotaledger/hive.go/events" ) var PLUGIN = node.NewPlugin("Gossip", node.Enabled, configure, run) var log = logger.NewLogger("Gossip") +var ( + debugLevel = "debug" + close = make(chan struct{}, 1) +) + func configure(plugin *node.Plugin) { - configureNeighbors(plugin) - configureServer(plugin) - configureSendQueue(plugin) + daemon.Events.Shutdown.Attach(events.NewClosure(func() { + close <- struct{}{} + })) + configureGossip() + configureEvents() } func run(plugin *node.Plugin) { - runNeighbors(plugin) - runServer(plugin) - runSendQueue(plugin) } diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go deleted file mode 100644 index 54c891714b80d4a7b8eb050bcfe6fb65c8c69438..0000000000000000000000000000000000000000 --- a/plugins/gossip/protocol.go +++ /dev/null @@ -1,199 +0,0 @@ -package gossip - -import ( - "strconv" - "sync" - - "github.com/iotaledger/goshimmer/packages/errors" - "github.com/iotaledger/goshimmer/packages/network" - "github.com/iotaledger/hive.go/events" -) - -// region constants and variables ////////////////////////////////////////////////////////////////////////////////////// - -var DEFAULT_PROTOCOL = protocolDefinition{ - version: VERSION_1, - initializer: protocolV1, -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region protocol ///////////////////////////////////////////////////////////////////////////////////////////////////// - -type protocol struct { - Conn *network.ManagedConnection - Neighbor *Neighbor - Version byte - sendHandshakeCompleted bool - receiveHandshakeCompleted bool - SendState protocolState - ReceivingState protocolState - Events protocolEvents - sendMutex sync.Mutex - handshakeMutex sync.Mutex -} - -func newProtocol(conn *network.ManagedConnection) *protocol { - protocol := &protocol{ - Conn: conn, - Events: protocolEvents{ - ReceiveVersion: events.NewEvent(intCaller), - ReceiveIdentification: events.NewEvent(identityCaller), - ReceiveConnectionAccepted: events.NewEvent(events.CallbackCaller), - ReceiveConnectionRejected: events.NewEvent(events.CallbackCaller), - ReceiveTransactionData: events.NewEvent(dataCaller), - HandshakeCompleted: events.NewEvent(events.CallbackCaller), - Error: events.NewEvent(errorCaller), - }, - sendHandshakeCompleted: false, - receiveHandshakeCompleted: false, - } - - protocol.SendState = &versionState{protocol: protocol} - protocol.ReceivingState = &versionState{protocol: protocol} - - return protocol -} - -func (protocol *protocol) Init() { - // setup event handlers - onReceiveData := events.NewClosure(protocol.Receive) - onConnectionAccepted := events.NewClosure(func() { - protocol.handshakeMutex.Lock() - defer protocol.handshakeMutex.Unlock() - - protocol.receiveHandshakeCompleted = true - if protocol.sendHandshakeCompleted { - protocol.Events.HandshakeCompleted.Trigger() - } - }) - var onClose *events.Closure - onClose = events.NewClosure(func() { - protocol.Conn.Events.ReceiveData.Detach(onReceiveData) - protocol.Conn.Events.Close.Detach(onClose) - protocol.Events.ReceiveConnectionAccepted.Detach(onConnectionAccepted) - }) - - // region register event handlers - protocol.Conn.Events.ReceiveData.Attach(onReceiveData) - protocol.Conn.Events.Close.Attach(onClose) - protocol.Events.ReceiveConnectionAccepted.Attach(onConnectionAccepted) - - // send protocol version - if err := protocol.Send(DEFAULT_PROTOCOL.version); err != nil { - return - } - - // initialize default protocol - if err := DEFAULT_PROTOCOL.initializer(protocol); err != nil { - protocol.SendState = nil - - _ = protocol.Conn.Close() - - protocol.Events.Error.Trigger(err) - - return - } - - // start reading from the connection - _, _ = protocol.Conn.Read(make([]byte, 1000)) -} - -func (protocol *protocol) Receive(data []byte) { - offset := 0 - length := len(data) - for offset < length && protocol.ReceivingState != nil { - if readBytes, err := protocol.ReceivingState.Receive(data, offset, length); err != nil { - Events.Error.Trigger(err) - - _ = protocol.Conn.Close() - - return - } else { - offset += readBytes - } - } -} - -func (protocol *protocol) Send(data interface{}) errors.IdentifiableError { - protocol.sendMutex.Lock() - defer protocol.sendMutex.Unlock() - - return protocol.send(data) -} - -func (protocol *protocol) send(data interface{}) errors.IdentifiableError { - if protocol.SendState != nil { - if err := protocol.SendState.Send(data); err != nil { - protocol.SendState = nil - - _ = protocol.Conn.Close() - - protocol.Events.Error.Trigger(err) - - return err - } - } - - return nil -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region versionState ///////////////////////////////////////////////////////////////////////////////////////////////// - -type versionState struct { - protocol *protocol -} - -func (state *versionState) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) { - switch data[offset] { - case 1: - protocol := state.protocol - - protocol.Version = 1 - protocol.Events.ReceiveVersion.Trigger(1) - - protocol.ReceivingState = newIndentificationStateV1(protocol) - - return 1, nil - - default: - return 1, ErrInvalidStateTransition.Derive("invalid version state transition (" + strconv.Itoa(int(data[offset])) + ")") - } -} - -func (state *versionState) Send(param interface{}) errors.IdentifiableError { - if version, ok := param.(byte); ok { - switch version { - case VERSION_1: - protocol := state.protocol - - if _, err := protocol.Conn.Write([]byte{version}); err != nil { - return ErrSendFailed.Derive(err, "failed to send version byte") - } - - protocol.SendState = newIndentificationStateV1(protocol) - - return nil - } - } - - return ErrInvalidSendParam.Derive("passed in parameter is not a valid version byte") -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region types and interfaces ///////////////////////////////////////////////////////////////////////////////////////// - -type protocolState interface { - Send(param interface{}) errors.IdentifiableError - Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) -} - -type protocolDefinition struct { - version byte - initializer func(*protocol) errors.IdentifiableError -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go deleted file mode 100644 index 268ec8dc249f685f352143a78122d621cef0c0b6..0000000000000000000000000000000000000000 --- a/plugins/gossip/protocol_v1.go +++ /dev/null @@ -1,383 +0,0 @@ -package gossip - -import ( - "strconv" - - "github.com/iotaledger/goshimmer/packages/byteutils" - "github.com/iotaledger/goshimmer/packages/errors" - "github.com/iotaledger/goshimmer/packages/identity" - "github.com/iotaledger/goshimmer/packages/model/meta_transaction" - "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/iota.go/consts" -) - -// region protocolV1 /////////////////////////////////////////////////////////////////////////////////////////////////// - -func protocolV1(protocol *protocol) errors.IdentifiableError { - if err := protocol.Send(local.INSTANCE.ID().Bytes()); err != nil { - return err - } - - onReceiveIdentification := events.NewClosure(func(identity *identity.Identity) { - if protocol.Neighbor == nil { - if err := protocol.Send(CONNECTION_REJECT); err != nil { - return - } - } else { - if err := protocol.Send(CONNECTION_ACCEPT); err != nil { - return - } - - protocol.handshakeMutex.Lock() - defer protocol.handshakeMutex.Unlock() - - protocol.sendHandshakeCompleted = true - if protocol.receiveHandshakeCompleted { - protocol.Events.HandshakeCompleted.Trigger() - } - } - }) - - protocol.Events.ReceiveIdentification.Attach(onReceiveIdentification) - - return nil -} - -func sendTransactionV1(protocol *protocol, tx *meta_transaction.MetaTransaction) { - if _, ok := protocol.SendState.(*dispatchStateV1); ok { - protocol.sendMutex.Lock() - defer protocol.sendMutex.Unlock() - - if err := protocol.send(DISPATCH_TRANSACTION); err != nil { - return - } - if err := protocol.send(tx); err != nil { - return - } - } -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region indentificationStateV1 /////////////////////////////////////////////////////////////////////////////////////// - -type indentificationStateV1 struct { - protocol *protocol - buffer []byte - offset int -} - -func newIndentificationStateV1(protocol *protocol) *indentificationStateV1 { - return &indentificationStateV1{ - protocol: protocol, - buffer: make([]byte, MARSHALED_IDENTITY_TOTAL_SIZE), - offset: 0, - } -} - -func (state *indentificationStateV1) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) { - bytesRead := byteutils.ReadAvailableBytesToBuffer(state.buffer, state.offset, data, offset, length) - - state.offset += bytesRead - if state.offset == MARSHALED_IDENTITY_TOTAL_SIZE { - receivedIdentity, err := identity.FromSignedData(state.buffer) - if err != nil { - return bytesRead, ErrInvalidAuthenticationMessage.Derive(err, "invalid authentication message") - } - protocol := state.protocol - - if neighbor, exists := GetNeighbor(receivedIdentity.StringIdentifier); exists { - protocol.Neighbor = neighbor - } else { - protocol.Neighbor = nil - } - - protocol.Events.ReceiveIdentification.Trigger(receivedIdentity) - - // switch to new state - protocol.ReceivingState = newacceptanceStateV1(protocol) - state.offset = 0 - } - - return bytesRead, nil -} - -func (state *indentificationStateV1) Send(param interface{}) errors.IdentifiableError { - id, ok := param.(*identity.Identity) - if !ok { - return ErrInvalidSendParam.Derive("parameter is not a valid identity") - } - - msg := id.Identifier.Bytes() - data := id.AddSignature(msg) - - protocol := state.protocol - if _, err := protocol.Conn.Write(data); err != nil { - return ErrSendFailed.Derive(err, "failed to send identification") - } - - // switch to new state - protocol.SendState = newacceptanceStateV1(protocol) - - return nil -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region acceptanceStateV1 //////////////////////////////////////////////////////////////////////////////////////////// - -type acceptanceStateV1 struct { - protocol *protocol -} - -func newacceptanceStateV1(protocol *protocol) *acceptanceStateV1 { - return &acceptanceStateV1{protocol: protocol} -} - -func (state *acceptanceStateV1) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) { - protocol := state.protocol - - switch data[offset] { - case 0: - protocol.Events.ReceiveConnectionRejected.Trigger() - - _ = protocol.Conn.Close() - - protocol.ReceivingState = nil - - case 1: - protocol.Events.ReceiveConnectionAccepted.Trigger() - - protocol.ReceivingState = newDispatchStateV1(protocol) - - default: - return 1, ErrInvalidStateTransition.Derive("invalid acceptance state transition (" + strconv.Itoa(int(data[offset])) + ")") - } - - return 1, nil -} - -func (state *acceptanceStateV1) Send(param interface{}) errors.IdentifiableError { - if responseType, ok := param.(byte); ok { - switch responseType { - case CONNECTION_REJECT: - protocol := state.protocol - - if _, err := protocol.Conn.Write([]byte{CONNECTION_REJECT}); err != nil { - return ErrSendFailed.Derive(err, "failed to send reject message") - } - - _ = protocol.Conn.Close() - - protocol.SendState = nil - - return nil - - case CONNECTION_ACCEPT: - protocol := state.protocol - - if _, err := protocol.Conn.Write([]byte{CONNECTION_ACCEPT}); err != nil { - return ErrSendFailed.Derive(err, "failed to send accept message") - } - - protocol.SendState = newDispatchStateV1(protocol) - - return nil - } - } - - return ErrInvalidSendParam.Derive("passed in parameter is not a valid acceptance byte") -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region dispatchStateV1 ////////////////////////////////////////////////////////////////////////////////////////////// - -type dispatchStateV1 struct { - protocol *protocol -} - -func newDispatchStateV1(protocol *protocol) *dispatchStateV1 { - return &dispatchStateV1{ - protocol: protocol, - } -} - -func (state *dispatchStateV1) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) { - switch data[0] { - case DISPATCH_DROP: - protocol := state.protocol - - protocol.Events.ReceiveConnectionRejected.Trigger() - - _ = protocol.Conn.Close() - - protocol.ReceivingState = nil - - case DISPATCH_TRANSACTION: - protocol := state.protocol - - protocol.ReceivingState = newTransactionStateV1(protocol) - - case DISPATCH_REQUEST: - protocol := state.protocol - - protocol.ReceivingState = newRequestStateV1(protocol) - - default: - return 1, ErrInvalidStateTransition.Derive("invalid dispatch state transition (" + strconv.Itoa(int(data[offset])) + ")") - } - - return 1, nil -} - -func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { - if dispatchByte, ok := param.(byte); ok { - switch dispatchByte { - case DISPATCH_DROP: - protocol := state.protocol - - if _, err := protocol.Conn.Write([]byte{DISPATCH_DROP}); err != nil { - return ErrSendFailed.Derive(err, "failed to send drop message") - } - - _ = protocol.Conn.Close() - - protocol.SendState = nil - - return nil - - case DISPATCH_TRANSACTION: - protocol := state.protocol - - if _, err := protocol.Conn.Write([]byte{DISPATCH_TRANSACTION}); err != nil { - return ErrSendFailed.Derive(err, "failed to send transaction dispatch byte") - } - - protocol.SendState = newTransactionStateV1(protocol) - - return nil - - case DISPATCH_REQUEST: - protocol := state.protocol - - if _, err := protocol.Conn.Write([]byte{DISPATCH_REQUEST}); err != nil { - return ErrSendFailed.Derive(err, "failed to send request dispatch byte") - } - - protocol.SendState = newTransactionStateV1(protocol) - - return nil - } - } - - return ErrInvalidSendParam.Derive("passed in parameter is not a valid dispatch byte") -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region transactionStateV1 /////////////////////////////////////////////////////////////////////////////////////////// - -type transactionStateV1 struct { - protocol *protocol - buffer []byte - offset int -} - -func newTransactionStateV1(protocol *protocol) *transactionStateV1 { - return &transactionStateV1{ - protocol: protocol, - buffer: make([]byte, meta_transaction.MARSHALED_TOTAL_SIZE/consts.NumberOfTritsInAByte), - offset: 0, - } -} - -func (state *transactionStateV1) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) { - bytesRead := byteutils.ReadAvailableBytesToBuffer(state.buffer, state.offset, data, offset, length) - - state.offset += bytesRead - if state.offset == meta_transaction.MARSHALED_TOTAL_SIZE/consts.NumberOfTritsInAByte { - protocol := state.protocol - - transactionData := make([]byte, meta_transaction.MARSHALED_TOTAL_SIZE/consts.NumberOfTritsInAByte) - copy(transactionData, state.buffer) - - protocol.Events.ReceiveTransactionData.Trigger(transactionData) - - go ProcessReceivedTransactionData(transactionData) - - protocol.ReceivingState = newDispatchStateV1(protocol) - state.offset = 0 - } - - return bytesRead, nil -} - -func (state *transactionStateV1) Send(param interface{}) errors.IdentifiableError { - if tx, ok := param.(*meta_transaction.MetaTransaction); ok { - protocol := state.protocol - - if _, err := protocol.Conn.Write(tx.GetBytes()); err != nil { - return ErrSendFailed.Derive(err, "failed to send transaction") - } - - protocol.SendState = newDispatchStateV1(protocol) - - return nil - } - - return ErrInvalidSendParam.Derive("passed in parameter is not a valid transaction") -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region requestStateV1 /////////////////////////////////////////////////////////////////////////////////////////////// - -type requestStateV1 struct { - buffer []byte - offset int -} - -func newRequestStateV1(protocol *protocol) *requestStateV1 { - return &requestStateV1{ - buffer: make([]byte, 1), - offset: 0, - } -} - -func (state *requestStateV1) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) { - return 0, nil -} - -func (state *requestStateV1) Send(param interface{}) errors.IdentifiableError { - return nil -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region constants and variables ////////////////////////////////////////////////////////////////////////////////////// - -const ( - VERSION_1 = byte(1) - - CONNECTION_REJECT = byte(0) - CONNECTION_ACCEPT = byte(1) - - DISPATCH_DROP = byte(0) - DISPATCH_TRANSACTION = byte(1) - DISPATCH_REQUEST = byte(2) - - MARSHALED_IDENTITY_IDENTIFIER_START = 0 - MARSHALED_IDENTITY_SIGNATURE_START = MARSHALED_IDENTITY_IDENTIFIER_END - - MARSHALED_IDENTITY_IDENTIFIER_SIZE = identity.IDENTIFIER_BYTE_LENGTH - MARSHALED_IDENTITY_SIGNATURE_SIZE = identity.SIGNATURE_BYTE_LENGTH - - MARSHALED_IDENTITY_IDENTIFIER_END = MARSHALED_IDENTITY_IDENTIFIER_START + MARSHALED_IDENTITY_IDENTIFIER_SIZE - MARSHALED_IDENTITY_SIGNATURE_END = MARSHALED_IDENTITY_SIGNATURE_START + MARSHALED_IDENTITY_SIGNATURE_SIZE - - MARSHALED_IDENTITY_TOTAL_SIZE = MARSHALED_IDENTITY_SIGNATURE_END -) - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/gossip/protocol_v1.png b/plugins/gossip/protocol_v1.png deleted file mode 100644 index ce667756467d35bf33185cf5c3727894ea7c4e57..0000000000000000000000000000000000000000 Binary files a/plugins/gossip/protocol_v1.png and /dev/null differ diff --git a/plugins/gossip/send_queue.go b/plugins/gossip/send_queue.go deleted file mode 100644 index c85502895189fc49a46c1013ad21cbf2f57f004e..0000000000000000000000000000000000000000 --- a/plugins/gossip/send_queue.go +++ /dev/null @@ -1,156 +0,0 @@ -package gossip - -import ( - "sync" - - "github.com/iotaledger/goshimmer/packages/model/meta_transaction" - "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/hive.go/node" -) - -// region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// - -func configureSendQueue(plugin *node.Plugin) { - for _, neighbor := range neighbors.GetMap() { - setupEventHandlers(neighbor) - } - - Events.AddNeighbor.Attach(events.NewClosure(setupEventHandlers)) - - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - log.Info("Stopping Send Queue Dispatcher ...") - })) -} - -func runSendQueue(plugin *node.Plugin) { - log.Info("Starting Send Queue Dispatcher ...") - - daemon.BackgroundWorker("Gossip Send Queue Dispatcher", func() { - log.Info("Starting Send Queue Dispatcher ... done") - - for { - select { - case <-daemon.ShutdownSignal: - log.Info("Stopping Send Queue Dispatcher ... done") - - return - - case tx := <-sendQueue: - connectedNeighborsMutex.RLock() - for _, neighborQueue := range neighborQueues { - select { - case neighborQueue.queue <- tx: - // log sth - - default: - // log sth - } - } - connectedNeighborsMutex.RUnlock() - } - } - }) - - connectedNeighborsMutex.Lock() - for _, neighborQueue := range neighborQueues { - startNeighborSendQueue(neighborQueue.protocol.Neighbor, neighborQueue) - } - connectedNeighborsMutex.Unlock() -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region public api /////////////////////////////////////////////////////////////////////////////////////////////////// - -func SendTransaction(transaction *meta_transaction.MetaTransaction) { - sendQueue <- transaction -} - -func (neighbor *Neighbor) SendTransaction(transaction *meta_transaction.MetaTransaction) { - if queue, exists := neighborQueues[neighbor.GetIdentity().StringIdentifier]; exists { - select { - case queue.queue <- transaction: - return - - default: - return - } - } -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region utility methods ////////////////////////////////////////////////////////////////////////////////////////////// - -func setupEventHandlers(neighbor *Neighbor) { - neighbor.Events.ProtocolConnectionEstablished.Attach(events.NewClosure(func(protocol *protocol) { - queue := &neighborQueue{ - protocol: protocol, - queue: make(chan *meta_transaction.MetaTransaction, SEND_QUEUE_SIZE), - disconnectChan: make(chan int, 1), - } - - connectedNeighborsMutex.Lock() - neighborQueues[neighbor.GetIdentity().StringIdentifier] = queue - connectedNeighborsMutex.Unlock() - - protocol.Conn.Events.Close.Attach(events.NewClosure(func() { - close(queue.disconnectChan) - - connectedNeighborsMutex.Lock() - delete(neighborQueues, neighbor.GetIdentity().StringIdentifier) - connectedNeighborsMutex.Unlock() - })) - - if daemon.IsRunning() { - startNeighborSendQueue(neighbor, queue) - } - })) -} - -func startNeighborSendQueue(neighbor *Neighbor, neighborQueue *neighborQueue) { - daemon.BackgroundWorker("Gossip Send Queue ("+neighbor.GetIdentity().StringIdentifier+")", func() { - for { - select { - case <-daemon.ShutdownSignal: - return - - case <-neighborQueue.disconnectChan: - return - - case tx := <-neighborQueue.queue: - switch neighborQueue.protocol.Version { - case VERSION_1: - sendTransactionV1(neighborQueue.protocol, tx) - } - } - } - }) -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region types and interfaces ///////////////////////////////////////////////////////////////////////////////////////// - -type neighborQueue struct { - protocol *protocol - queue chan *meta_transaction.MetaTransaction - disconnectChan chan int -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region constants and variables ////////////////////////////////////////////////////////////////////////////////////// - -var neighborQueues = make(map[string]*neighborQueue) - -var connectedNeighborsMutex sync.RWMutex - -var sendQueue = make(chan *meta_transaction.MetaTransaction, SEND_QUEUE_SIZE) - -const ( - SEND_QUEUE_SIZE = 500 -) - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go deleted file mode 100644 index 3a34424026c7f81a740d0bfb8cdf22c07cd252b6..0000000000000000000000000000000000000000 --- a/plugins/gossip/server.go +++ /dev/null @@ -1,77 +0,0 @@ -package gossip - -import ( - "github.com/iotaledger/goshimmer/packages/errors" - "github.com/iotaledger/goshimmer/packages/identity" - "github.com/iotaledger/goshimmer/packages/network" - "github.com/iotaledger/goshimmer/packages/network/tcp" - "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/hive.go/daemon" - "github.com/iotaledger/hive.go/events" - "github.com/iotaledger/hive.go/node" - "github.com/iotaledger/hive.go/parameter" -) - -var TCPServer = tcp.NewServer() - -func configureServer(plugin *node.Plugin) { - TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) { - protocol := newProtocol(conn) - - // print protocol errors - protocol.Events.Error.Attach(events.NewClosure(func(err errors.IdentifiableError) { - log.Error(err.Error()) - })) - - // store protocol in neighbor if its a neighbor calling - protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) { - if protocol.Neighbor != nil { - - if protocol.Neighbor.GetAcceptedProtocol() == nil { - protocol.Neighbor.SetAcceptedProtocol(protocol) - - protocol.Conn.Events.Close.Attach(events.NewClosure(func() { - protocol.Neighbor.SetAcceptedProtocol(nil) - })) - } - } - })) - - // drop the "secondary" connection upon successful handshake - protocol.Events.HandshakeCompleted.Attach(events.NewClosure(func() { - if protocol.Neighbor.Peer.ID().String() <= local.INSTANCE.ID().String() { - var initiatedProtocolConn *network.ManagedConnection - if protocol.Neighbor.GetInitiatedProtocol() != nil { - initiatedProtocolConn = protocol.Neighbor.GetInitiatedProtocol().Conn - } - - if initiatedProtocolConn != nil { - _ = initiatedProtocolConn.Close() - } - } - - protocol.Neighbor.Events.ProtocolConnectionEstablished.Trigger(protocol) - })) - - go protocol.Init() - })) - - daemon.Events.Shutdown.Attach(events.NewClosure(func() { - log.Info("Stopping TCP Server ...") - - TCPServer.Shutdown() - })) -} - -func runServer(plugin *node.Plugin) { - gossipPort := parameter.NodeConfig.GetInt(GOSSIP_PORT) - log.Infof("Starting TCP Server (port %d) ...", gossipPort) - - daemon.BackgroundWorker("Gossip TCP Server", func() { - log.Infof("Starting TCP Server (port %d) ... done", gossipPort) - - TCPServer.Listen(gossipPort) - - log.Info("Stopping TCP Server ... done") - }) -} diff --git a/plugins/gossip/transaction_processor.go b/plugins/gossip/transaction_processor.go deleted file mode 100644 index 742ac36929dfabd2d64b293c8906f397b02575a1..0000000000000000000000000000000000000000 --- a/plugins/gossip/transaction_processor.go +++ /dev/null @@ -1,26 +0,0 @@ -package gossip - -import ( - "github.com/iotaledger/goshimmer/packages/filter" - "github.com/iotaledger/goshimmer/packages/model/meta_transaction" -) - -// region public api /////////////////////////////////////////////////////////////////////////////////////////////////// - -func ProcessReceivedTransactionData(transactionData []byte) { - if transactionFilter.Add(transactionData) { - Events.ReceiveTransaction.Trigger(meta_transaction.FromBytes(transactionData)) - } -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// region constants and variables ////////////////////////////////////////////////////////////////////////////////////// - -var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE) - -const ( - TRANSACTION_FILTER_SIZE = 500 -) - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/gossip/transaction_processor_test.go b/plugins/gossip/transaction_processor_test.go deleted file mode 100644 index 082eba9d38ffa31b0f3e29e2b8f39c9abed101fc..0000000000000000000000000000000000000000 --- a/plugins/gossip/transaction_processor_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package gossip - -import ( - "sync" - "testing" - - "github.com/iotaledger/goshimmer/packages/model/meta_transaction" - "github.com/iotaledger/iota.go/consts" -) - -func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) { - byteArray := setupTransaction(meta_transaction.MARSHALED_TOTAL_SIZE / consts.NumberOfTritsInAByte) - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - ProcessReceivedTransactionData(byteArray) - } -} - -func BenchmarkProcessSimilarTransactionsUnfiltered(b *testing.B) { - byteArray := setupTransaction(meta_transaction.MARSHALED_TOTAL_SIZE / consts.NumberOfTritsInAByte) - - b.ResetTimer() - - var wg sync.WaitGroup - - for i := 0; i < b.N; i++ { - wg.Add(1) - - go func() { - Events.ReceiveTransaction.Trigger(meta_transaction.FromBytes(byteArray)) - - wg.Done() - }() - } - - wg.Wait() -} - -func setupTransaction(byteArraySize int) []byte { - byteArray := make([]byte, byteArraySize) - - for i := 0; i < len(byteArray); i++ { - byteArray[i] = byte(i % 128) - } - - return byteArray -} diff --git a/plugins/tangle/events.go b/plugins/tangle/events.go index 3cc90ba43176512d4db1aa835bb180b921d35844..b9c0acf5a9ca4d9a84ccc4a291ea978de1222b44 100644 --- a/plugins/tangle/events.go +++ b/plugins/tangle/events.go @@ -1,7 +1,7 @@ package tangle import ( - "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/model/meta_transaction" "github.com/iotaledger/hive.go/events" ) @@ -16,5 +16,5 @@ type pluginEvents struct { } func transactionCaller(handler interface{}, params ...interface{}) { - handler.(func(*value_transaction.ValueTransaction))(params[0].(*value_transaction.ValueTransaction)) + handler.(func(*meta_transaction.MetaTransaction))(params[0].(*meta_transaction.MetaTransaction)) }