Skip to content
Snippets Groups Projects
manager_test.go 8.43 KiB
package gossip

import (
	"log"
	"sync"
	"testing"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/iotaledger/autopeering-sim/peer"
	"github.com/iotaledger/autopeering-sim/peer/service"
	pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
	"github.com/iotaledger/goshimmer/packages/gossip/transport"
	"github.com/iotaledger/hive.go/events"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
)

const graceTime = 10 * time.Millisecond

var (
	logger    *zap.SugaredLogger
	eventMock mock.Mock

	testTxData = []byte("testTx")
)

func transactionReceivedEvent(ev *TransactionReceivedEvent) { eventMock.Called(ev) }
func neighborDroppedEvent(ev *NeighborDroppedEvent)         { eventMock.Called(ev) }

// assertEvents initializes the mock and asserts the expectations
func assertEvents(t *testing.T) func() {
	eventMock = mock.Mock{}
	return func() {
		if !t.Failed() {
			eventMock.AssertExpectations(t)
		}
	}
}

func init() {
	l, err := zap.NewDevelopment()
	if err != nil {
		log.Fatalf("cannot initialize logger: %v", err)
	}
	logger = l.Sugar()

	// mock the events triggered by the gossip
	Events.TransactionReceived.Attach(events.NewClosure(transactionReceivedEvent))
	Events.NeighborDropped.Attach(events.NewClosure(neighborDroppedEvent))
}

func getTestTransaction([]byte) ([]byte, error) {
	return testTxData, nil
}

func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) {
	l := logger.Named(name)
	db := peer.NewMemoryDB(l.Named("db"))
	local, err := peer.NewLocal("peering", name, db)
	require.NoError(t, err)
	require.NoError(t, local.UpdateService(service.GossipKey, "tcp", "localhost:0"))

	trans, err := transport.Listen(local, l)
	require.NoError(t, err)

	mgr := NewManager(trans, l, getTestTransaction)
	// update the service with the actual address
	require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String()))

	teardown := func() {
		mgr.Close()
		trans.Close()
		db.Close()
	}
	return mgr, teardown, &local.Peer
}

func TestClose(t *testing.T) {
	defer assertEvents(t)()

	_, teardown, _ := newTest(t, "A")
	teardown()
}

func TestClosedConnection(t *testing.T) {
	defer assertEvents(t)()

	mgrA, closeA, peerA := newTest(t, "A")
	defer closeA()
	mgrB, closeB, peerB := newTest(t, "B")
	defer closeB()

	var wg sync.WaitGroup
	wg.Add(2)

	// connect in the following way
	// B -> A
	go func() {
		defer wg.Done()
		err := mgrA.AddInbound(peerB)
		assert.NoError(t, err)
	}()
	time.Sleep(graceTime)
	go func() {
		defer wg.Done()
		err := mgrB.AddOutbound(peerA)
		assert.NoError(t, err)
	}()

	// wait for the connections to establish
	wg.Wait()

	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()

	// A drops B
	err := mgrA.DropNeighbor(peerB.ID())
	require.NoError(t, err)
	time.Sleep(graceTime)

	// the events should be there even before we close
	eventMock.AssertExpectations(t)
}

func TestP2PSend(t *testing.T) {
	defer assertEvents(t)()

	mgrA, closeA, peerA := newTest(t, "A")
	defer closeA()
	mgrB, closeB, peerB := newTest(t, "B")
	defer closeB()

	var wg sync.WaitGroup
	wg.Add(2)

	// connect in the following way
	// B -> A
	go func() {
		defer wg.Done()
		err := mgrA.AddInbound(peerB)
		assert.NoError(t, err)
	}()
	time.Sleep(graceTime)
	go func() {
		defer wg.Done()
		err := mgrB.AddOutbound(peerA)
		assert.NoError(t, err)
	}()

	// wait for the connections to establish
	wg.Wait()

	eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
		Body: testTxData,
		Peer: peerA,
	}).Once()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()

	mgrA.SendTransaction(testTxData)
	time.Sleep(graceTime)
}

func TestP2PSendTwice(t *testing.T) {
	defer assertEvents(t)()

	mgrA, closeA, peerA := newTest(t, "A")
	defer closeA()
	mgrB, closeB, peerB := newTest(t, "B")
	defer closeB()

	var wg sync.WaitGroup
	wg.Add(2)

	// connect in the following way
	// B -> A
	go func() {
		defer wg.Done()
		err := mgrA.AddInbound(peerB)
		assert.NoError(t, err)
	}()
	time.Sleep(graceTime)
	go func() {
		defer wg.Done()
		err := mgrB.AddOutbound(peerA)
		assert.NoError(t, err)
	}()

	// wait for the connections to establish
	wg.Wait()

	eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
		Body: testTxData,
		Peer: peerA,
	}).Twice()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()

	mgrA.SendTransaction(testTxData)
	time.Sleep(1 * time.Second) // wait a bit between the sends, to test timeouts
	mgrA.SendTransaction(testTxData)
	time.Sleep(graceTime)
}

