Skip to content
Snippets Groups Projects
Unverified Commit e7e4f19e authored by Wolfgang Welz's avatar Wolfgang Welz Committed by GitHub
Browse files

Fix potential race conditions in gossip (#94)

* fix: use queue for neighbor connection

* refactor: do not request transactions via event

* fix: remove unused connection

* chore: fix linter warnings

* feat: improve gossip events and logs

* fix: log after connection is closed
parent 312b67af
Branches
Tags
No related merge requests found
Showing
with 644 additions and 322 deletions
......@@ -28,17 +28,17 @@ func init() {
// newTest creates a new discovery server and also returns the teardown.
func newTest(t require.TestingT, trans transport.Transport, logger *logger.Logger, masters ...*peer.Peer) (*server.Server, *Protocol, func()) {
log := logger.Named(trans.LocalAddr().String())
db := peer.NewMemoryDB(log.Named("db"))
l := logger.Named(trans.LocalAddr().String())
db := peer.NewMemoryDB(l.Named("db"))
local, err := peer.NewLocal(trans.LocalAddr().Network(), trans.LocalAddr().String(), db)
require.NoError(t, err)
cfg := Config{
Log: log,
Log: l,
MasterPeers: masters,
}
prot := New(local, cfg)
srv := server.Listen(local, trans, log.Named("srv"), prot)
srv := server.Listen(local, trans, l.Named("srv"), prot)
prot.Start(srv)
teardown := func() {
......
......@@ -30,8 +30,8 @@ func (d dummyDiscovery) GetVerifiedPeers() []*peer.Peer { retur
// newTest creates a new neighborhood server and also returns the teardown.
func newTest(t require.TestingT, trans transport.Transport) (*server.Server, *Protocol, func()) {
log := log.Named(trans.LocalAddr().String())
db := peer.NewMemoryDB(log.Named("db"))
l := log.Named(trans.LocalAddr().String())
db := peer.NewMemoryDB(l.Named("db"))
local, err := peer.NewLocal(trans.LocalAddr().Network(), trans.LocalAddr().String(), db)
require.NoError(t, err)
......@@ -39,10 +39,10 @@ func newTest(t require.TestingT, trans transport.Transport) (*server.Server, *Pr
peerMap[local.ID()] = &local.Peer
cfg := Config{
Log: log,
Log: l,
}
prot := New(local, dummyDiscovery{}, cfg)
srv := server.Listen(local, trans, log.Named("srv"), prot)
srv := server.Listen(local, trans, l.Named("srv"), prot)
prot.Start(srv)
teardown := func() {
......@@ -130,20 +130,20 @@ func TestProtDropPeer(t *testing.T) {
// newTest creates a new server handling discover as well as neighborhood and also returns the teardown.
func newFullTest(t require.TestingT, trans transport.Transport, masterPeers ...*peer.Peer) (*server.Server, *Protocol, func()) {
log := log.Named(trans.LocalAddr().String())
db := peer.NewMemoryDB(log.Named("db"))
l := log.Named(trans.LocalAddr().String())
db := peer.NewMemoryDB(l.Named("db"))
local, err := peer.NewLocal(trans.LocalAddr().Network(), trans.LocalAddr().String(), db)
require.NoError(t, err)
discovery := discover.New(local, discover.Config{
Log: log.Named("disc"),
Log: l.Named("disc"),
MasterPeers: masterPeers,
})
selection := New(local, discovery, Config{
Log: log.Named("sel"),
Log: l.Named("sel"),
})
srv := server.Listen(local, trans, log.Named("srv"), discovery, selection)
srv := server.Listen(local, trans, l.Named("srv"), discovery, selection)
discovery.Start(srv)
selection.Start(srv)
......
......@@ -113,8 +113,8 @@ func TestSrvEncodeDecodePing(t *testing.T) {
}
func newTestServer(t require.TestingT, name string, trans transport.Transport) (*Server, func()) {
log := log.Named(name)
db := peer.NewMemoryDB(log.Named("db"))
l := log.Named(name)
db := peer.NewMemoryDB(l.Named("db"))
local, err := peer.NewLocal(trans.LocalAddr().Network(), trans.LocalAddr().String(), db)
require.NoError(t, err)
......@@ -123,7 +123,7 @@ func newTestServer(t require.TestingT, name string, trans transport.Transport) (
s, _ = salt.NewSalt(100 * time.Second)
local.SetPublicSalt(s)
srv := Listen(local, trans, log, HandlerFunc(handle))
srv := Listen(local, trans, l, HandlerFunc(handle))
teardown := func() {
srv.Close()
......
package gossip
import (
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
)
// IsSupported reports whether the given peer offers the gossip service.
func IsSupported(p *peer.Peer) bool {
	gossipService := p.Services().Get(service.GossipKey)
	return gossipService != nil
}
// GetAddress returns the address of the gossip service.
// It panics if the peer does not offer gossip; check with IsSupported first.
func GetAddress(p *peer.Peer) string {
	gossipService := p.Services().Get(service.GossipKey)
	if gossipService == nil {
		panic("peer does not support gossip")
	}
	return gossipService.String()
}
......@@ -7,4 +7,5 @@ var (
ErrClosed = errors.New("manager closed")
ErrNotANeighbor = errors.New("peer is not a neighbor")
ErrDuplicateNeighbor = errors.New("peer already connected")
ErrInvalidPacket = errors.New("invalid packet")
)
......@@ -3,25 +3,23 @@ package gossip
import (
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/iota.go/trinary"
)
// Events contains all the events related to the gossip protocol.
var Events = struct {
// A NeighborDropped event is triggered when a neighbor has been dropped.
NeighborDropped *events.Event
// A ConnectionFailed event is triggered when a neighbor connection to a peer could not be established.
ConnectionFailed *events.Event
// A NeighborAdded event is triggered when a connection to a new neighbor has been established.
NeighborAdded *events.Event
// A NeighborRemoved event is triggered when a neighbor has been dropped.
NeighborRemoved *events.Event
// A TransactionReceived event is triggered when a new transaction is received by the gossip protocol.
TransactionReceived *events.Event
// A RequestTransaction should be triggered for a transaction to be requested through the gossip protocol.
RequestTransaction *events.Event
}{
NeighborDropped: events.NewEvent(neighborDropped),
ConnectionFailed: events.NewEvent(peerCaller),
NeighborAdded: events.NewEvent(neighborCaller),
NeighborRemoved: events.NewEvent(peerCaller),
TransactionReceived: events.NewEvent(transactionReceived),
RequestTransaction: events.NewEvent(requestTransaction),
}
type NeighborDroppedEvent struct {
Peer *peer.Peer
}
type TransactionReceivedEvent struct {
......@@ -29,18 +27,14 @@ type TransactionReceivedEvent struct {
Peer *peer.Peer // peer that send the transaction
}
type RequestTransactionEvent struct {
Hash trinary.Trytes // hash of the transaction to request
func peerCaller(handler interface{}, params ...interface{}) {
handler.(func(*peer.Peer))(params[0].(*peer.Peer))
}
func neighborDropped(handler interface{}, params ...interface{}) {
handler.(func(*NeighborDroppedEvent))(params[0].(*NeighborDroppedEvent))
func neighborCaller(handler interface{}, params ...interface{}) {
handler.(func(*Neighbor))(params[0].(*Neighbor))
}
func transactionReceived(handler interface{}, params ...interface{}) {
handler.(func(*TransactionReceivedEvent))(params[0].(*TransactionReceivedEvent))
}
func requestTransaction(handler interface{}, params ...interface{}) {
handler.(func(*RequestTransactionEvent))(params[0].(*RequestTransactionEvent))
}
package gossip
import (
"io"
"net"
"strings"
"sync"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/hive.go/events"
"github.com/pkg/errors"
"go.uber.org/zap"
)
......@@ -32,22 +30,17 @@ type Manager struct {
mu sync.RWMutex
srv *server.TCP
neighbors map[peer.ID]*neighbor
neighbors map[peer.ID]*Neighbor
running bool
}
type neighbor struct {
peer *peer.Peer
conn *server.Connection
}
func NewManager(local *peer.Local, f GetTransaction, log *zap.SugaredLogger) *Manager {
return &Manager{
local: local,
getTransaction: f,
log: log,
srv: nil,
neighbors: make(map[peer.ID]*neighbor),
neighbors: make(map[peer.ID]*Neighbor),
running: false,
}
}
......@@ -62,15 +55,20 @@ func (m *Manager) Start(srv *server.TCP) {
// Close stops the manager and closes all established connections.
func (m *Manager) Close() {
m.stop()
m.wg.Wait()
}
func (m *Manager) stop() {
m.mu.Lock()
defer m.mu.Unlock()
m.running = false
// close all connections
for _, n := range m.neighbors {
_ = n.conn.Close()
}
m.mu.Unlock()
m.wg.Wait()
// close all neighbor connections
for _, nbr := range m.neighbors {
_ = nbr.Close()
}
}
// LocalAddr returns the public address of the gossip service.
......@@ -104,7 +102,7 @@ func (m *Manager) AddInbound(p *peer.Peer) error {
return m.addNeighbor(p, srv.AcceptPeer)
}
// NeighborDropped disconnects the neighbor with the given ID.
// NeighborRemoved disconnects the neighbor with the given ID.
func (m *Manager) DropNeighbor(id peer.ID) error {
m.mu.Lock()
defer m.mu.Unlock()
......@@ -113,9 +111,10 @@ func (m *Manager) DropNeighbor(id peer.ID) error {
}
n := m.neighbors[id]
delete(m.neighbors, id)
disconnect(n.conn)
return nil
err := n.Close()
Events.NeighborRemoved.Trigger(n.Peer)
return err
}
// RequestTransaction requests the transaction with the given hash from the neighbors.
......@@ -127,8 +126,8 @@ func (m *Manager) RequestTransaction(txHash []byte, to ...peer.ID) {
m.send(marshal(req), to...)
}
// SendTransaction sends the given transaction data to the neighbors.
// If no peer is provided, it is send to all neighbors.
// SendTransaction adds the given transaction data to the send queue of the neighbors.
// The actual send then happens asynchronously. If no peer is provided, it is sent to all neighbors.
func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) {
tx := &pb.Transaction{
Data: txData,
......@@ -136,16 +135,16 @@ func (m *Manager) SendTransaction(txData []byte, to ...peer.ID) {
m.send(marshal(tx), to...)
}
func (m *Manager) getNeighbors(ids ...peer.ID) []*neighbor {
func (m *Manager) getNeighbors(ids ...peer.ID) []*Neighbor {
if len(ids) > 0 {
return m.getNeighborsById(ids)
}
return m.getAllNeighbors()
}
func (m *Manager) getAllNeighbors() []*neighbor {
func (m *Manager) getAllNeighbors() []*Neighbor {
m.mu.RLock()
result := make([]*neighbor, 0, len(m.neighbors))
result := make([]*Neighbor, 0, len(m.neighbors))
for _, n := range m.neighbors {
result = append(result, n)
}
......@@ -154,8 +153,8 @@ func (m *Manager) getAllNeighbors() []*neighbor {
return result
}
func (m *Manager) getNeighborsById(ids []peer.ID) []*neighbor {
result := make([]*neighbor, 0, len(ids))
func (m *Manager) getNeighborsById(ids []peer.ID) []*Neighbor {
result := make([]*Neighbor, 0, len(ids))
m.mu.RLock()
for _, id := range ids {
......@@ -168,102 +167,74 @@ func (m *Manager) getNeighborsById(ids []peer.ID) []*neighbor {
return result
}
func (m *Manager) send(msg []byte, to ...peer.ID) {
if l := len(msg); l > maxPacketSize {
m.log.Errorw("message too large", "len", l, "max", maxPacketSize)
}
func (m *Manager) send(b []byte, to ...peer.ID) {
neighbors := m.getNeighbors(to...)
for _, nbr := range neighbors {
m.log.Debugw("Sending", "to", nbr.peer.ID(), "msg", msg)
_, err := nbr.conn.Write(msg)
if err != nil {
m.log.Debugw("send error", "err", err)
if _, err := nbr.Write(b); err != nil {
m.log.Warnw("send error", "err", err)
}
}
}
func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (*server.Connection, error)) error {
func (m *Manager) addNeighbor(peer *peer.Peer, connectorFunc func(*peer.Peer) (net.Conn, error)) error {
conn, err := connectorFunc(peer)
if err != nil {
m.log.Debugw("addNeighbor failed", "peer", peer.ID(), "err", err)
Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: peer})
Events.ConnectionFailed.Trigger(peer)
return err
}
m.mu.Lock()
defer m.mu.Unlock()
if !m.running {
disconnect(conn)
_ = conn.Close()
Events.ConnectionFailed.Trigger(peer)
return ErrClosed
}
if _, ok := m.neighbors[peer.ID()]; ok {
disconnect(conn)
_ = conn.Close()
Events.ConnectionFailed.Trigger(peer)
return ErrDuplicateNeighbor
}
// add the neighbor
n := &neighbor{
peer: peer,
conn: conn,
// create and add the neighbor
n := NewNeighbor(peer, conn, m.log)
n.Events.Close.Attach(events.NewClosure(func() { _ = m.DropNeighbor(peer.ID()) }))
n.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) {
if err := m.handlePacket(data, peer); err != nil {
m.log.Debugw("error handling packet", "err", err)
}
}))
m.neighbors[peer.ID()] = n
go m.readLoop(n)
n.Listen()
Events.NeighborAdded.Trigger(n)
return nil
}
func (m *Manager) readLoop(nbr *neighbor) {
m.wg.Add(1)
defer m.wg.Done()
// create a buffer for the packages
b := make([]byte, maxPacketSize)
for {
n, err := nbr.conn.Read(b)
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
// ignore temporary read errors.
m.log.Debugw("temporary read error", "err", err)
continue
} else if err != nil {
// return from the loop on all other errors
if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
m.log.Warnw("read error", "err", err)
}
m.log.Debugw("connection closed",
"id", nbr.peer.ID(),
"addr", nbr.conn.RemoteAddr().String(),
)
_ = nbr.conn.Close() // just make sure that the connection is closed as fast as possible
_ = m.DropNeighbor(nbr.peer.ID())
return
}
if err := m.handlePacket(b[:n], nbr); err != nil {
m.log.Warnw("failed to handle packet", "id", nbr.peer.ID(), "err", err)
}
}
func (m *Manager) handlePacket(data []byte, p *peer.Peer) error {
// ignore empty packages
if len(data) == 0 {
return nil
}
func (m *Manager) handlePacket(data []byte, n *neighbor) error {
switch pb.MType(data[0]) {
// Incoming Transaction
case pb.MTransaction:
msg := new(pb.Transaction)
if err := proto.Unmarshal(data[1:], msg); err != nil {
return errors.Wrap(err, "invalid message")
return errors.Wrap(err, "invalid packet")
}
m.log.Debugw("Received Transaction", "data", msg.GetData())
Events.TransactionReceived.Trigger(&TransactionReceivedEvent{Data: msg.GetData(), Peer: n.peer})
Events.TransactionReceived.Trigger(&TransactionReceivedEvent{Data: msg.GetData(), Peer: p})
// Incoming Transaction request
case pb.MTransactionRequest:
msg := new(pb.TransactionRequest)
if err := proto.Unmarshal(data[1:], msg); err != nil {
return errors.Wrap(err, "invalid message")
return errors.Wrap(err, "invalid packet")
}
m.log.Debugw("Received Tx Req", "data", msg.GetHash())
// do something
......@@ -272,10 +243,11 @@ func (m *Manager) handlePacket(data []byte, n *neighbor) error {
m.log.Debugw("Tx not available", "tx", msg.GetHash())
} else {
m.log.Debugw("Tx found", "tx", tx)
m.SendTransaction(tx, n.peer.ID())
m.SendTransaction(tx, p.ID())
}
default:
return ErrInvalidPacket
}
return nil
......@@ -293,8 +265,3 @@ func marshal(msg pb.Message) []byte {
}
return append([]byte{byte(mType)}, data...)
}
func disconnect(conn *server.Connection) {
_ = conn.Close()
Events.NeighborDropped.Trigger(&NeighborDroppedEvent{Peer: conn.Peer()})
}
package gossip
import (
"log"
"net"
"sync"
"testing"
......@@ -13,54 +12,25 @@ import (
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
const graceTime = 10 * time.Millisecond
var (
logger *zap.SugaredLogger
eventMock mock.Mock
log = logger.NewExampleLogger("gossip")
testTxData = []byte("testTx")
)
func transactionReceivedEvent(ev *TransactionReceivedEvent) { eventMock.Called(ev) }
func neighborDroppedEvent(ev *NeighborDroppedEvent) { eventMock.Called(ev) }
// assertEvents initializes the mock and asserts the expectations
func assertEvents(t *testing.T) func() {
eventMock = mock.Mock{}
return func() {
if !t.Failed() {
eventMock.AssertExpectations(t)
}
}
}
func init() {
l, err := zap.NewDevelopment()
if err != nil {
log.Fatalf("cannot initialize logger: %v", err)
}
logger = l.Sugar()
// mock the events triggered by the gossip
Events.TransactionReceived.Attach(events.NewClosure(transactionReceivedEvent))
Events.NeighborDropped.Attach(events.NewClosure(neighborDroppedEvent))
}
func getTestTransaction([]byte) ([]byte, error) {
return testTxData, nil
}
func getTestTransaction([]byte) ([]byte, error) { return testTxData, nil }
func getTCPAddress(t require.TestingT) string {
laddr, err := net.ResolveTCPAddr("tcp", "localhost:0")
tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:0")
require.NoError(t, err)
lis, err := net.ListenTCP("tcp", laddr)
lis, err := net.ListenTCP("tcp", tcpAddr)
require.NoError(t, err)
addr := lis.Addr().String()
......@@ -69,8 +39,8 @@ func getTCPAddress(t require.TestingT) string {
return addr
}
func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) {
l := logger.Named(name)
func newTestManager(t require.TestingT, name string) (*Manager, func(), *peer.Peer) {
l := log.Named(name)
db := peer.NewMemoryDB(l.Named("db"))
local, err := peer.NewLocal("peering", name, db)
require.NoError(t, err)
......@@ -89,31 +59,36 @@ func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) {
// start the actual gossipping
mgr.Start(srv)
teardown := func() {
detach := func() {
mgr.Close()
srv.Close()
db.Close()
}
return mgr, teardown, &local.Peer
return mgr, detach, &local.Peer
}
func TestClose(t *testing.T) {
defer assertEvents(t)()
_, detach := newEventMock(t)
defer detach()
_, teardown, _ := newTest(t, "A")
_, teardown, _ := newTestManager(t, "A")
teardown()
}
func TestClosedConnection(t *testing.T) {
defer assertEvents(t)()
e, detach := newEventMock(t)
defer detach()
mgrA, closeA, peerA := newTest(t, "A")
mgrA, closeA, peerA := newTestManager(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
mgrB, closeB, peerB := newTestManager(t, "B")
defer closeB()
connections := 2
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup
wg.Add(2)
wg.Add(connections)
// connect in the following way
// B -> A
......@@ -132,8 +107,8 @@ func TestClosedConnection(t *testing.T) {
// wait for the connections to establish
wg.Wait()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
e.On("neighborRemoved", peerA).Once()
e.On("neighborRemoved", peerB).Once()
// A drops B
err := mgrA.DropNeighbor(peerB.ID())
......@@ -141,19 +116,21 @@ func TestClosedConnection(t *testing.T) {
time.Sleep(graceTime)
// the events should be there even before we close
eventMock.AssertExpectations(t)
e.AssertExpectations(t)
}
func TestP2PSend(t *testing.T) {
defer assertEvents(t)()
e, detach := newEventMock(t)
defer detach()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
connections := 2
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup
wg.Add(2)
wg.Add(connections)
// connect in the following way
// B -> A
......@@ -172,27 +149,36 @@ func TestP2PSend(t *testing.T) {
// wait for the connections to establish
wg.Wait()
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
e.On("transactionReceived", &TransactionReceivedEvent{
Data: testTxData,
Peer: peerA,
}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
mgrA.SendTransaction(testTxData)
time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Once()
e.On("neighborRemoved", peerB).Once()
closeA()
closeB()
time.Sleep(graceTime)
e.AssertExpectations(t)
}
func TestP2PSendTwice(t *testing.T) {
defer assertEvents(t)()
e, detach := newEventMock(t)
defer detach()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
connections := 2
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup
wg.Add(2)
wg.Add(connections)
// connect in the following way
// B -> A
......@@ -211,31 +197,39 @@ func TestP2PSendTwice(t *testing.T) {
// wait for the connections to establish
wg.Wait()
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
e.On("transactionReceived", &TransactionReceivedEvent{
Data: testTxData,
Peer: peerA,
}).Twice()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
mgrA.SendTransaction(testTxData)
time.Sleep(1 * time.Second) // wait a bit between the sends, to test timeouts
mgrA.SendTransaction(testTxData)
time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Once()
e.On("neighborRemoved", peerB).Once()
closeA()
closeB()
time.Sleep(graceTime)
e.AssertExpectations(t)
}
func TestBroadcast(t *testing.T) {
defer assertEvents(t)()
e, detach := newEventMock(t)
defer detach()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
mgrC, closeC, peerC := newTest(t, "C")
defer closeC()
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
mgrC, closeC, peerC := newTestManager(t, "C")
connections := 4
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup
wg.Add(4)
wg.Add(connections)
// connect in the following way
// B -> A <- C
......@@ -264,30 +258,39 @@ func TestBroadcast(t *testing.T) {
// wait for the connections to establish
wg.Wait()
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
e.On("transactionReceived", &TransactionReceivedEvent{
Data: testTxData,
Peer: peerA,
}).Twice()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Twice()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerC}).Once()
mgrA.SendTransaction(testTxData)
time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Twice()
e.On("neighborRemoved", peerB).Once()
e.On("neighborRemoved", peerC).Once()
closeA()
closeB()
closeC()
time.Sleep(graceTime)
e.AssertExpectations(t)
}
func TestSingleSend(t *testing.T) {
defer assertEvents(t)()
e, detach := newEventMock(t)
defer detach()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
mgrC, closeC, peerC := newTest(t, "C")
defer closeC()
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
mgrC, closeC, peerC := newTestManager(t, "C")
connections := 4
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup
wg.Add(4)
wg.Add(connections)
// connect in the following way
// B -> A <- C
......@@ -316,45 +319,56 @@ func TestSingleSend(t *testing.T) {
// wait for the connections to establish
wg.Wait()
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
e.On("transactionReceived", &TransactionReceivedEvent{
Data: testTxData,
Peer: peerA,
}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Twice()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerC}).Once()
// A sends the transaction only to B
mgrA.SendTransaction(testTxData, peerB.ID())
time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Twice()
e.On("neighborRemoved", peerB).Once()
e.On("neighborRemoved", peerC).Once()
closeA()
closeB()
closeC()
time.Sleep(graceTime)
e.AssertExpectations(t)
}
func TestDropUnsuccessfulAccept(t *testing.T) {
defer assertEvents(t)()
e, detach := newEventMock(t)
defer detach()
mgrA, closeA, _ := newTest(t, "A")
mgrA, closeA, _ := newTestManager(t, "A")
defer closeA()
_, closeB, peerB := newTest(t, "B")
_, closeB, peerB := newTestManager(t, "B")
defer closeB()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{
Peer: peerB,
}).Once()
e.On("connectionFailed", peerB).Once()
err := mgrA.AddInbound(peerB)
assert.Error(t, err)
e.AssertExpectations(t)
}
func TestTxRequest(t *testing.T) {
defer assertEvents(t)()
e, detach := newEventMock(t)
defer detach()
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
mgrA, closeA, peerA := newTestManager(t, "A")
mgrB, closeB, peerB := newTestManager(t, "B")
connections := 2
e.On("neighborAdded", mock.Anything).Times(connections)
var wg sync.WaitGroup
wg.Add(2)
wg.Add(connections)
// connect in the following way
// B -> A
......@@ -375,15 +389,53 @@ func TestTxRequest(t *testing.T) {
txHash := []byte("Hello!")
eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
e.On("transactionReceived", &TransactionReceivedEvent{
Data: testTxData,
Peer: peerB,
}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
b, err := proto.Marshal(&pb.TransactionRequest{Hash: txHash})
require.NoError(t, err)
mgrA.RequestTransaction(b)
time.Sleep(graceTime)
e.On("neighborRemoved", peerA).Once()
e.On("neighborRemoved", peerB).Once()
closeA()
closeB()
time.Sleep(graceTime)
e.AssertExpectations(t)
}
// newEventMock attaches a fresh eventMock to all gossip events and returns it
// together with a detach function that removes the attached closures again.
func newEventMock(t mock.TestingT) (*eventMock, func()) {
	e := &eventMock{}
	e.Test(t)

	// use the same C-suffix naming for all closure variables
	connectionFailedC := events.NewClosure(e.connectionFailed)
	neighborAddedC := events.NewClosure(e.neighborAdded)
	neighborRemovedC := events.NewClosure(e.neighborRemoved)
	transactionReceivedC := events.NewClosure(e.transactionReceived)

	Events.ConnectionFailed.Attach(connectionFailedC)
	Events.NeighborAdded.Attach(neighborAddedC)
	Events.NeighborRemoved.Attach(neighborRemovedC)
	Events.TransactionReceived.Attach(transactionReceivedC)

	return e, func() {
		Events.ConnectionFailed.Detach(connectionFailedC)
		Events.NeighborAdded.Detach(neighborAddedC)
		Events.NeighborRemoved.Detach(neighborRemovedC)
		Events.TransactionReceived.Detach(transactionReceivedC)
	}
}
// eventMock records calls to the gossip event handlers so that tests can
// assert which events have been triggered and with which arguments.
type eventMock struct {
	mock.Mock
}

// The handlers below simply forward every event to the mock recorder.
func (e *eventMock) connectionFailed(p *peer.Peer)                    { e.Called(p) }
func (e *eventMock) neighborAdded(n *Neighbor)                        { e.Called(n) }
func (e *eventMock) neighborRemoved(p *peer.Peer)                     { e.Called(p) }
func (e *eventMock) transactionReceived(ev *TransactionReceivedEvent) { e.Called(ev) }
package gossip
import (
"errors"
"io"
"net"
"strings"
"sync"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network"
)
var (
	// ErrNeighborQueueFull is returned when a message is dropped because the
	// neighbor's send queue is full.
	ErrNeighborQueueFull = errors.New("send queue is full")
)

// neighborQueueSize is the maximum number of messages pending in the send queue.
const neighborQueueSize = 1000
// Neighbor is a peer with an established gossip connection.
// It embeds both the peer and the managed connection to that peer.
type Neighbor struct {
	*peer.Peer
	*network.ManagedConnection

	log   *logger.Logger
	queue chan []byte // outbound messages waiting to be written by writeLoop

	wg             sync.WaitGroup // tracks the read and write loop goroutines
	closing        chan struct{}  // closed to signal the loops to stop
	disconnectOnce sync.Once      // assures the disconnect logic runs only once
}
// NewNeighbor creates a new neighbor from the provided peer and connection.
// It panics if the peer does not support the gossip service.
// No goroutines are started until Listen is called.
func NewNeighbor(peer *peer.Peer, conn net.Conn, log *logger.Logger) *Neighbor {
	if !IsSupported(peer) {
		panic("peer does not support gossip")
	}

	// always include ID and address with every log message
	log = log.With(
		"id", peer.ID(),
		"network", conn.LocalAddr().Network(),
		"addr", conn.RemoteAddr().String(),
	)

	return &Neighbor{
		Peer:              peer,
		ManagedConnection: network.NewManagedConnection(conn),
		log:               log,
		queue:             make(chan []byte, neighborQueueSize),
		closing:           make(chan struct{}),
	}
}
// Listen starts the communication to the neighbor by spawning the
// read and write loops.
func (n *Neighbor) Listen() {
	n.wg.Add(2)

	go n.writeLoop()
	go n.readLoop()

	n.log.Info("Connection established")
}
// Close closes the connection to the neighbor and stops all communication.
// It blocks until the read and write loops have returned and then logs the
// connection statistics. Calling Close more than once is safe (see disconnect).
func (n *Neighbor) Close() error {
	err := n.disconnect()
	// wait for everything to finish
	n.wg.Wait()

	n.log.Infow("Connection closed",
		"read", n.BytesRead,
		"written", n.BytesWritten,
	)
	return err
}
// IsOutbound returns true if the neighbor is an outbound neighbor.
func (n *Neighbor) IsOutbound() bool {
	remoteAddr := n.Conn.RemoteAddr().String()
	return remoteAddr == GetAddress(n.Peer)
}
// disconnect performs the actual teardown exactly once: it signals the loops
// to stop, closes the send queue and closes the underlying connection.
// Subsequent calls are no-ops and return a nil error.
// NOTE(review): closing n.queue makes a concurrent Write panic; the tests
// (TestNeighborWriteToClosed) rely on exactly this behavior.
func (n *Neighbor) disconnect() (err error) {
	n.disconnectOnce.Do(func() {
		close(n.closing)
		close(n.queue)
		err = n.ManagedConnection.Close()
	})
	return
}
// writeLoop takes messages from the send queue and writes them to the
// underlying connection until the neighbor is closing.
func (n *Neighbor) writeLoop() {
	defer n.wg.Done()

	for {
		select {
		case msg := <-n.queue:
			// a closed queue yields nil messages; skip them
			if len(msg) == 0 {
				continue
			}
			if _, err := n.ManagedConnection.Write(msg); err != nil {
				// use the keyed logger variant, consistent with all other call sites
				n.log.Warnw("write error", "err", err)
			}
		case <-n.closing:
			return
		}
	}
}
// readLoop continuously reads from the managed connection until a permanent
// error occurs. Received data is delivered to subscribers via the embedded
// ManagedConnection (presumably its ReceiveData event — see the neighbor tests).
func (n *Neighbor) readLoop() {
	defer n.wg.Done()

	// create a buffer for the packages
	b := make([]byte, maxPacketSize)
	for {
		_, err := n.ManagedConnection.Read(b)
		if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
			// ignore temporary read errors.
			n.log.Debugw("temporary read error", "err", err)
			continue
		} else if err != nil {
			// return from the loop on all other errors;
			// do not warn on the errors expected during a clean close
			if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
				n.log.Warnw("read error", "err", err)
			}
			_ = n.ManagedConnection.Close()
			return
		}
	}
}
// Write queues b to be sent to the neighbor and returns its length.
// If the send queue is full, nothing is queued and ErrNeighborQueueFull is
// returned; the actual network write happens asynchronously in writeLoop.
func (n *Neighbor) Write(b []byte) (int, error) {
	size := len(b)
	if size > maxPacketSize {
		n.log.Errorw("message too large", "len", size, "max", maxPacketSize)
	}

	// enqueue without ever blocking the caller
	select {
	case n.queue <- b:
	default:
		return 0, ErrNeighborQueueFull
	}
	return size, nil
}
package gossip
import (
"net"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
"github.com/iotaledger/hive.go/events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var testData = []byte("foobar")
// TestNeighborClose checks that a listening neighbor closes without error.
func TestNeighborClose(t *testing.T) {
	conn, _, teardown := newPipe()
	defer teardown()

	nbr := newTestNeighbor("A", conn)
	nbr.Listen()
	require.NoError(t, nbr.Close())
}
// TestNeighborCloseTwice checks that closing a neighbor twice is safe.
func TestNeighborCloseTwice(t *testing.T) {
	conn, _, teardown := newPipe()
	defer teardown()

	nbr := newTestNeighbor("A", conn)
	nbr.Listen()
	require.NoError(t, nbr.Close())
	require.NoError(t, nbr.Close())
}
// TestNeighborWriteToClosed checks that writing to an already closed neighbor
// panics: Close closes the send queue, so the subsequent send in Write panics.
func TestNeighborWriteToClosed(t *testing.T) {
	a, _, teardown := newPipe()
	defer teardown()
	n := newTestNeighbor("A", a)

	n.Listen()
	require.NoError(t, n.Close())

	assert.Panics(t, func() {
		_, _ = n.Write(testData)
	})
}
// TestNeighborWrite checks that data written on one end of a pipe is
// received unchanged on the other end.
func TestNeighborWrite(t *testing.T) {
	connA, connB, teardown := newPipe()
	defer teardown()

	neighborA := newTestNeighbor("A", connA)
	defer neighborA.Close()
	neighborA.Listen()

	neighborB := newTestNeighbor("B", connB)
	defer neighborB.Close()

	var received uint32
	neighborB.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) {
		assert.Equal(t, testData, data)
		atomic.AddUint32(&received, 1)
	}))
	neighborB.Listen()

	_, err := neighborA.Write(testData)
	require.NoError(t, err)

	assert.Eventually(t, func() bool { return atomic.LoadUint32(&received) == 1 }, time.Second, 10*time.Millisecond)
}
// TestNeighborParallelWrite checks that concurrent writers do not interfere:
// every message that was successfully queued must eventually be received.
func TestNeighborParallelWrite(t *testing.T) {
	a, b, teardown := newPipe()
	defer teardown()

	neighborA := newTestNeighbor("A", a)
	defer neighborA.Close()
	neighborA.Listen()

	neighborB := newTestNeighbor("B", b)
	defer neighborB.Close()

	var count uint32
	neighborB.Events.ReceiveData.Attach(events.NewClosure(func(data []byte) {
		assert.Equal(t, testData, data)
		atomic.AddUint32(&count, 1)
	}))
	neighborB.Listen()

	var (
		wg       sync.WaitGroup
		expected uint32
	)

	// writer sends neighborQueueSize messages, counting only those that
	// were actually accepted into the send queue.
	writer := func() {
		defer wg.Done()
		for i := 0; i < neighborQueueSize; i++ {
			_, err := neighborA.Write(testData)
			if err == ErrNeighborQueueFull {
				continue
			}
			assert.NoError(t, err)
			atomic.AddUint32(&expected, 1)
		}
	}

	wg.Add(2)
	go writer() // Writer 1
	go writer() // Writer 2
	wg.Wait()

	// eventually all successfully queued messages must have been received
	done := func() bool {
		actual := atomic.LoadUint32(&count)
		return atomic.LoadUint32(&expected) == actual
	}
	assert.Eventually(t, done, time.Second, 10*time.Millisecond)
}
// newTestNeighbor creates a neighbor around conn for testing purposes.
func newTestNeighbor(name string, conn net.Conn) *Neighbor {
	p := newTestPeer(name, conn.LocalAddr())
	return NewNeighbor(p, conn, log.Named(name))
}
// newTestPeer creates a peer with the given name that announces both the
// peering and the gossip service on addr.
func newTestPeer(name string, addr net.Addr) *peer.Peer {
	srvs := service.New()
	srvs.Update(service.PeeringKey, addr.Network(), addr.String())
	srvs.Update(service.GossipKey, addr.Network(), addr.String())

	return peer.NewPeer([]byte(name), srvs)
}
// newPipe returns both ends of an in-memory, synchronous network connection
// together with a teardown function that closes both ends.
func newPipe() (net.Conn, net.Conn, func()) {
	connA, connB := net.Pipe()
	closeBoth := func() {
		_ = connA.Close()
		_ = connB.Close()
	}
	return connA, connB, closeBoth
}
package server
import (
"net"
"time"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
)
// Connection represents a network connection to a neighbor peer.
// It embeds net.Conn and additionally remembers which peer the
// connection belongs to.
type Connection struct {
	net.Conn
	peer *peer.Peer // the peer at the remote end of the connection
}

// newConnection wraps c together with its peer p.
// Any previously set deadlines on c are cleared, since gossip
// connections are long-lived and must not time out on their own.
func newConnection(c net.Conn, p *peer.Peer) *Connection {
	// make sure the connection has no timeouts
	_ = c.SetDeadline(time.Time{})
	return &Connection{
		Conn: c,
		peer: p,
	}
}

// Peer returns the peer associated with that connection.
func (c *Connection) Peer() *peer.Peer {
	return c.peer
}
......@@ -55,7 +55,7 @@ type TCP struct {
// connect contains the result of an incoming connection.
type connect struct {
c *Connection
c net.Conn
err error
}
......@@ -133,7 +133,7 @@ func (t *TCP) LocalAddr() net.Addr {
// DialPeer establishes a gossip connection to the given peer.
// If the peer does not accept the connection or the handshake fails, an error is returned.
func (t *TCP) DialPeer(p *peer.Peer) (*Connection, error) {
func (t *TCP) DialPeer(p *peer.Peer) (net.Conn, error) {
gossipAddr := p.Services().Get(service.GossipKey)
if gossipAddr == nil {
return nil, ErrNoGossip
......@@ -153,12 +153,12 @@ func (t *TCP) DialPeer(p *peer.Peer) (*Connection, error) {
"id", p.ID(),
"addr", conn.RemoteAddr(),
)
return newConnection(conn, p), nil
return conn, nil
}
// AcceptPeer awaits an incoming connection from the given peer.
// If the peer does not establish the connection or the handshake fails, an error is returned.
func (t *TCP) AcceptPeer(p *peer.Peer) (*Connection, error) {
func (t *TCP) AcceptPeer(p *peer.Peer) (net.Conn, error) {
if p.Services().Get(service.GossipKey) == nil {
return nil, ErrNoGossip
}
......@@ -273,7 +273,7 @@ func (t *TCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) {
t.closeConnection(conn)
return
}
m.connected <- connect{newConnection(conn, m.peer), nil}
m.connected <- connect{conn, nil}
}
func (t *TCP) listenLoop() {
......
package server
import (
"log"
"net"
"sync"
"testing"
......@@ -9,22 +8,14 @@ import (
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
"github.com/iotaledger/hive.go/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
const graceTime = 5 * time.Millisecond
var logger *zap.SugaredLogger
func init() {
l, err := zap.NewDevelopment()
if err != nil {
log.Fatalf("cannot initialize logger: %v", err)
}
logger = l.Sugar()
}
var log = logger.NewExampleLogger("server")
func getTCPAddress(t require.TestingT) string {
laddr, err := net.ResolveTCPAddr("tcp", "localhost:0")
......@@ -39,7 +30,7 @@ func getTCPAddress(t require.TestingT) string {
}
func newTest(t require.TestingT, name string) (*TCP, func()) {
l := logger.Named(name)
l := log.Named(name)
db := peer.NewMemoryDB(l.Named("db"))
local, err := peer.NewLocal("peering", name, db)
require.NoError(t, err)
......
......@@ -2,6 +2,7 @@ package autopeering
import (
"github.com/iotaledger/goshimmer/packages/autopeering/discover"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
......@@ -27,11 +28,12 @@ func run(*node.Plugin) {
}
func configureEvents() {
gossip.Events.NeighborDropped.Attach(events.NewClosure(func(ev *gossip.NeighborDroppedEvent) {
log.Info("neighbor dropped: " + ev.Peer.Address() + " / " + ev.Peer.ID().String())
if Selection != nil {
Selection.DropPeer(ev.Peer)
}
// notify the selection when a connection is closed or failed.
gossip.Events.ConnectionFailed.Attach(events.NewClosure(func(p *peer.Peer) {
Selection.DropPeer(p)
}))
gossip.Events.NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) {
Selection.DropPeer(p)
}))
discover.Events.PeerDiscovered.Attach(events.NewClosure(func(ev *discover.DiscoveredEvent) {
......
......@@ -14,6 +14,7 @@ import (
"github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/typeutils"
"github.com/iotaledger/iota.go/trinary"
)
var (
......@@ -35,7 +36,7 @@ func configureGossip() {
log.Fatalf("could not update services: %v", err)
}
mgr = gp.NewManager(lPeer, getTransaction, log)
mgr = gp.NewManager(lPeer, loadTransaction, log)
}
func start(shutdownSignal <-chan struct{}) {
......@@ -56,7 +57,7 @@ func start(shutdownSignal <-chan struct{}) {
log.Info("Stopping Gossip ...")
}
func getTransaction(hash []byte) ([]byte, error) {
func loadTransaction(hash []byte) ([]byte, error) {
log.Infof("Retrieving tx: hash=%s", hash)
tx, err := tangle.GetTransaction(typeutils.BytesToString(hash))
......@@ -68,3 +69,11 @@ func getTransaction(hash []byte) ([]byte, error) {
}
return tx.GetBytes(), nil
}
// requestTransaction asks the gossip manager to request the transaction with
// the given hash from the neighbors, unless it is already stored locally.
func requestTransaction(hash trinary.Hash) {
	known, _ := tangle.ContainsTransaction(hash)
	if known {
		// the transaction is already present; no need to request it
		return
	}
	mgr.RequestTransaction(typeutils.StringToBytes(hash))
}
package gossip
import (
"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
"github.com/iotaledger/goshimmer/packages/autopeering/peer"
"github.com/iotaledger/goshimmer/packages/autopeering/selection"
"github.com/iotaledger/goshimmer/packages/gossip"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
......@@ -10,7 +10,6 @@ import (
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/typeutils"
)
const name = "Gossip" // name of the plugin
......@@ -32,33 +31,37 @@ func run(*node.Plugin) {
func configureEvents() {
selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
log.Info("neighbor removed: " + ev.DroppedID.String())
go mgr.DropNeighbor(ev.DroppedID)
go func() {
if err := mgr.DropNeighbor(ev.DroppedID); err != nil {
log.Debugw("error dropping neighbor", "id", ev.DroppedID, "err", err)
}
}()
}))
selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
gossipService := ev.Peer.Services().Get(service.GossipKey)
if gossipService != nil {
log.Info("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
go mgr.AddInbound(ev.Peer)
go func() {
if err := mgr.AddInbound(ev.Peer); err != nil {
log.Debugw("error adding inbound", "id", ev.Peer.ID(), "err", err)
}
}()
}))
selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
gossipService := ev.Peer.Services().Get(service.GossipKey)
if gossipService != nil {
log.Info("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
go mgr.AddOutbound(ev.Peer)
go func() {
if err := mgr.AddOutbound(ev.Peer); err != nil {
log.Debugw("error adding outbound", "id", ev.Peer.ID(), "err", err)
}
}()
}))
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
log.Debugf("gossip solid tx: hash=%s", tx.GetHash())
go mgr.SendTransaction(tx.GetBytes())
gossip.Events.NeighborAdded.Attach(events.NewClosure(func(n *gossip.Neighbor) {
log.Infof("Neighbor added: %s / %s", gossip.GetAddress(n.Peer), n.ID())
}))
gossip.Events.NeighborRemoved.Attach(events.NewClosure(func(p *peer.Peer) {
log.Infof("Neighbor removed: %s / %s", gossip.GetAddress(p), p.ID())
}))
gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) {
log.Debugf("gossip tx request: hash=%s", ev.Hash)
go mgr.RequestTransaction(typeutils.StringToBytes(ev.Hash))
// gossip transactions on solidification
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
mgr.SendTransaction(tx.GetBytes())
}))
tangle.SetRequester(tangle.RequesterFunc(requestTransaction))
}
......@@ -44,7 +44,7 @@ func run(*node.Plugin) {
}
stopped := make(chan struct{})
err := daemon.BackgroundWorker(name+" Refresher", func(shutdown <-chan struct{}) {
if err := daemon.BackgroundWorker(name+" Refresher", func(shutdown <-chan struct{}) {
for {
select {
case <-time.After(repaintInterval):
......@@ -57,13 +57,12 @@ func run(*node.Plugin) {
return
}
}
})
if err != nil {
}); err != nil {
log.Errorf("Failed to start as daemon: %s", err)
return
}
err = daemon.BackgroundWorker(name+" App", func(<-chan struct{}) {
if err := daemon.BackgroundWorker(name+" App", func(<-chan struct{}) {
defer close(stopped)
// switch logging to status screen
......@@ -73,8 +72,7 @@ func run(*node.Plugin) {
if err := app.SetRoot(frame, true).SetFocus(frame).Run(); err != nil {
log.Errorf("Error running application: %s", err)
}
})
if err != nil {
}); err != nil {
log.Errorf("Failed to start as daemon: %s", err)
close(stopped)
}
......
......@@ -5,14 +5,12 @@ import (
"github.com/iotaledger/hive.go/events"
)
var Events = pluginEvents{
TransactionStored: events.NewEvent(transactionCaller),
TransactionSolid: events.NewEvent(transactionCaller),
}
type pluginEvents struct {
var Events = struct {
TransactionStored *events.Event
TransactionSolid *events.Event
}{
TransactionStored: events.NewEvent(transactionCaller),
TransactionSolid: events.NewEvent(transactionCaller),
}
func transactionCaller(handler interface{}, params ...interface{}) {
......
......@@ -3,6 +3,7 @@ package tangle
import (
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/iota.go/trinary"
)
// region plugin module setup //////////////////////////////////////////////////////////////////////////////////////////
......@@ -24,4 +25,13 @@ func run(*node.Plugin) {
runSolidifier()
}
// Requester provides the functionality to request a transaction from the network.
type Requester interface {
	// RequestTransaction requests the transaction with the given hash from the network.
	RequestTransaction(hash trinary.Hash)
}

// RequesterFunc is an adapter to allow the use of an ordinary function as a Requester.
type RequesterFunc func(hash trinary.Hash)

// RequestTransaction calls f(hash).
func (f RequesterFunc) RequestTransaction(hash trinary.Hash) { f(hash) }
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -21,16 +21,23 @@ import (
const UnsolidInterval = 30
var (
workerCount = runtime.NumCPU()
workerPool *workerpool.WorkerPool
unsolidTxs *UnsolidTxs
requester Requester
)
// SetRequester sets the Requester that the solidifier uses to ask the
// network for transactions it is missing.
// NOTE(review): not synchronized — presumably called once during plugin
// configuration before the solidifier runs; confirm with callers.
func SetRequester(req Requester) {
	requester = req
}
func configureSolidifier() {
workerPool = workerpool.New(func(task workerpool.Task) {
processMetaTransaction(task.Param(0).(*meta_transaction.MetaTransaction))
task.Return(nil)
}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(10000))
}, workerpool.WorkerCount(workerCount), workerpool.QueueSize(10000))
unsolidTxs = NewUnsolidTxs()
......@@ -221,8 +228,10 @@ func updateUnsolidTxs(tx *value_transaction.ValueTransaction) {
}
func requestTransaction(hash trinary.Trytes) {
log.Infof("Requesting hash: hash=%s", hash)
gossip.Events.RequestTransaction.Trigger(&gossip.RequestTransactionEvent{Hash: hash})
if requester == nil {
return
}
var WORKER_COUNT = runtime.NumCPU()
log.Infow("Requesting tx", "hash", hash)
requester.RequestTransaction(hash)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment