Skip to content
Snippets Groups Projects
Commit 1cd22b16 authored by capossele's avatar capossele
Browse files

:sparkles: integrates latest gossip package

parent 5ef395cf
Branches
Tags
No related merge requests found
......@@ -4,5 +4,6 @@ import "github.com/pkg/errors"
var (
ErrClosed = errors.New("manager closed")
ErrNotANeighbor = errors.New("peer is not a neighbor")
ErrDuplicateNeighbor = errors.New("peer already connected")
)
......@@ -5,28 +5,40 @@ import (
"github.com/iotaledger/hive.go/events"
)
// Events contains all the events that are triggered during the gossip protocol.
// Events contains all the events related to the gossip protocol.
var Events = struct {
// A NewTransaction event is triggered when a new transaction is received by the gossip protocol.
NewTransaction *events.Event
DropNeighbor *events.Event
// A TransactionReceived event is triggered when a new transaction is received by the gossip protocol.
TransactionReceived *events.Event
// A NeighborDropped event is triggered when a neighbor has been dropped.
NeighborDropped *events.Event
// A RequestTransaction should be triggered for a transaction to be requested through the gossip protocol.
RequestTransaction *events.Event
}{
NewTransaction: events.NewEvent(newTransaction),
DropNeighbor: events.NewEvent(dropNeighbor),
TransactionReceived: events.NewEvent(transactionReceived),
NeighborDropped: events.NewEvent(neighborDropped),
RequestTransaction: events.NewEvent(requestTransaction),
}
type NewTransactionEvent struct {
type TransactionReceivedEvent struct {
Body []byte
Peer *peer.Peer
}
type DropNeighborEvent struct {
type RequestTransactionEvent struct {
Hash []byte // hash of the transaction to request
}
type NeighborDroppedEvent struct {
Peer *peer.Peer
}
func newTransaction(handler interface{}, params ...interface{}) {
handler.(func(*NewTransactionEvent))(params[0].(*NewTransactionEvent))
func transactionReceived(handler interface{}, params ...interface{}) {
handler.(func(*TransactionReceivedEvent))(params[0].(*TransactionReceivedEvent))
}
func requestTransaction(handler interface{}, params ...interface{}) {
handler.(func(*RequestTransactionEvent))(params[0].(*RequestTransactionEvent))
}
func dropNeighbor(handler interface{}, params ...interface{}) {
handler.(func(*DropNeighborEvent))(params[0].(*DropNeighborEvent))
func neighborDropped(handler interface{}, params ...interface{}) {
handler.(func(*NeighborDroppedEvent))(params[0].(*NeighborDroppedEvent))
}
......@@ -49,28 +49,59 @@ func NewManager(t *transport.TCP, log *zap.SugaredLogger, f GetTransaction) *Man
return m
}
// Close stops the manager and closes all established connections.
func (m *Manager) Close() {
m.mu.Lock()
m.running = false
// close all connections
for _, n := range m.neighbors {
_ = n.conn.Close()
}
m.mu.Unlock()
m.wg.Wait()
}
// AddOutbound tries to add a neighbor by connecting to that peer.
func (m *Manager) AddOutbound(p *peer.Peer) error {
return m.addNeighbor(p, m.trans.DialPeer)
}
// AddInbound tries to add a neighbor by accepting an incoming connection from that peer.
func (m *Manager) AddInbound(p *peer.Peer) error {
return m.addNeighbor(p, m.trans.AcceptPeer)
}
func (m *Manager) DropNeighbor(p peer.ID) {
m.deleteNeighbor(p)
// NeighborDropped disconnects the neighbor with the given ID.
func (m *Manager) DropNeighbor(id peer.ID) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.neighbors[id]; !ok {
return ErrNotANeighbor
}
n := m.neighbors[id]
delete(m.neighbors, id)
disconnect(n.conn)
return nil
}
func (m *Manager) Close() {
m.mu.Lock()
m.running = false
// close all connections
for _, n := range m.neighbors {
_ = n.conn.Close()
// RequestTransaction requests the transaction with the given hash from the neighbors.
// If no peer is provided, all neighbors are queried.
func (m *Manager) RequestTransaction(txHash []byte, to ...peer.ID) {
req := &pb.TransactionRequest{
Hash: txHash,
}
m.mu.Unlock()
m.send(marshal(req), to...)
}
m.wg.Wait()
// SendTransaction sends the given transaction data to the neighbors.
// If no peer is provided, it is send to all neighbors.
func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) {
tx := &pb.Transaction{
Body: txData,
}
m.send(marshal(tx), to...)
}
func (m *Manager) getNeighbors(ids ...peer.ID) []*neighbor {
......@@ -105,21 +136,6 @@ func (m *Manager) getNeighborsById(ids []peer.ID) []*neighbor {
return result
}
func (m *Manager) RequestTransaction(txHash []byte, to ...peer.ID) {
req := &pb.TransactionRequest{
Hash: txHash,
}
m.send(marshal(req), to...)
}
// SendTransaction sends the transaction data to the given neighbors.
func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) {
tx := &pb.Transaction{
Body: txData,
}
m.send(marshal(tx), to...)
}
func (m *Manager) send(msg []byte, to ...peer.ID) {
if l := len(msg); l > maxPacketSize {
m.log.Errorw("message too large", "len", l, "max", maxPacketSize)
......@@ -150,7 +166,7 @@ func (m *Manager) addNeighbor(peer *peer.Peer, handshake func(*peer.Peer) (*tran
// could not add neighbor
if err != nil {
m.log.Debugw("addNeighbor failed", "peer", peer.ID(), "err", err)
Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: peer})
Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: peer})
return err
}
......@@ -176,18 +192,6 @@ func (m *Manager) addNeighbor(peer *peer.Peer, handshake func(*peer.Peer) (*tran
return nil
}
func (m *Manager) deleteNeighbor(peer peer.ID) {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.neighbors[peer]; !ok {
return
}
n := m.neighbors[peer]
delete(m.neighbors, peer)
disconnect(n.conn)
}
func (m *Manager) readLoop(nbr *neighbor) {
m.wg.Add(1)
defer m.wg.Done()
......@@ -207,7 +211,7 @@ func (m *Manager) readLoop(nbr *neighbor) {
m.log.Warnw("read error", "err", err)
}
_ = nbr.conn.Close() // just make sure that the connection is closed as fast as possible
m.deleteNeighbor(nbr.peer.ID())
_ = m.DropNeighbor(nbr.peer.ID())
m.log.Debug("reading stopped")
return
}
......@@ -228,7 +232,7 @@ func (m *Manager) handlePacket(data []byte, n *neighbor) error {
return errors.Wrap(err, "invalid message")
}
m.log.Debugw("Received Transaction", "data", msg.GetBody())
Events.NewTransaction.Trigger(&NewTransactionEvent{Body: msg.GetBody(), Peer: n.peer})
Events.TransactionReceived.Trigger(&TransactionReceivedEvent{Body: msg.GetBody(), Peer: n.peer})
// Incoming Transaction request
case pb.MTransactionRequest:
......@@ -267,5 +271,5 @@ func marshal(msg pb.Message) []byte {
func disconnect(conn *transport.Connection) {
_ = conn.Close()
Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: conn.Peer()})
Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: conn.Peer()})
}
......@@ -6,11 +6,11 @@ import (
"testing"
"time"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/transport"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/autopeering-sim/peer/service"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/transport"
"github.com/iotaledger/hive.go/events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
......@@ -27,8 +27,8 @@ var (
testTxData = []byte("testTx")
)
func newTransactionEvent(ev *NewTransactionEvent) { eventMock.Called(ev) }
func dropNeighborEvent(ev *DropNeighborEvent) { eventMock.Called(ev) }
func transactionReceivedEvent(ev *TransactionReceivedEvent) { eventMock.Called(ev) }
func neighborDroppedEvent(ev *NeighborDroppedEvent) { eventMock.Called(ev) }
// assertEvents initializes the mock and asserts the expectations
func assertEvents(t *testing.T) func() {
......@@ -47,9 +47,9 @@ func init() {
}
logger = l.Sugar()
// mock the events
Events.NewTransaction.Attach(events.NewClosure(newTransactionEvent))
Events.DropNeighbor.Attach(events.NewClosure(dropNeighborEvent))
// mock the events triggered by the gossip
Events.TransactionReceived.Attach(events.NewClosure(transactionReceivedEvent))
Events.NeighborDropped.Attach(events.NewClosure(neighborDroppedEvent))
}
func getTestTransaction([]byte) ([]byte, error) {
......@@ -101,24 +101,25 @@ func TestClosedConnection(t *testing.T) {
// B -> A
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
err := mgrA.AddInbound(peerB)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
err := mgrB.AddOutbound(peerA)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
// A drops B
mgrA.deleteNeighbor(peerB.ID())
err := mgrA.NeighborDropped(peerB.ID())
require.NoError(t, err)
time.Sleep(graceTime)
// the events should be there even before we close
......@@ -140,25 +141,25 @@ func TestP2PSend(t *testing.T) {
// B -> A
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
err := mgrA.AddInbound(peerB)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
err := mgrB.AddOutbound(peerA)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
eventMock.On("newTransactionEvent", &NewTransactionEvent{
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
Body: testTxData,
Peer: peerA,
}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
mgrA.SendTransaction(testTxData)
time.Sleep(graceTime)
......@@ -179,25 +180,25 @@ func TestP2PSendTwice(t *testing.T) {
// B -> A
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
err := mgrA.AddInbound(peerB)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
err := mgrB.AddOutbound(peerA)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
eventMock.On("newTransactionEvent", &NewTransactionEvent{
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
Body: testTxData,
Peer: peerA,
}).Twice()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
mgrA.SendTransaction(testTxData)
time.Sleep(1 * time.Second) // wait a bit between the sends, to test timeouts
......@@ -222,41 +223,94 @@ func TestBroadcast(t *testing.T) {
// B -> A <- C
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
err := mgrA.AddInbound(peerB)
assert.NoError(t, err)
}()
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerC, mgrA.trans.AcceptPeer)
err := mgrA.AddInbound(peerC)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
err := mgrB.AddOutbound(peerA)
assert.NoError(t, err)
}()
go func() {
defer wg.Done()
err := mgrC.addNeighbor(peerA, mgrC.trans.DialPeer)
err := mgrC.AddOutbound(peerA)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
eventMock.On("newTransactionEvent", &NewTransactionEvent{
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
Body: testTxData,
Peer: peerA,
}).Twice()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Twice()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerC}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Twice()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerC}).Once()
mgrA.SendTransaction(testTxData)
time.Sleep(graceTime)
}
func TestSingleSend(t *testing.T) {
defer assertEvents(t)()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
mgrC, closeC, peerC := newTest(t, "C")
defer closeC()
var wg sync.WaitGroup
wg.Add(4)
// connect in the following way
// B -> A <- C
go func() {
defer wg.Done()
err := mgrA.AddInbound(peerB)
assert.NoError(t, err)
}()
go func() {
defer wg.Done()
err := mgrA.AddInbound(peerC)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.AddOutbound(peerA)
assert.NoError(t, err)
}()
go func() {
defer wg.Done()
err := mgrC.AddOutbound(peerA)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
Body: testTxData,
Peer: peerA,
}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Twice()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerC}).Once()
// A sends the transaction only to B
mgrA.SendTransaction(testTxData, peerB.ID())
time.Sleep(graceTime)
}
func TestDropUnsuccessfulAccept(t *testing.T) {
defer assertEvents(t)()
......@@ -265,11 +319,11 @@ func TestDropUnsuccessfulAccept(t *testing.T) {
_, closeB, peerB := newTest(t, "B")
defer closeB()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{
Peer: peerB,
}).Once()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
err := mgrA.AddInbound(peerB)
assert.Error(t, err)
}
......@@ -288,27 +342,29 @@ func TestTxRequest(t *testing.T) {
// B -> A
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
err := mgrA.AddInbound(peerB)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
err := mgrB.AddOutbound(peerA)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
eventMock.On("newTransactionEvent", &NewTransactionEvent{
txHash := []byte("Hello!")
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
Body: testTxData,
Peer: peerB,
}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerA}).Once()
eventMock.On("dropNeighborEvent", &DropNeighborEvent{Peer: peerB}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
b, err := proto.Marshal(&pb.TransactionRequest{Hash: []byte("Hello!")})
b, err := proto.Marshal(&pb.TransactionRequest{Hash: txHash})
require.NoError(t, err)
mgrA.RequestTransaction(b)
time.Sleep(graceTime)
......
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: packages/gossip/proto/message.proto
// source: proto/message.proto
package proto
......@@ -32,7 +32,7 @@ func (m *Transaction) Reset() { *m = Transaction{} }
func (m *Transaction) String() string { return proto.CompactTextString(m) }
func (*Transaction) ProtoMessage() {}
func (*Transaction) Descriptor() ([]byte, []int) {
return fileDescriptor_fcce9e84825f2fa5, []int{0}
return fileDescriptor_33f3a5e1293a7bcd, []int{0}
}
func (m *Transaction) XXX_Unmarshal(b []byte) error {
......@@ -72,7 +72,7 @@ func (m *TransactionRequest) Reset() { *m = TransactionRequest{} }
func (m *TransactionRequest) String() string { return proto.CompactTextString(m) }
func (*TransactionRequest) ProtoMessage() {}
func (*TransactionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_fcce9e84825f2fa5, []int{1}
return fileDescriptor_33f3a5e1293a7bcd, []int{1}
}
func (m *TransactionRequest) XXX_Unmarshal(b []byte) error {
......@@ -105,20 +105,17 @@ func init() {
proto.RegisterType((*TransactionRequest)(nil), "proto.TransactionRequest")
}
func init() {
proto.RegisterFile("packages/gossip/proto/message.proto", fileDescriptor_fcce9e84825f2fa5)
}
var fileDescriptor_fcce9e84825f2fa5 = []byte{
// 157 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x8e, 0x31, 0x0b, 0xc2, 0x30,
0x10, 0x46, 0x29, 0xa8, 0x43, 0x74, 0xca, 0xe4, 0xa8, 0x75, 0xe9, 0xd4, 0x0c, 0x22, 0xee, 0xfe,
0x84, 0xe2, 0xe4, 0x76, 0x49, 0x8f, 0x24, 0x68, 0x7a, 0x31, 0x97, 0x0e, 0xfe, 0x7b, 0x49, 0x40,
0x70, 0xe8, 0xf4, 0xbd, 0x0f, 0xde, 0xf0, 0xc4, 0x29, 0x82, 0x79, 0x82, 0x45, 0x56, 0x96, 0x98,
0x7d, 0x54, 0x31, 0x51, 0x26, 0x15, 0x90, 0x19, 0x2c, 0xf6, 0xf5, 0xc9, 0x75, 0x9d, 0xf6, 0x28,
0xb6, 0xf7, 0x04, 0x13, 0x83, 0xc9, 0x9e, 0x26, 0x29, 0xc5, 0x4a, 0xd3, 0xf8, 0xd9, 0x37, 0x87,
0xa6, 0xdb, 0x0d, 0x95, 0xdb, 0x4e, 0xc8, 0x3f, 0x65, 0xc0, 0xf7, 0x8c, 0x9c, 0x8b, 0xe9, 0x80,
0xdd, 0xcf, 0x2c, 0x7c, 0xbb, 0x3e, 0x2e, 0xd6, 0x67, 0x37, 0xeb, 0xde, 0x50, 0x50, 0x9e, 0x32,
0xbc, 0x70, 0xb4, 0x98, 0x4a, 0x87, 0xf3, 0x21, 0x60, 0x52, 0x8b, 0x69, 0x7a, 0x53, 0xe7, 0xfc,
0x0d, 0x00, 0x00, 0xff, 0xff, 0xd7, 0x71, 0x33, 0x9a, 0xba, 0x00, 0x00, 0x00,
func init() { proto.RegisterFile("proto/message.proto", fileDescriptor_33f3a5e1293a7bcd) }
var fileDescriptor_33f3a5e1293a7bcd = []byte{
// 137 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x28, 0xca, 0x2f,
0xc9, 0xd7, 0xcf, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0xd5, 0x03, 0xf3, 0x84, 0x58, 0xc1, 0x94,
0x92, 0x22, 0x17, 0x77, 0x48, 0x51, 0x62, 0x5e, 0x71, 0x62, 0x72, 0x49, 0x66, 0x7e, 0x9e, 0x90,
0x10, 0x17, 0x4b, 0x52, 0x7e, 0x4a, 0xa5, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad,
0xa4, 0xc1, 0x25, 0x84, 0xa4, 0x24, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x04, 0xa4, 0x32, 0x23,
0xb1, 0x38, 0x03, 0xa6, 0x12, 0xc4, 0x76, 0x52, 0x8e, 0x52, 0x4c, 0xcf, 0x2c, 0xc9, 0x28, 0x4d,
0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0x4e, 0x2c, 0xc8, 0x2f, 0x2e, 0x4e, 0xcd, 0x49, 0xd5, 0x4f,
0x07, 0xd2, 0x99, 0x05, 0xfa, 0x60, 0x1b, 0x93, 0xd8, 0xc0, 0x94, 0x31, 0x20, 0x00, 0x00, 0xff,
0xff, 0x34, 0x46, 0xa5, 0x0f, 0x96, 0x00, 0x00, 0x00,
}
......@@ -4,9 +4,9 @@ import (
"bytes"
"time"
pb "github.com/iotaledger/goshimmer/packages/gossip/transport/proto"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/server"
pb "github.com/iotaledger/goshimmer/packages/gossip/transport/proto"
)
const (
......
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: packages/gossip/transport/proto/handshake.proto
// source: transport/proto/handshake.proto
package proto
......@@ -38,7 +38,7 @@ func (m *HandshakeRequest) Reset() { *m = HandshakeRequest{} }
func (m *HandshakeRequest) String() string { return proto.CompactTextString(m) }
func (*HandshakeRequest) ProtoMessage() {}
func (*HandshakeRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f9e96b60881ea276, []int{0}
return fileDescriptor_d7101ffe19b05443, []int{0}
}
func (m *HandshakeRequest) XXX_Unmarshal(b []byte) error {
......@@ -99,7 +99,7 @@ func (m *HandshakeResponse) Reset() { *m = HandshakeResponse{} }
func (m *HandshakeResponse) String() string { return proto.CompactTextString(m) }
func (*HandshakeResponse) ProtoMessage() {}
func (*HandshakeResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f9e96b60881ea276, []int{1}
return fileDescriptor_d7101ffe19b05443, []int{1}
}
func (m *HandshakeResponse) XXX_Unmarshal(b []byte) error {
......@@ -132,24 +132,21 @@ func init() {
proto.RegisterType((*HandshakeResponse)(nil), "proto.HandshakeResponse")
}
func init() {
proto.RegisterFile("packages/gossip/transport/proto/handshake.proto", fileDescriptor_f9e96b60881ea276)
}
var fileDescriptor_f9e96b60881ea276 = []byte{
// 219 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x8f, 0x31, 0x4f, 0xc3, 0x30,
0x10, 0x85, 0x95, 0xb4, 0x50, 0x6a, 0x01, 0x02, 0x4f, 0x41, 0x62, 0x88, 0x3a, 0x65, 0x8a, 0x07,
0x7e, 0x00, 0x82, 0xa9, 0xb3, 0x47, 0x16, 0x74, 0x6d, 0x8f, 0xd8, 0x2a, 0xf6, 0x39, 0x77, 0x57,
0x7e, 0x3f, 0xc2, 0x52, 0x05, 0x1b, 0xd3, 0xbb, 0xf7, 0x6e, 0xf8, 0xf4, 0x19, 0x57, 0x60, 0x7f,
0x84, 0x09, 0xc5, 0x4d, 0x24, 0x12, 0x8b, 0x53, 0x86, 0x2c, 0x85, 0x58, 0x5d, 0x61, 0x52, 0x72,
0x01, 0xf2, 0x41, 0x02, 0x1c, 0x71, 0xac, 0xdd, 0x5e, 0xd4, 0xd8, 0x64, 0x73, 0xb7, 0x3d, 0x7f,
0x3c, 0xce, 0x27, 0x14, 0xb5, 0x9d, 0x59, 0x7d, 0x21, 0x4b, 0xa4, 0xdc, 0x35, 0x7d, 0x33, 0xdc,
0xf8, 0x73, 0xb5, 0xd6, 0x2c, 0x3f, 0x98, 0x52, 0xd7, 0xf6, 0xcd, 0xb0, 0xf6, 0xf5, 0xb6, 0xb7,
0xa6, 0x55, 0xea, 0x16, 0x75, 0x69, 0x95, 0xec, 0xa3, 0x59, 0x6b, 0x4c, 0x28, 0x0a, 0xa9, 0x74,
0xcb, 0xbe, 0x19, 0x16, 0xfe, 0x77, 0xd8, 0x8c, 0xe6, 0xfe, 0x0f, 0x4f, 0x0a, 0x65, 0x41, 0xfb,
0x60, 0xae, 0x18, 0xe7, 0xf7, 0x00, 0x12, 0x2a, 0xf1, 0xda, 0xaf, 0x18, 0xe7, 0x2d, 0x48, 0x78,
0x7d, 0x79, 0x7b, 0x9e, 0xa2, 0x86, 0xd3, 0x6e, 0xdc, 0x53, 0x72, 0x91, 0x14, 0x3e, 0xf1, 0x30,
0x21, 0xff, 0x68, 0x86, 0x98, 0x12, 0xf2, 0x7f, 0xe6, 0xbb, 0xcb, 0x1a, 0x4f, 0xdf, 0x01, 0x00,
0x00, 0xff, 0xff, 0x2a, 0x53, 0xc3, 0xbb, 0x23, 0x01, 0x00, 0x00,
func init() { proto.RegisterFile("transport/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) }
var fileDescriptor_d7101ffe19b05443 = []byte{
// 203 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x8f, 0x3f, 0x4b, 0x04, 0x31,
0x10, 0xc5, 0xb9, 0x3f, 0x7a, 0xde, 0xa0, 0xa2, 0xa9, 0x22, 0x08, 0xca, 0x55, 0x82, 0xb8, 0x29,
0xfc, 0x06, 0x56, 0x57, 0xa7, 0xb4, 0x91, 0xec, 0x3a, 0x6e, 0x82, 0x26, 0x93, 0xcd, 0x64, 0xfd,
0xfc, 0x0e, 0x81, 0x45, 0xb1, 0x7a, 0xef, 0xf7, 0x0b, 0xe4, 0x25, 0x70, 0x57, 0x8b, 0x4b, 0x9c,
0xa9, 0x54, 0x93, 0x0b, 0x55, 0x32, 0xde, 0xa5, 0x77, 0xf6, 0xee, 0x13, 0xbb, 0xc6, 0xea, 0xa4,
0xc5, 0x21, 0xc1, 0xd5, 0x71, 0x39, 0xb1, 0x38, 0xcd, 0xc8, 0x55, 0x69, 0xd8, 0x7d, 0x63, 0xe1,
0x40, 0x49, 0xaf, 0xee, 0x57, 0x0f, 0x17, 0x76, 0x41, 0xa5, 0x60, 0xfb, 0x51, 0x28, 0xea, 0xb5,
0xe8, 0xbd, 0x6d, 0x5d, 0x5d, 0xc2, 0xba, 0x92, 0xde, 0x34, 0x23, 0x4d, 0xdd, 0xc2, 0xbe, 0x86,
0x28, 0xf7, 0xb8, 0x98, 0xf5, 0x56, 0xf4, 0xc6, 0xfe, 0x8a, 0x43, 0x07, 0xd7, 0x7f, 0xf6, 0xe4,
0x81, 0x89, 0x51, 0xdd, 0xc0, 0x59, 0xc1, 0xe9, 0xcd, 0x3b, 0xf6, 0x6d, 0xf1, 0xdc, 0xee, 0x84,
0x8f, 0x82, 0x2f, 0x4f, 0xaf, 0x8f, 0x63, 0xa8, 0x7e, 0xee, 0xbb, 0x81, 0xa2, 0x19, 0x5c, 0x26,
0x66, 0xfc, 0x42, 0x33, 0x4a, 0x86, 0x6c, 0xfe, 0xfd, 0xb2, 0x3f, 0x6d, 0xf1, 0xfc, 0x13, 0x00,
0x00, 0xff, 0xff, 0x51, 0xe0, 0x08, 0xd0, 0xff, 0x00, 0x00, 0x00,
}
......@@ -54,7 +54,7 @@ func Start(tps uint) {
mtx := &pb.Transaction{Body: tx.MetaTransaction.GetBytes()}
b, _ := proto.Marshal(mtx)
gossip.Events.NewTransaction.Trigger(&gossip.NewTransactionEvent{Body: b, Peer: &local.INSTANCE.Peer})
gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b, Peer: &local.INSTANCE.Peer})
if sentCounter >= tps {
duration := time.Since(start)
......
......@@ -26,7 +26,7 @@ func run(plugin *node.Plugin) {
}
func configureLogging(plugin *node.Plugin) {
gossip.Events.DropNeighbor.Attach(events.NewClosure(func(ev *gossip.DropNeighborEvent) {
gossip.Events.NeighborDropped.Attach(events.NewClosure(func(ev *gossip.NeighborDroppedEvent) {
log.Info("neighbor dropped: " + ev.Peer.Address() + " / " + ev.Peer.ID().String())
if Selection != nil {
Selection.DropPeer(ev.Peer)
......
......@@ -14,7 +14,7 @@ var PLUGIN = node.NewPlugin("Metrics", node.Enabled, configure, run)
func configure(plugin *node.Plugin) {
// increase received TPS counter whenever we receive a new transaction
gossip.Events.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) { increaseReceivedTPSCounter() }))
gossip.Events.TransactionReceived.Attach(events.NewClosure(func(_ *gossip.TransactionReceivedEvent) { increaseReceivedTPSCounter() }))
}
func run(plugin *node.Plugin) {
......
......@@ -23,7 +23,7 @@ var receivedTps uint64
var solidTps uint64
var PLUGIN = node.NewPlugin("Statusscreen TPS", node.Enabled, func(plugin *node.Plugin) {
gossip.Events.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) {
gossip.Events.TransactionReceived.Attach(events.NewClosure(func(_ *gossip.TransactionReceivedEvent) {
atomic.AddUint64(&receivedTpsCounter, 1)
}))
......
......@@ -29,7 +29,7 @@ func configureSolidifier(plugin *node.Plugin) {
task.Return(nil)
}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(10000))
gossip.Events.NewTransaction.Attach(events.NewClosure(func(ev *gossip.NewTransactionEvent) {
gossip.Events.TransactionReceived.Attach(events.NewClosure(func(ev *gossip.TransactionReceivedEvent) {
//log.Info("New Transaction", ev.Body)
pTx := &pb.Transaction{}
proto.Unmarshal(ev.Body, pTx)
......
......@@ -47,19 +47,19 @@ func TestSolidifier(t *testing.T) {
wg.Add(4)
tx := &pb.Transaction{Body: transaction1.MetaTransaction.GetBytes()}
b, _ := proto.Marshal(tx)
gossip.Events.NewTransaction.Trigger(&gossip.NewTransactionEvent{Body: b})
gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b})
tx = &pb.Transaction{Body: transaction2.MetaTransaction.GetBytes()}
b, _ = proto.Marshal(tx)
gossip.Events.NewTransaction.Trigger(&gossip.NewTransactionEvent{Body: b})
gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b})
tx = &pb.Transaction{Body: transaction3.MetaTransaction.GetBytes()}
b, _ = proto.Marshal(tx)
gossip.Events.NewTransaction.Trigger(&gossip.NewTransactionEvent{Body: b})
gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b})
tx = &pb.Transaction{Body: transaction4.MetaTransaction.GetBytes()}
b, _ = proto.Marshal(tx)
gossip.Events.NewTransaction.Trigger(&gossip.NewTransactionEvent{Body: b})
gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b})
// wait until all are solid
wg.Wait()
......
......@@ -34,7 +34,7 @@ func configure(plugin *node.Plugin) {
return c.JSON(http.StatusOK, tpsQueue)
})
gossip.Events.NewTransaction.Attach(events.NewClosure(func(_ *gossip.NewTransactionEvent) {
gossip.Events.TransactionReceived.Attach(events.NewClosure(func(_ *gossip.TransactionReceivedEvent) {
atomic.AddUint64(&receivedTpsCounter, 1)
}))
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(_ *value_transaction.ValueTransaction) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment