-
Wolfgang Welz authoredWolfgang Welz authored
manager_test.go 11.64 KiB
package gossip
import (
"net"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/server"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// graceTime is the pause the tests insert to let asynchronous connection and
// event handling settle before they continue or assert.
const graceTime time.Duration = 10 * time.Millisecond

// log is the base logger; every test manager derives a named child from it.
var log = logger.NewExampleLogger("gossip")

// testMessageData is the payload exchanged between managers in the tests and
// the data returned by loadTestMessage.
var testMessageData = []byte("testMsg")
// loadTestMessage is the message loader handed to every test manager. It
// ignores the requested ID and always yields testMessageData with a nil error.
func loadTestMessage(_ message.Id) ([]byte, error) {
	return testMessageData, nil
}
// TestClose starts a single manager and tears it down again right away.
func TestClose(t *testing.T) {
	_, closeA, _ := newMockedManager(t, "A")
	closeA()
}
// TestClosedConnection connects B to A, lets A drop B and verifies that both
// managers report the removed neighbor while they are still running.
func TestClosedConnection(t *testing.T) {
	mgrA, closeA, peerA := newMockedManager(t, "A")
	defer closeA()
	mgrB, closeB, peerB := newMockedManager(t, "B")
	defer closeB()

	// topology: B -> A, so each side is expected to gain exactly one neighbor
	mgrA.On("neighborAdded", mock.Anything).Once()
	mgrB.On("neighborAdded", mock.Anything).Once()

	var connected sync.WaitGroup
	connected.Add(2)
	go func() {
		defer connected.Done()
		assert.NoError(t, mgrA.AddInbound(peerB))
	}()
	// pause so that A is (presumably) already waiting when B dials
	time.Sleep(graceTime)
	go func() {
		defer connected.Done()
		assert.NoError(t, mgrB.AddOutbound(peerA))
	}()
	// block until both connection attempts have returned
	connected.Wait()

	mgrA.On("neighborRemoved", mock.Anything).Once()
	mgrB.On("neighborRemoved", mock.Anything).Once()

	// A drops B
	require.NoError(t, mgrA.DropNeighbor(peerB.ID()))
	time.Sleep(graceTime)

	// closing is deferred, so the removal events must already be there
	// before either manager is shut down
	mgrA.AssertExpectations(t)
	mgrB.AssertExpectations(t)
}
// TestP2PSend connects B to A, sends one message from A and expects B to
// receive it exactly once with A as the originating peer.
func TestP2PSend(t *testing.T) {
	mgrA, closeA, peerA := newMockedManager(t, "A")
	mgrB, closeB, peerB := newMockedManager(t, "B")

	// topology: B -> A, so each side is expected to gain exactly one neighbor
	mgrA.On("neighborAdded", mock.Anything).Once()
	mgrB.On("neighborAdded", mock.Anything).Once()

	var connected sync.WaitGroup
	connected.Add(2)
	go func() {
		defer connected.Done()
		assert.NoError(t, mgrA.AddInbound(peerB))
	}()
	// pause so that A is (presumably) already waiting when B dials
	time.Sleep(graceTime)
	go func() {
		defer connected.Done()
		assert.NoError(t, mgrB.AddOutbound(peerA))
	}()
	// block until both connection attempts have returned
	connected.Wait()

	received := &MessageReceivedEvent{
		Data: testMessageData,
		Peer: peerA,
	}
	mgrB.On("messageReceived", received).Once()

	mgrA.SendMessage(testMessageData)
	time.Sleep(graceTime)

	// shutting the managers down is expected to remove the neighbor on both sides
	mgrA.On("neighborRemoved", mock.Anything).Once()
	mgrB.On("neighborRemoved", mock.Anything).Once()
	closeA()
	closeB()
	time.Sleep(graceTime)

	// both managers are closed; every registered expectation must be met by now
	mgrA.AssertExpectations(t)
	mgrB.AssertExpectations(t)
}
// TestP2PSendTwice connects B to A and sends the same message from A twice,
// one second apart; B is expected to receive it both times.
func TestP2PSendTwice(t *testing.T) {
	mgrA, closeA, peerA := newMockedManager(t, "A")
	mgrB, closeB, peerB := newMockedManager(t, "B")

	// topology: B -> A, so each side is expected to gain exactly one neighbor
	mgrA.On("neighborAdded", mock.Anything).Once()
	mgrB.On("neighborAdded", mock.Anything).Once()

	var connected sync.WaitGroup
	connected.Add(2)
	go func() {
		defer connected.Done()
		assert.NoError(t, mgrA.AddInbound(peerB))
	}()
	// pause so that A is (presumably) already waiting when B dials
	time.Sleep(graceTime)
	go func() {
		defer connected.Done()
		assert.NoError(t, mgrB.AddOutbound(peerA))
	}()
	// block until both connection attempts have returned
	connected.Wait()

	received := &MessageReceivedEvent{
		Data: testMessageData,
		Peer: peerA,
	}
	mgrB.On("messageReceived", received).Twice()

	mgrA.SendMessage(testMessageData)
	// wait a bit between the sends, to test timeouts
	time.Sleep(time.Second)
	mgrA.SendMessage(testMessageData)
	time.Sleep(graceTime)

	// shutting the managers down is expected to remove the neighbor on both sides
	mgrA.On("neighborRemoved", mock.Anything).Once()
	mgrB.On("neighborRemoved", mock.Anything).Once()
	closeA()
	closeB()
	time.Sleep(graceTime)

	// both managers are closed; every registered expectation must be met by now
	mgrA.AssertExpectations(t)
	mgrB.AssertExpectations(t)
}
// TestBroadcast connects B and C to A (B -> A <- C) and verifies that a
// message sent by A without explicit recipients reaches each neighbor once.
func TestBroadcast(t *testing.T) {
	mgrA, closeA, peerA := newMockedManager(t, "A")
	mgrB, closeB, peerB := newMockedManager(t, "B")
	mgrC, closeC, peerC := newMockedManager(t, "C")

	// A is expected to gain two neighbors, B and C one each
	mgrA.On("neighborAdded", mock.Anything).Twice()
	mgrB.On("neighborAdded", mock.Anything).Once()
	mgrC.On("neighborAdded", mock.Anything).Once()

	var connected sync.WaitGroup
	connected.Add(4)
	// attempt runs one connection call in the background and checks its result
	attempt := func(connect func() error) {
		go func() {
			defer connected.Done()
			assert.NoError(t, connect())
		}()
	}

	attempt(func() error { return mgrA.AddInbound(peerB) })
	attempt(func() error { return mgrA.AddInbound(peerC) })
	// pause so that A is (presumably) already waiting when B and C dial
	time.Sleep(graceTime)
	attempt(func() error { return mgrB.AddOutbound(peerA) })
	attempt(func() error { return mgrC.AddOutbound(peerA) })
	// block until all four connection attempts have returned
	connected.Wait()

	received := &MessageReceivedEvent{Data: testMessageData, Peer: peerA}
	mgrB.On("messageReceived", received).Once()
	mgrC.On("messageReceived", received).Once()

	mgrA.SendMessage(testMessageData)
	time.Sleep(graceTime)

	// on shutdown A is expected to lose both neighbors, B and C one each
	mgrA.On("neighborRemoved", mock.Anything).Once()
	mgrA.On("neighborRemoved", mock.Anything).Once()
	mgrB.On("neighborRemoved", mock.Anything).Once()
	mgrC.On("neighborRemoved", mock.Anything).Once()
	closeA()
	closeB()
	closeC()
	time.Sleep(graceTime)

	mgrA.AssertExpectations(t)
	mgrB.AssertExpectations(t)
	mgrC.AssertExpectations(t)
}
// TestSingleSend connects B and C to A (B -> A <- C) and lets A send a message
// addressed to B only; no messageReceived expectation is registered for C.
func TestSingleSend(t *testing.T) {
	mgrA, closeA, peerA := newMockedManager(t, "A")
	mgrB, closeB, peerB := newMockedManager(t, "B")
	mgrC, closeC, peerC := newMockedManager(t, "C")

	// A is expected to gain two neighbors, B and C one each
	mgrA.On("neighborAdded", mock.Anything).Twice()
	mgrB.On("neighborAdded", mock.Anything).Once()
	mgrC.On("neighborAdded", mock.Anything).Once()

	var connected sync.WaitGroup
	connected.Add(4)
	// attempt runs one connection call in the background and checks its result
	attempt := func(connect func() error) {
		go func() {
			defer connected.Done()
			assert.NoError(t, connect())
		}()
	}

	attempt(func() error { return mgrA.AddInbound(peerB) })
	attempt(func() error { return mgrA.AddInbound(peerC) })
	// pause so that A is (presumably) already waiting when B and C dial
	time.Sleep(graceTime)
	attempt(func() error { return mgrB.AddOutbound(peerA) })
	attempt(func() error { return mgrC.AddOutbound(peerA) })
	// block until all four connection attempts have returned
	connected.Wait()

	// only mgrB should receive the message
	received := &MessageReceivedEvent{Data: testMessageData, Peer: peerA}
	mgrB.On("messageReceived", received).Once()

	// A sends the message only to B
	mgrA.SendMessage(testMessageData, peerB.ID())
	time.Sleep(graceTime)

	// on shutdown A is expected to lose both neighbors, B and C one each
	mgrA.On("neighborRemoved", mock.Anything).Once()
	mgrA.On("neighborRemoved", mock.Anything).Once()
	mgrB.On("neighborRemoved", mock.Anything).Once()
	mgrC.On("neighborRemoved", mock.Anything).Once()
	closeA()
	closeB()
	closeC()
	time.Sleep(graceTime)

	mgrA.AssertExpectations(t)
	mgrB.AssertExpectations(t)
	mgrC.AssertExpectations(t)
}
// TestDropUnsuccessfulAccept lets A accept an inbound connection from B that B
// never initiates: AddInbound must return an error and A must emit
// connectionFailed for B.
func TestDropUnsuccessfulAccept(t *testing.T) {
	mgrA, closeA, _ := newMockedManager(t, "A")
	defer closeA()
	mgrB, closeB, peerB := newMockedManager(t, "B")
	defer closeB()

	mgrA.On("connectionFailed", peerB, mock.Anything).Once()

	// no AddOutbound is issued on B, so the accept is expected to fail
	assert.Error(t, mgrA.AddInbound(peerB))

	mgrA.AssertExpectations(t)
	mgrB.AssertExpectations(t)
}
func TestMessageRequest(t *testing.T) {
mgrA, closeA, peerA := newMockedManager(t, "A")
mgrB, closeB, peerB := newMockedManager(t, "B")
var wg sync.WaitGroup
wg.Add(2)
// connect in the following way
// B -> A
mgrA.On("neighborAdded", mock.Anything).Once()
mgrB.On("neighborAdded", mock.Anything).Once()
go func() {
defer wg.Done()
err := mgrA.AddInbound(peerB)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.AddOutbound(peerA)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
id := message.Id{}
// mgrA should eventually receive the message
mgrA.On("messageReceived", &MessageReceivedEvent{Data: testMessageData, Peer: peerB}).Once()
b, err := proto.Marshal(&pb.MessageRequest{Id: id[:]})
require.NoError(t, err)
mgrA.RequestMessage(b)
time.Sleep(graceTime)
mgrA.On("neighborRemoved", mock.Anything).Once()
mgrB.On("neighborRemoved", mock.Anything).Once()
closeA()
closeB()
time.Sleep(graceTime)
mgrA.AssertExpectations(t)
mgrB.AssertExpectations(t)
}
// TestDropNeighbor repeatedly connects B to A and then lets both sides drop
// each other concurrently. After every connect both managers must list a
// neighbor, and after every disconnect neither may.
func TestDropNeighbor(t *testing.T) {
	mgrA, closeA, peerA := newTestManager(t, "A")
	defer closeA()
	mgrB, closeB, peerB := newTestManager(t, "B")
	defer closeB()

	// establish connection
	connect := func() {
		var wg sync.WaitGroup
		signal := events.NewClosure(func(_ *Neighbor) { wg.Done() })
		// we are expecting two signals
		wg.Add(2)

		// signal as soon as the neighbor is added
		mgrA.Events().NeighborAdded.Attach(signal)
		defer mgrA.Events().NeighborAdded.Detach(signal)
		mgrB.Events().NeighborAdded.Attach(signal)
		defer mgrB.Events().NeighborAdded.Detach(signal)

		// assure that no AddInbound/AddOutbound calls are leaking
		wg.Add(2)
		go func() {
			defer wg.Done()
			assert.NoError(t, mgrA.AddInbound(peerB))
		}()
		go func() {
			defer wg.Done()
			assert.NoError(t, mgrB.AddOutbound(peerA))
		}()

		wg.Wait() // wait until the events were triggered and the go routines are done
	}

	// close connection
	disconnect := func() {
		var wg sync.WaitGroup
		signal := events.NewClosure(func(_ *Neighbor) { wg.Done() })
		// we are expecting two signals
		wg.Add(2)

		// signal as soon as the neighbor is removed
		mgrA.Events().NeighborRemoved.Attach(signal)
		defer mgrA.Events().NeighborRemoved.Detach(signal)
		mgrB.Events().NeighborRemoved.Attach(signal)
		defer mgrB.Events().NeighborRemoved.Detach(signal)

		// assure that no DropNeighbor calls are leaking
		wg.Add(2)
		go func() {
			defer wg.Done()
			// both sides drop concurrently; the result is deliberately ignored
			_ = mgrA.DropNeighbor(peerB.ID())
		}()
		go func() {
			defer wg.Done()
			_ = mgrB.DropNeighbor(peerA.ID())
		}()

		wg.Wait() // wait until the events were triggered and the go routines are done
	}

	// drop and connect many many times
	for i := 0; i < 100; i++ {
		connect()
		assert.NotEmpty(t, mgrA.AllNeighbors())
		assert.NotEmpty(t, mgrB.AllNeighbors())
		disconnect()
		assert.Empty(t, mgrA.AllNeighbors())
		assert.Empty(t, mgrB.AllNeighbors())
	}
}
// newTestDB returns a peer database on top of a fresh mapdb store and fails
// the test if the database cannot be created.
func newTestDB(t require.TestingT) *peer.DB {
	store := mapdb.NewMapDB()
	db, err := peer.NewDB(store)
	require.NoError(t, err)
	return db
}
// newTestManager starts a gossip manager named name that listens on a
// loopback TCP port. It returns the manager, a function that shuts down the
// manager, its server and the listener, and the peer describing the local node.
func newTestManager(t require.TestingT, name string) (*Manager, func(), *peer.Peer) {
	namedLog := log.Named(name)

	// port 0 leaves the choice of a free port to the operating system
	loopback, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
	require.NoError(t, err)
	lis, err := net.ListenTCP("tcp", loopback)
	require.NoError(t, err)
	// the address the listener is actually bound to, including the chosen port
	bound := lis.Addr().(*net.TCPAddr)

	services := service.New()
	services.Update(service.PeeringKey, "peering", 0)
	services.Update(service.GossipKey, bound.Network(), bound.Port)

	local, err := peer.NewLocal(bound.IP, services, newTestDB(t))
	require.NoError(t, err)

	srv := server.ServeTCP(local, lis, namedLog)

	// start the actual gossipping
	mgr := NewManager(local, loadTestMessage, namedLog)
	mgr.Start(srv)

	teardown := func() {
		mgr.Close()
		srv.Close()
		// the listener's close error is deliberately discarded
		_ = lis.Close()
	}
	return mgr, teardown, local.Peer
}
// newMockedManager creates a test manager via newTestManager and wraps it with
// mockManager. The teardown function and the local peer are passed through
// unchanged.
func newMockedManager(t *testing.T, name string) (*mockedManager, func(), *peer.Peer) {
	mgr, teardown, local := newTestManager(t, name)
	mocked := mockManager(t, mgr)
	return mocked, teardown, local
}
// mockManager wraps mgr in a mockedManager bound to t and attaches the mock's
// handlers to the manager's ConnectionFailed, NeighborAdded, NeighborRemoved
// and MessageReceived events.
func mockManager(t mock.TestingT, mgr *Manager) *mockedManager {
	mocked := &mockedManager{Manager: mgr}
	mocked.Test(t)

	// route every manager event into the corresponding mocked method
	mocked.Events().ConnectionFailed.Attach(events.NewClosure(mocked.connectionFailed))
	mocked.Events().NeighborAdded.Attach(events.NewClosure(mocked.neighborAdded))
	mocked.Events().NeighborRemoved.Attach(events.NewClosure(mocked.neighborRemoved))
	mocked.Events().MessageReceived.Attach(events.NewClosure(mocked.messageReceived))

	return mocked
}
// mockedManager combines a Manager with a testify mock. mockManager attaches
// the methods below to the manager's events, so each event is recorded as a
// mock call that tests set expectations on.
type mockedManager struct {
	mock.Mock
	*Manager
}

// connectionFailed records a connectionFailed call with the peer and the error.
func (m *mockedManager) connectionFailed(p *peer.Peer, err error) {
	m.Called(p, err)
}

// neighborAdded records a neighborAdded call with the given neighbor.
func (m *mockedManager) neighborAdded(n *Neighbor) {
	m.Called(n)
}

// neighborRemoved records a neighborRemoved call with the given neighbor.
func (m *mockedManager) neighborRemoved(n *Neighbor) {
	m.Called(n)
}

// messageReceived records a messageReceived call with the given event.
func (m *mockedManager) messageReceived(ev *MessageReceivedEvent) {
	m.Called(ev)
}