func TestBroadcast(t *testing.T) {
	defer assertEvents(t)()

	mgrA, closeA, peerA := newTest(t, "A")
	defer closeA()
	mgrB, closeB, peerB := newTest(t, "B")
	defer closeB()
	mgrC, closeC, peerC := newTest(t, "C")
	defer closeC()

	var wg sync.WaitGroup
	wg.Add(4)

	// connect in the following way
	// B -> A <- C
	go func() {
		defer wg.Done()
		err := mgrA.AddInbound(peerB)
		assert.NoError(t, err)
	}()
	go func() {
		defer wg.Done()
		err := mgrA.AddInbound(peerC)
		assert.NoError(t, err)
	}()
	time.Sleep(graceTime)
	go func() {
		defer wg.Done()
		err := mgrB.AddOutbound(peerA)
		assert.NoError(t, err)
	}()
	go func() {
		defer wg.Done()
		err := mgrC.AddOutbound(peerA)
		assert.NoError(t, err)
	}()

	// wait for the connections to establish
	wg.Wait()

	eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
		Body: testTxData,
		Peer: peerA,
	}).Twice()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Twice()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerC}).Once()

	mgrA.SendTransaction(testTxData)
	time.Sleep(graceTime)
}

func TestSingleSend(t *testing.T) {
	defer assertEvents(t)()

	mgrA, closeA, peerA := newTest(t, "A")
	defer closeA()
	mgrB, closeB, peerB := newTest(t, "B")
	defer closeB()
	mgrC, closeC, peerC := newTest(t, "C")
	defer closeC()

	var wg sync.WaitGroup
	wg.Add(4)

	// connect in the following way
	// B -> A <- C
	go func() {
		defer wg.Done()
		err := mgrA.AddInbound(peerB)
		assert.NoError(t, err)
	}()
	go func() {
		defer wg.Done()
		err := mgrA.AddInbound(peerC)
		assert.NoError(t, err)
	}()
	time.Sleep(graceTime)
	go func() {
		defer wg.Done()
		err := mgrB.AddOutbound(peerA)
		assert.NoError(t, err)
	}()
	go func() {
		defer wg.Done()
		err := mgrC.AddOutbound(peerA)
		assert.NoError(t, err)
	}()

	// wait for the connections to establish
	wg.Wait()

	eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
		Body: testTxData,
		Peer: peerA,
	}).Once()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Twice()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerC}).Once()

	// A sends the transaction only to B
	mgrA.SendTransaction(testTxData, peerB.ID())
	time.Sleep(graceTime)
}

func TestDropUnsuccessfulAccept(t *testing.T) {
	defer assertEvents(t)()

	mgrA, closeA, _ := newTest(t, "A")
	defer closeA()
	_, closeB, peerB := newTest(t, "B")
	defer closeB()

	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{
		Peer: peerB,
	}).Once()

	err := mgrA.AddInbound(peerB)
	assert.Error(t, err)
}

func TestTxRequest(t *testing.T) {
	defer assertEvents(t)()

	mgrA, closeA, peerA := newTest(t, "A")
	defer closeA()
	mgrB, closeB, peerB := newTest(t, "B")
	defer closeB()

	var wg sync.WaitGroup
	wg.Add(2)

	// connect in the following way
	// B -> A
	go func() {
		defer wg.Done()
		err := mgrA.AddInbound(peerB)
		assert.NoError(t, err)
	}()
	time.Sleep(graceTime)
	go func() {
		defer wg.Done()
		err := mgrB.AddOutbound(peerA)
		assert.NoError(t, err)
	}()

	// wait for the connections to establish
	wg.Wait()

	txHash := []byte("Hello!")

	eventMock.On("transactionReceivedEvent", &TransactionReceivedEvent{
		Body: testTxData,
		Peer: peerB,
	}).Once()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerA}).Once()
	eventMock.On("neighborDroppedEvent", &NeighborDroppedEvent{Peer: peerB}).Once()

	b, err := proto.Marshal(&pb.TransactionRequest{Hash: txHash})
	require.NoError(t, err)
	mgrA.RequestTransaction(b)
	time.Sleep(graceTime)
}