package gossip

import (
	"net"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/iotaledger/goshimmer/packages/autopeering/peer"
	"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
	"github.com/iotaledger/hive.go/events"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// testData is the fixed payload the tests in this file write through a
// neighbor and compare received messages against.
var testData = []byte("foobar")

// TestNeighborClose creates a neighbor on one end of a pipe, calls Listen on
// it and requires that the subsequent Close returns no error.
func TestNeighborClose(t *testing.T) {
	conn, _, closePipe := newPipe()
	defer closePipe()

	neighbor := newTestNeighbor("A", conn)
	neighbor.Listen()

	err := neighbor.Close()
	require.NoError(t, err)
}

// TestNeighborCloseTwice requires that calling Close two times in a row on
// the same neighbor returns no error either time.
func TestNeighborCloseTwice(t *testing.T) {
	conn, _, closePipe := newPipe()
	defer closePipe()

	neighbor := newTestNeighbor("A", conn)
	neighbor.Listen()

	// The second iteration is the interesting one: Close on an already
	// closed neighbor must still succeed.
	for attempt := 0; attempt < 2; attempt++ {
		require.NoError(t, neighbor.Close())
	}
}

// TestNeighborWrite connects two neighbors through a pipe, writes testData
// via neighbor A and expects neighbor B to trigger exactly one ReceiveMessage
// event carrying that payload within one second.
func TestNeighborWrite(t *testing.T) {
	connA, connB, closePipe := newPipe()
	defer closePipe()

	sender := newTestNeighbor("A", connA)
	defer sender.Close()
	sender.Listen()

	receiver := newTestNeighbor("B", connB)
	defer receiver.Close()

	// received is updated from the event handler, hence the atomic access.
	var received uint32
	onMessage := events.NewClosure(func(payload []byte) {
		assert.Equal(t, testData, payload)
		atomic.AddUint32(&received, 1)
	})
	receiver.Events.ReceiveMessage.Attach(onMessage)
	receiver.Listen()

	_, err := sender.Write(testData)
	require.NoError(t, err)

	gotOne := func() bool {
		return atomic.LoadUint32(&received) == 1
	}
	assert.Eventually(t, gotOne, time.Second, 10*time.Millisecond)
}

// TestNeighborParallelWrite writes testData through neighbor A from several
// goroutines at once and expects neighbor B to eventually trigger one
// ReceiveMessage event for every write that was accepted.
//
// Writes rejected with ErrNeighborQueueFull are skipped and not counted.
func TestNeighborParallelWrite(t *testing.T) {
	a, b, teardown := newPipe()
	defer teardown()

	neighborA := newTestNeighbor("A", a)
	defer neighborA.Close()
	neighborA.Listen()

	neighborB := newTestNeighbor("B", b)
	defer neighborB.Close()

	// count is incremented from the event handler, so it is only ever
	// touched through sync/atomic.
	var count uint32
	neighborB.Events.ReceiveMessage.Attach(events.NewClosure(func(data []byte) {
		assert.Equal(t, testData, data)
		atomic.AddUint32(&count, 1)
	}))
	neighborB.Listen()

	const numWriters = 2

	var (
		wg       sync.WaitGroup
		expected uint32 // number of writes accepted by neighborA
	)

	// writer attempts neighborQueueSize writes and records each accepted one.
	writer := func() {
		defer wg.Done()
		for i := 0; i < neighborQueueSize; i++ {
			_, err := neighborA.Write(testData)
			if err == ErrNeighborQueueFull {
				continue
			}
			// Any other error already fails the test; stop here so the
			// failed write is not counted and the log is not flooded with
			// repeated failures.
			if !assert.NoError(t, err) {
				return
			}
			atomic.AddUint32(&expected, 1)
		}
	}

	wg.Add(numWriters)
	for w := 0; w < numWriters; w++ {
		go writer()
	}
	wg.Wait()

	// expected no longer changes after wg.Wait, but it is read atomically to
	// stay consistent with how it is written.
	done := func() bool {
		return atomic.LoadUint32(&count) == atomic.LoadUint32(&expected)
	}
	assert.Eventually(t, done, time.Second, 10*time.Millisecond)
}

// newTestNeighbor creates a Neighbor over conn. The peer is built by
// newTestPeer from name and the connection's local address, and the logger
// is the package logger named after name.
func newTestNeighbor(name string, conn net.Conn) *Neighbor {
	localAddr := conn.LocalAddr()
	p := newTestPeer(name, localAddr)
	logger := log.Named(name)

	return NewNeighbor(p, conn, logger)
}

// newTestPeer creates a peer whose peering and gossip services both point at
// addr. The bytes of name are passed as the first argument of peer.NewPeer
// (presumably the peer's key/identity; confirm against peer.NewPeer).
func newTestPeer(name string, addr net.Addr) *peer.Peer {
	network, address := addr.Network(), addr.String()

	record := service.New()
	record.Update(service.PeeringKey, network, address)
	record.Update(service.GossipKey, network, address)

	return peer.NewPeer([]byte(name), record)
}

// newPipe returns the two ends of an in-memory net.Pipe connection together
// with a teardown function that closes both ends.
func newPipe() (net.Conn, net.Conn, func()) {
	left, right := net.Pipe()

	return left, right, func() {
		for _, end := range []net.Conn{left, right} {
			// Close errors are deliberately ignored: teardown is best-effort
			// cleanup and may run after the test already closed an end.
			_ = end.Close()
		}
	}
}