Skip to content
Snippets Groups Projects
Commit 3af58727 authored by capossele's avatar capossele
Browse files

:construction: WIP

parent b04d665a
No related branches found
No related tags found
No related merge requests found
Showing
with 1593 additions and 160 deletions
......@@ -4,6 +4,7 @@ go 1.13
require (
github.com/StabbyCutyou/buffstreams v2.0.0+incompatible
github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414
github.com/dgraph-io/badger v1.6.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/gdamore/tcell v1.3.0
......@@ -11,7 +12,7 @@ require (
github.com/golang/protobuf v1.3.2
github.com/google/open-location-code/go v0.0.0-20190903173953-119bc96a3a51
github.com/gorilla/websocket v1.4.1
github.com/iotaledger/autopeering-sim v0.0.0-20191201144404-58a6f3b1a56d
github.com/iotaledger/autopeering-sim v0.0.0-20191202192349-f8e7a238c2bb
github.com/iotaledger/hive.go v0.0.0-20191125115115-f88d4ecab6dd
github.com/iotaledger/iota.go v1.0.0-beta.10
github.com/labstack/echo v3.3.10+incompatible
......
......@@ -18,6 +18,10 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/capossele/gossip v0.0.0-20191204191545-36eddf08c1aa h1:46C1Ce+93zGZ+JJ4JUw3EuXHQXqKYzIX7+CRG0fweNk=
github.com/capossele/gossip v0.0.0-20191204191545-36eddf08c1aa/go.mod h1:P8rJEmorO5QpCL236F8vNhUFwFFjezt2d/+/LeRlELA=
github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414 h1:C9Q279xU15Qt5WVZNH6t/1L7exbTVYp1uMRS9NSmL/U=
github.com/capossele/gossip v0.0.0-20191205112840-0e578079b414/go.mod h1:DnYLNZclq7cY6s2oA6wwhQ4tDB0j38enJHPrhpzOpJc=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
......@@ -113,6 +117,8 @@ github.com/iotaledger/autopeering-sim v0.0.0-20191127100001-7ff75c77f051 h1:6JWv
github.com/iotaledger/autopeering-sim v0.0.0-20191127100001-7ff75c77f051/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI=
github.com/iotaledger/autopeering-sim v0.0.0-20191201144404-58a6f3b1a56d h1:wZoNQRwLj4PqhKfruVQ1Yeci6kaSXnE9nSNCFtJWZ9s=
github.com/iotaledger/autopeering-sim v0.0.0-20191201144404-58a6f3b1a56d/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI=
github.com/iotaledger/autopeering-sim v0.0.0-20191202192349-f8e7a238c2bb h1:nfe6HpDhLjvnliVwaz8sO2E/fpD4BEnI/FwfrU+iDRU=
github.com/iotaledger/autopeering-sim v0.0.0-20191202192349-f8e7a238c2bb/go.mod h1:JiaqaxLkQVnd8e/sya9y/LlRW56WlRKRl2TQXQCVssI=
github.com/iotaledger/goshimmer v0.0.0-20191113134331-c2d1b2f9d533/go.mod h1:7vYiofXphp9+PkgVAEM0pvw3aoi4ksrZ7lrEgX50XHs=
github.com/iotaledger/hive.go v0.0.0-20191118130432-89eebe8fe8eb h1:nuS/LETRJ8obUyBIZeyxeei0ZPlyOMj8YPziOgSM4Og=
github.com/iotaledger/hive.go v0.0.0-20191118130432-89eebe8fe8eb/go.mod h1:1Thhlil4lHzuy53EVvmEbEvWBFY0Tasp4kCBfxBCPIk=
......
......@@ -7,7 +7,6 @@ import (
"github.com/iotaledger/goshimmer/plugins/cli"
"github.com/iotaledger/goshimmer/plugins/dashboard"
"github.com/iotaledger/goshimmer/plugins/gossip"
gossip_on_solidification "github.com/iotaledger/goshimmer/plugins/gossip-on-solidification"
"github.com/iotaledger/goshimmer/plugins/gracefulshutdown"
"github.com/iotaledger/goshimmer/plugins/metrics"
"github.com/iotaledger/goshimmer/plugins/statusscreen"
......@@ -28,7 +27,7 @@ func main() {
cli.PLUGIN,
autopeering.PLUGIN,
gossip.PLUGIN,
gossip_on_solidification.PLUGIN,
//gossip_on_solidification.PLUGIN,
tangle.PLUGIN,
bundleprocessor.PLUGIN,
analysis.PLUGIN,
......
package gossip
import (
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/hive.go/events"
)
// Events contains all the events that are triggered during the gossip protocol.
type Events struct {
NewTransaction *events.Event
DropNeighbor *events.Event
}
type NewTransactionEvent struct {
Body []byte
Peer *peer.Peer
}
type DropNeighborEvent struct {
Peer *peer.Peer
}
func newTransaction(handler interface{}, params ...interface{}) {
handler.(func(*NewTransactionEvent))(params[0].(*NewTransactionEvent))
}
func dropNeighbor(handler interface{}, params ...interface{}) {
handler.(func(*DropNeighborEvent))(params[0].(*DropNeighborEvent))
}
package gossip
import (
"net"
"github.com/capossele/gossip/neighbor"
pb "github.com/capossele/gossip/proto"
"github.com/capossele/gossip/transport"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/hive.go/events"
"github.com/pkg/errors"
"go.uber.org/zap"
)
const (
maxAttempts = 3
)
var (
Event Events
)
type GetTransaction func(txHash []byte) ([]byte, error)
type Manager struct {
neighborhood *neighbor.NeighborMap
trans *transport.TransportTCP
log *zap.SugaredLogger
getTransaction GetTransaction
Events Events
}
func NewManager(t *transport.TransportTCP, log *zap.SugaredLogger, f GetTransaction) *Manager {
mgr := &Manager{
neighborhood: neighbor.NewMap(),
trans: t,
log: log,
getTransaction: f,
Events: Events{
NewTransaction: events.NewEvent(newTransaction),
DropNeighbor: events.NewEvent(dropNeighbor)},
}
Event = mgr.Events
return mgr
}
func (m *Manager) AddOutbound(p *peer.Peer) error {
return m.addNeighbor(p, m.trans.DialPeer)
}
func (m *Manager) AddInbound(p *peer.Peer) error {
return m.addNeighbor(p, m.trans.AcceptPeer)
}
func (m *Manager) DropNeighbor(id peer.ID) {
m.deleteNeighbor(id)
}
func (m *Manager) RequestTransaction(data []byte, to ...*neighbor.Neighbor) {
req := &pb.TransactionRequest{}
err := proto.Unmarshal(data, req)
if err != nil {
m.log.Warnw("Data to send is not a Transaction Request", "err", err)
}
msg := marshal(req)
m.send(msg, to...)
}
func (m *Manager) Send(data []byte, to ...*neighbor.Neighbor) {
tx := &pb.Transaction{}
err := proto.Unmarshal(data, tx)
if err != nil {
m.log.Warnw("Data to send is not a Transaction", "err", err)
}
msg := marshal(tx)
m.send(msg, to...)
}
func (m *Manager) send(msg []byte, to ...*neighbor.Neighbor) {
neighbors := m.neighborhood.GetSlice()
if to != nil {
neighbors = to
}
for _, neighbor := range neighbors {
m.log.Debugw("Sending", "to", neighbor.Peer.ID().String(), "msg", msg)
err := neighbor.Conn.Write(msg)
if err != nil {
m.log.Debugw("send error", "err", err)
}
}
}
func (m *Manager) addNeighbor(peer *peer.Peer, handshake func(*peer.Peer) (*transport.Connection, error)) error {
if _, ok := m.neighborhood.Load(peer.ID().String()); ok {
return errors.New("Neighbor already added")
}
var err error
var conn *transport.Connection
i := 0
for i = 0; i < maxAttempts; i++ {
conn, err = handshake(peer)
if err != nil {
m.log.Warnw("Connection attempt failed", "attempt", i+1)
} else {
break
}
}
if i == maxAttempts {
m.log.Warnw("Connection failed to", "peer", peer.ID().String())
m.Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: peer})
return err
}
// add the new neighbor
neighbor := neighbor.New(peer, conn)
m.neighborhood.Store(peer.ID().String(), neighbor)
// start listener for the new neighbor
go m.readLoop(neighbor)
return nil
}
func (m *Manager) deleteNeighbor(id peer.ID) {
m.log.Debugw("Deleting neighbor", "neighbor", id.String())
p, ok := m.neighborhood.Delete(id.String())
if ok {
m.Events.DropNeighbor.Trigger(&DropNeighborEvent{Peer: p.Peer})
}
}
func (m *Manager) readLoop(neighbor *neighbor.Neighbor) {
for {
data, err := neighbor.Conn.Read()
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
// ignore temporary read errors.
//m.log.Debugw("temporary read error", "err", err)
continue
} else if err != nil {
// return from the loop on all other errors
m.log.Debugw("reading stopped")
m.deleteNeighbor(neighbor.Peer.ID())
return
}
if err := m.handlePacket(data, neighbor); err != nil {
m.log.Warnw("failed to handle packet", "from", neighbor.Peer.ID().String(), "err", err)
}
}
}
func (m *Manager) handlePacket(data []byte, neighbor *neighbor.Neighbor) error {
switch pb.MType(data[0]) {
// Incoming Transaction
case pb.MTransaction:
msg := new(pb.Transaction)
if err := proto.Unmarshal(data[1:], msg); err != nil {
return errors.Wrap(err, "invalid message")
}
m.log.Debugw("Received Transaction", "data", msg.GetBody())
m.Events.NewTransaction.Trigger(&NewTransactionEvent{Body: msg.GetBody(), Peer: neighbor.Peer})
// Incoming Transaction request
case pb.MTransactionRequest:
msg := new(pb.TransactionRequest)
if err := proto.Unmarshal(data[1:], msg); err != nil {
return errors.Wrap(err, "invalid message")
}
m.log.Debugw("Received Tx Req", "data", msg.GetHash())
// do something
tx, err := m.getTransaction(msg.GetHash())
if err != nil {
m.log.Debugw("Tx not available", "tx", msg.GetHash())
} else {
m.log.Debugw("Tx found", "tx", tx)
m.Send(tx, neighbor)
}
default:
return nil
}
return nil
}
func marshal(msg pb.Message) []byte {
mType := msg.Type()
if mType > 0xFF {
panic("invalid message")
}
data, err := proto.Marshal(msg)
if err != nil {
panic("invalid message")
}
return append([]byte{byte(mType)}, data...)
}
package gossip
import (
"log"
"sync"
"testing"
"time"
pb "github.com/capossele/gossip/proto"
"github.com/capossele/gossip/transport"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/autopeering-sim/peer/service"
"github.com/iotaledger/hive.go/events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
const graceTime = 5 * time.Millisecond
var logger *zap.SugaredLogger
func init() {
l, err := zap.NewDevelopment()
if err != nil {
log.Fatalf("cannot initialize logger: %v", err)
}
logger = l.Sugar()
}
func testGetTransaction([]byte) ([]byte, error) {
tx := &pb.TransactionRequest{
Hash: []byte("testTx"),
}
b, _ := proto.Marshal(tx)
return b, nil
}
func newTest(t require.TestingT, name string) (*Manager, func(), *peer.Peer) {
log := logger.Named(name)
db := peer.NewMemoryDB(log.Named("db"))
local, err := peer.NewLocal("peering", name, db)
require.NoError(t, err)
require.NoError(t, local.UpdateService(service.GossipKey, "tcp", "localhost:0"))
trans, err := transport.Listen(local, log)
require.NoError(t, err)
mgr := NewManager(trans, log, testGetTransaction)
// update the service with the actual address
require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String()))
teardown := func() {
trans.Close()
db.Close()
}
return mgr, teardown, &local.Peer
}
func TestClose(t *testing.T) {
_, teardown, _ := newTest(t, "A")
teardown()
}
func TestUnicast(t *testing.T) {
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
tx := &pb.Transaction{Body: []byte("Hello!")}
triggered := make(chan struct{}, 1)
mgrB.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) {
require.Empty(t, triggered) // only once
assert.Equal(t, tx.GetBody(), ev.Body)
assert.Equal(t, peerA, ev.Peer)
triggered <- struct{}{}
}))
b, err := proto.Marshal(tx)
require.NoError(t, err)
mgrA.Send(b)
// eventually the event should be triggered
assert.Eventually(t, func() bool { return len(triggered) >= 1 }, time.Second, 10*time.Millisecond)
}
func TestBroadcast(t *testing.T) {
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
mgrC, closeC, peerC := newTest(t, "C")
defer closeC()
var wg sync.WaitGroup
wg.Add(4)
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
assert.NoError(t, err)
}()
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerC, mgrA.trans.AcceptPeer)
assert.NoError(t, err)
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
assert.NoError(t, err)
}()
go func() {
defer wg.Done()
err := mgrC.addNeighbor(peerA, mgrC.trans.DialPeer)
assert.NoError(t, err)
}()
// wait for the connections to establish
wg.Wait()
tx := &pb.Transaction{Body: []byte("Hello!")}
triggeredB := make(chan struct{}, 1)
mgrB.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) {
require.Empty(t, triggeredB) // only once
assert.Equal(t, tx.GetBody(), ev.Body)
assert.Equal(t, peerA, ev.Peer)
triggeredB <- struct{}{}
}))
triggeredC := make(chan struct{}, 1)
mgrC.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) {
require.Empty(t, triggeredC) // only once
assert.Equal(t, tx.GetBody(), ev.Body)
assert.Equal(t, peerA, ev.Peer)
triggeredC <- struct{}{}
}))
b, err := proto.Marshal(tx)
assert.NoError(t, err)
mgrA.Send(b)
// eventually the events should be triggered
success := func() bool {
return len(triggeredB) >= 1 && len(triggeredC) >= 1
}
assert.Eventually(t, success, time.Second, 10*time.Millisecond)
}
func TestDropUnsuccessfulAccept(t *testing.T) {
mgrA, closeA, _ := newTest(t, "A")
defer closeA()
_, closeB, peerB := newTest(t, "B")
defer closeB()
triggered := make(chan struct{}, 1)
mgrA.Events.DropNeighbor.Attach(events.NewClosure(func(ev *DropNeighborEvent) {
require.Empty(t, triggered) // only once
assert.Equal(t, peerB, ev.Peer)
triggered <- struct{}{}
}))
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
assert.Error(t, err)
// eventually the event should be triggered
assert.Eventually(t, func() bool { return len(triggered) >= 1 }, time.Second, 10*time.Millisecond)
}
func TestTxRequest(t *testing.T) {
mgrA, closeA, peerA := newTest(t, "A")
defer closeA()
mgrB, closeB, peerB := newTest(t, "B")
defer closeB()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
err := mgrA.addNeighbor(peerB, mgrA.trans.AcceptPeer)
assert.NoError(t, err)
logger.Debugw("Len", "len", mgrA.neighborhood.Len())
}()
go func() {
defer wg.Done()
err := mgrB.addNeighbor(peerA, mgrB.trans.DialPeer)
assert.NoError(t, err)
logger.Debugw("Len", "len", mgrB.neighborhood.Len())
}()
wg.Wait()
tx := &pb.TransactionRequest{
Hash: []byte("Hello!"),
}
b, err := proto.Marshal(tx)
assert.NoError(t, err)
sendChan := make(chan struct{})
sendSuccess := false
mgrA.Events.NewTransaction.Attach(events.NewClosure(func(ev *NewTransactionEvent) {
logger.Debugw("New TX Event triggered", "data", ev.Body, "from", ev.Peer.ID().String())
assert.Equal(t, []byte("testTx"), ev.Body)
assert.Equal(t, peerB, ev.Peer)
sendChan <- struct{}{}
}))
mgrA.RequestTransaction(b)
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case <-sendChan:
sendSuccess = true
case <-timer.C:
sendSuccess = false
}
assert.True(t, sendSuccess)
}
package gossip
package neighbor
import (
"sync"
"github.com/capossele/gossip/transport"
"github.com/iotaledger/autopeering-sim/peer"
)
// NeighborMap is the mapping of neighbor identifier and their neighbor struct
// It uses a mutex to handle concurrent access to its internal map
// Neighbor defines a neighbor
type Neighbor struct {
Peer *peer.Peer
Conn *transport.Connection
}
// NeighborMap implements a map of neighbors thread safe
type NeighborMap struct {
sync.RWMutex
internal map[string]*Neighbor
}
// NewPeerMap returns a new PeerMap
func NewNeighborMap() *NeighborMap {
// NewMap returns a new NeighborMap
func NewMap() *NeighborMap {
return &NeighborMap{
internal: make(map[string]*Neighbor),
}
}
// Len returns the number of peers stored in a PeerMap
// New returns a new Neighbor
func New(peer *peer.Peer, conn *transport.Connection) *Neighbor {
return &Neighbor{
Peer: peer,
Conn: conn,
}
}
// Len returns the number of neighbors stored in a NeighborMap
func (nm *NeighborMap) Len() int {
nm.RLock()
defer nm.RUnlock()
......@@ -36,7 +52,7 @@ func (nm *NeighborMap) GetMap() map[string]*Neighbor {
return newMap
}
// GetMap returns the content of the entire internal map
// GetSlice returns a slice of the content of the entire internal map
func (nm *NeighborMap) GetSlice() []*Neighbor {
newSlice := make([]*Neighbor, nm.Len())
nm.RLock()
......@@ -49,9 +65,9 @@ func (nm *NeighborMap) GetSlice() []*Neighbor {
return newSlice
}
// Load returns the peer for a given key.
// Load returns the neighbor for a given key.
// It also return a bool to communicate the presence of the given
// peer into the internal map
// neighbor into the internal map
func (nm *NeighborMap) Load(key string) (value *Neighbor, ok bool) {
nm.RLock()
defer nm.RUnlock()
......@@ -60,18 +76,21 @@ func (nm *NeighborMap) Load(key string) (value *Neighbor, ok bool) {
}
// Delete removes the entire entry for a given key and return true if successful
func (nm *NeighborMap) Delete(key string) (deletedPeer *Neighbor, ok bool) {
deletedPeer, ok = nm.Load(key)
func (nm *NeighborMap) Delete(key string) (deletedNeighbor *Neighbor, ok bool) {
deletedNeighbor, ok = nm.Load(key)
if !ok {
return nil, false
}
nm.Lock()
defer nm.Unlock()
if deletedNeighbor.Conn != nil {
deletedNeighbor.Conn.Close()
}
delete(nm.internal, key)
return deletedPeer, true
return deletedNeighbor, true
}
// Store adds a new peer to the PeerMap
// Store adds a new neighbor to the NeighborMap
func (nm *NeighborMap) Store(key string, value *Neighbor) {
nm.Lock()
defer nm.Unlock()
......
package proto
import (
"github.com/golang/protobuf/proto"
)
// MType is the type of message type enum.
type MType uint
// An enum for the different message types.
const (
MTransaction MType = 20 + iota
MTransactionRequest
)
// Message extends the proto.Message interface with additional util functions.
type Message interface {
proto.Message
// Name returns the name of the corresponding message type for debugging.
Name() string
// Type returns the type of the corresponding message as an enum.
Type() MType
}
func (m *Transaction) Name() string { return "TRANSACTION" }
func (m *Transaction) Type() MType { return MTransaction }
func (m *TransactionRequest) Name() string { return "TRANSACTION_REQUEST" }
func (m *TransactionRequest) Type() MType { return MTransactionRequest }
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: proto/message.proto
package proto
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Transaction struct {
// body of the tx
Body []byte `protobuf:"bytes,1,opt,name=body,proto3" json:"body,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Transaction) Reset() { *m = Transaction{} }
func (m *Transaction) String() string { return proto.CompactTextString(m) }
func (*Transaction) ProtoMessage() {}
func (*Transaction) Descriptor() ([]byte, []int) {
return fileDescriptor_33f3a5e1293a7bcd, []int{0}
}
func (m *Transaction) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Transaction.Unmarshal(m, b)
}
func (m *Transaction) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Transaction.Marshal(b, m, deterministic)
}
func (m *Transaction) XXX_Merge(src proto.Message) {
xxx_messageInfo_Transaction.Merge(m, src)
}
func (m *Transaction) XXX_Size() int {
return xxx_messageInfo_Transaction.Size(m)
}
func (m *Transaction) XXX_DiscardUnknown() {
xxx_messageInfo_Transaction.DiscardUnknown(m)
}
var xxx_messageInfo_Transaction proto.InternalMessageInfo
func (m *Transaction) GetBody() []byte {
if m != nil {
return m.Body
}
return nil
}
type TransactionRequest struct {
// transaction hash
Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *TransactionRequest) Reset() { *m = TransactionRequest{} }
func (m *TransactionRequest) String() string { return proto.CompactTextString(m) }
func (*TransactionRequest) ProtoMessage() {}
func (*TransactionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_33f3a5e1293a7bcd, []int{1}
}
func (m *TransactionRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_TransactionRequest.Unmarshal(m, b)
}
func (m *TransactionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_TransactionRequest.Marshal(b, m, deterministic)
}
func (m *TransactionRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_TransactionRequest.Merge(m, src)
}
func (m *TransactionRequest) XXX_Size() int {
return xxx_messageInfo_TransactionRequest.Size(m)
}
func (m *TransactionRequest) XXX_DiscardUnknown() {
xxx_messageInfo_TransactionRequest.DiscardUnknown(m)
}
var xxx_messageInfo_TransactionRequest proto.InternalMessageInfo
func (m *TransactionRequest) GetHash() []byte {
if m != nil {
return m.Hash
}
return nil
}
func init() {
proto.RegisterType((*Transaction)(nil), "proto.Transaction")
proto.RegisterType((*TransactionRequest)(nil), "proto.TransactionRequest")
}
func init() { proto.RegisterFile("proto/message.proto", fileDescriptor_33f3a5e1293a7bcd) }
var fileDescriptor_33f3a5e1293a7bcd = []byte{
// 139 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x28, 0xca, 0x2f,
0xc9, 0xd7, 0xcf, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0xd5, 0x03, 0xf3, 0x84, 0x58, 0xc1, 0x94,
0x92, 0x22, 0x17, 0x77, 0x48, 0x51, 0x62, 0x5e, 0x71, 0x62, 0x72, 0x49, 0x66, 0x7e, 0x9e, 0x90,
0x10, 0x17, 0x4b, 0x52, 0x7e, 0x4a, 0xa5, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad,
0xa4, 0xc1, 0x25, 0x84, 0xa4, 0x24, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x04, 0xa4, 0x32, 0x23,
0xb1, 0x38, 0x03, 0xa6, 0x12, 0xc4, 0x76, 0x52, 0x8e, 0x52, 0x4c, 0xcf, 0x2c, 0xc9, 0x28, 0x4d,
0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0x4e, 0x2c, 0xc8, 0x2f, 0x2e, 0x4e, 0xcd, 0x49, 0xd5, 0x4f,
0xcf, 0x2f, 0x2e, 0xce, 0x2c, 0xd0, 0x07, 0xdb, 0x98, 0xc4, 0x06, 0xa6, 0x8c, 0x01, 0x01, 0x00,
0x00, 0xff, 0xff, 0x34, 0x46, 0xa5, 0x0f, 0x96, 0x00, 0x00, 0x00,
}
syntax = "proto3";
option go_package = "github.com/capossele/gossip/proto";
package proto;
message Transaction {
// body of the tx
bytes body = 1;
}
message TransactionRequest {
// transaction hash
bytes hash = 1;
}
\ No newline at end of file
package transport
import (
"net"
"github.com/iotaledger/autopeering-sim/peer"
)
const (
// MaxPacketSize specifies the maximum allowed size of packets.
// Packets larger than this will be cut and thus treated as invalid.
MaxPacketSize = 1280
)
type Connection struct {
peer *peer.Peer
conn net.Conn
}
func newConnection(p *peer.Peer, c net.Conn) *Connection {
return &Connection{
peer: p,
conn: c,
}
}
func (c *Connection) Close() {
c.conn.Close()
}
func (c *Connection) Read() ([]byte, error) {
b := make([]byte, MaxPacketSize)
n, err := c.conn.Read(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
func (c *Connection) Write(b []byte) error {
_, err := c.conn.Write(b)
return err
}
package transport
import (
"bytes"
"time"
pb "github.com/capossele/gossip/transport/proto"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/server"
)
const (
HandshakeExpiration = 20 * time.Second
VersionNum = 0
)
// isExpired checks whether the given UNIX time stamp is too far in the past.
func isExpired(ts int64) bool {
return time.Since(time.Unix(ts, 0)) >= HandshakeExpiration
}
func newHandshakeRequest(fromAddr string, toAddr string) ([]byte, error) {
m := &pb.HandshakeRequest{
Version: VersionNum,
From: fromAddr,
To: toAddr,
Timestamp: time.Now().Unix(),
}
return proto.Marshal(m)
}
func newHandshakeResponse(reqData []byte) ([]byte, error) {
m := &pb.HandshakeResponse{
ReqHash: server.PacketHash(reqData),
}
return proto.Marshal(m)
}
func (t *TransportTCP) validateHandshakeRequest(reqData []byte, fromAddr string) bool {
m := new(pb.HandshakeRequest)
if err := proto.Unmarshal(reqData, m); err != nil {
t.log.Debugw("invalid handshake",
"err", err,
)
return false
}
if m.GetVersion() != VersionNum {
t.log.Debugw("invalid handshake",
"version", m.GetVersion(),
)
return false
}
if m.GetFrom() != fromAddr {
t.log.Debugw("invalid handshake",
"from", m.GetFrom(),
)
return false
}
if m.GetTo() != t.LocalAddr().String() {
t.log.Debugw("invalid handshake",
"to", m.GetTo(),
)
return false
}
if isExpired(m.GetTimestamp()) {
t.log.Debugw("invalid handshake",
"timestamp", time.Unix(m.GetTimestamp(), 0),
)
}
return true
}
func (t *TransportTCP) validateHandshakeResponse(resData []byte, reqData []byte) bool {
m := new(pb.HandshakeResponse)
if err := proto.Unmarshal(resData, m); err != nil {
t.log.Debugw("invalid handshake",
"err", err,
)
return false
}
if !bytes.Equal(m.GetReqHash(), server.PacketHash(reqData)) {
t.log.Debugw("invalid handshake",
"hash", m.GetReqHash(),
)
return false
}
return true
}
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: transport/proto/handshake.proto
package proto
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type HandshakeRequest struct {
// protocol version number
Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
// string form of the sender address (e.g. "192.0.2.1:25", "[2001:db8::1]:80")
From string `protobuf:"bytes,2,opt,name=from,proto3" json:"from,omitempty"`
// string form of the recipient address
To string `protobuf:"bytes,3,opt,name=to,proto3" json:"to,omitempty"`
// unix time
Timestamp int64 `protobuf:"varint,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *HandshakeRequest) Reset() { *m = HandshakeRequest{} }
func (m *HandshakeRequest) String() string { return proto.CompactTextString(m) }
func (*HandshakeRequest) ProtoMessage() {}
func (*HandshakeRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_d7101ffe19b05443, []int{0}
}
func (m *HandshakeRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HandshakeRequest.Unmarshal(m, b)
}
func (m *HandshakeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HandshakeRequest.Marshal(b, m, deterministic)
}
func (m *HandshakeRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_HandshakeRequest.Merge(m, src)
}
func (m *HandshakeRequest) XXX_Size() int {
return xxx_messageInfo_HandshakeRequest.Size(m)
}
func (m *HandshakeRequest) XXX_DiscardUnknown() {
xxx_messageInfo_HandshakeRequest.DiscardUnknown(m)
}
var xxx_messageInfo_HandshakeRequest proto.InternalMessageInfo
func (m *HandshakeRequest) GetVersion() uint32 {
if m != nil {
return m.Version
}
return 0
}
func (m *HandshakeRequest) GetFrom() string {
if m != nil {
return m.From
}
return ""
}
func (m *HandshakeRequest) GetTo() string {
if m != nil {
return m.To
}
return ""
}
func (m *HandshakeRequest) GetTimestamp() int64 {
if m != nil {
return m.Timestamp
}
return 0
}
type HandshakeResponse struct {
// hash of the ping packet
ReqHash []byte `protobuf:"bytes,1,opt,name=req_hash,json=reqHash,proto3" json:"req_hash,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *HandshakeResponse) Reset() { *m = HandshakeResponse{} }
func (m *HandshakeResponse) String() string { return proto.CompactTextString(m) }
func (*HandshakeResponse) ProtoMessage() {}
func (*HandshakeResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_d7101ffe19b05443, []int{1}
}
func (m *HandshakeResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_HandshakeResponse.Unmarshal(m, b)
}
func (m *HandshakeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_HandshakeResponse.Marshal(b, m, deterministic)
}
func (m *HandshakeResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_HandshakeResponse.Merge(m, src)
}
func (m *HandshakeResponse) XXX_Size() int {
return xxx_messageInfo_HandshakeResponse.Size(m)
}
func (m *HandshakeResponse) XXX_DiscardUnknown() {
xxx_messageInfo_HandshakeResponse.DiscardUnknown(m)
}
var xxx_messageInfo_HandshakeResponse proto.InternalMessageInfo
func (m *HandshakeResponse) GetReqHash() []byte {
if m != nil {
return m.ReqHash
}
return nil
}
func init() {
proto.RegisterType((*HandshakeRequest)(nil), "proto.HandshakeRequest")
proto.RegisterType((*HandshakeResponse)(nil), "proto.HandshakeResponse")
}
func init() { proto.RegisterFile("transport/proto/handshake.proto", fileDescriptor_d7101ffe19b05443) }
var fileDescriptor_d7101ffe19b05443 = []byte{
// 203 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x8f, 0x3f, 0x4b, 0x04, 0x31,
0x10, 0xc5, 0xb9, 0x3f, 0x7a, 0xde, 0xa0, 0xa2, 0xa9, 0x22, 0x08, 0xca, 0x55, 0x82, 0xb8, 0x29,
0xfc, 0x06, 0x56, 0x57, 0xa7, 0xb4, 0x91, 0xec, 0x3a, 0x6e, 0x82, 0x26, 0x93, 0xcd, 0x64, 0xfd,
0xfc, 0x0e, 0x81, 0x45, 0xb1, 0x7a, 0xef, 0xf7, 0x0b, 0xe4, 0x25, 0x70, 0x57, 0x8b, 0x4b, 0x9c,
0xa9, 0x54, 0x93, 0x0b, 0x55, 0x32, 0xde, 0xa5, 0x77, 0xf6, 0xee, 0x13, 0xbb, 0xc6, 0xea, 0xa4,
0xc5, 0x21, 0xc1, 0xd5, 0x71, 0x39, 0xb1, 0x38, 0xcd, 0xc8, 0x55, 0x69, 0xd8, 0x7d, 0x63, 0xe1,
0x40, 0x49, 0xaf, 0xee, 0x57, 0x0f, 0x17, 0x76, 0x41, 0xa5, 0x60, 0xfb, 0x51, 0x28, 0xea, 0xb5,
0xe8, 0xbd, 0x6d, 0x5d, 0x5d, 0xc2, 0xba, 0x92, 0xde, 0x34, 0x23, 0x4d, 0xdd, 0xc2, 0xbe, 0x86,
0x28, 0xf7, 0xb8, 0x98, 0xf5, 0x56, 0xf4, 0xc6, 0xfe, 0x8a, 0x43, 0x07, 0xd7, 0x7f, 0xf6, 0xe4,
0x81, 0x89, 0x51, 0xdd, 0xc0, 0x59, 0xc1, 0xe9, 0xcd, 0x3b, 0xf6, 0x6d, 0xf1, 0xdc, 0xee, 0x84,
0x8f, 0x82, 0x2f, 0x4f, 0xaf, 0x8f, 0x63, 0xa8, 0x7e, 0xee, 0xbb, 0x81, 0xa2, 0x19, 0x5c, 0x26,
0x66, 0xfc, 0x42, 0x33, 0x4a, 0x86, 0x6c, 0xfe, 0xfd, 0xb2, 0x3f, 0x6d, 0xf1, 0xfc, 0x13, 0x00,
0x00, 0xff, 0xff, 0x51, 0xe0, 0x08, 0xd0, 0xff, 0x00, 0x00, 0x00,
}
syntax = "proto3";
option go_package = "github.com/capossele/gossip/transport/proto";
package proto;
message HandshakeRequest {
// protocol version number
uint32 version = 1;
// string form of the sender address (e.g. "192.0.2.1:25", "[2001:db8::1]:80")
string from = 2;
// string form of the recipient address
string to = 3;
// unix time
int64 timestamp = 4;
}
message HandshakeResponse {
// hash of the ping packet
bytes req_hash = 1;
}
\ No newline at end of file
package transport
import (
"bytes"
"container/list"
"errors"
"net"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/autopeering-sim/peer/service"
pb "github.com/iotaledger/autopeering-sim/server/proto"
"go.uber.org/zap"
)
var (
ErrTimeout = errors.New("accept timeout")
ErrClosed = errors.New("listener closed")
ErrInvalidHandshake = errors.New("invalid handshake")
ErrNoGossip = errors.New("peer does not have a gossip service")
)
// connection timeouts
const (
acceptTimeout = 500 * time.Millisecond
handshakeTimeout = 100 * time.Millisecond
connectionTimeout = acceptTimeout + handshakeTimeout
)
type TransportTCP struct {
local *peer.Local
listener *net.TCPListener
log *zap.SugaredLogger
addAcceptMatcher chan *acceptMatcher
acceptReceived chan accept
closeOnce sync.Once
wg sync.WaitGroup
closing chan struct{} // if this channel gets closed all pending waits should terminate
}
// connect contains the result of an incoming connection.
type connect struct {
c *Connection
err error
}
type acceptMatcher struct {
peer *peer.Peer // connecting peer
deadline time.Time // deadline for the incoming call
connected chan connect // result of the connection is signaled here
}
type accept struct {
fromID peer.ID // ID of the connecting peer
req []byte // raw data of the handshake request
conn net.Conn // the actual network connection
}
func Listen(local *peer.Local, log *zap.SugaredLogger) (*TransportTCP, error) {
t := &TransportTCP{
local: local,
log: log,
addAcceptMatcher: make(chan *acceptMatcher),
acceptReceived: make(chan accept),
closing: make(chan struct{}),
}
gossipAddr := local.Services().Get(service.GossipKey)
if gossipAddr == nil {
return nil, ErrNoGossip
}
tcpAddr, err := net.ResolveTCPAddr(gossipAddr.Network(), gossipAddr.String())
if err != nil {
return nil, err
}
listener, err := net.ListenTCP(gossipAddr.Network(), tcpAddr)
if err != nil {
return nil, err
}
t.listener = listener
t.wg.Add(2)
go t.run()
go t.listenLoop()
return t, nil
}
// Close stops listening on the gossip address.
func (t *TransportTCP) Close() {
t.closeOnce.Do(func() {
close(t.closing)
if err := t.listener.Close(); err != nil {
t.log.Warnw("close error", "err", err)
}
t.wg.Wait()
})
}
// LocalAddr returns the listener's network address,
func (t *TransportTCP) LocalAddr() net.Addr {
return t.listener.Addr()
}
// DialPeer establishes a gossip connection to the given peer.
// If the peer does not accept the connection or the handshake fails, an error is returned.
func (t *TransportTCP) DialPeer(p *peer.Peer) (*Connection, error) {
gossipAddr := p.Services().Get(service.GossipKey)
if gossipAddr == nil {
return nil, ErrNoGossip
}
conn, err := net.DialTimeout(gossipAddr.Network(), gossipAddr.String(), acceptTimeout)
if err != nil {
return nil, err
}
err = t.doHandshake(p.PublicKey(), gossipAddr.String(), conn)
if err != nil {
return nil, err
}
t.log.Debugw("connected", "id", p.ID(), "addr", conn.RemoteAddr(), "direction", "out")
return newConnection(p, conn), nil
}
// AcceptPeer awaits an incoming connection from the given peer.
// If the peer does not establish the connection or the handshake fails, an error is returned.
func (t *TransportTCP) AcceptPeer(p *peer.Peer) (*Connection, error) {
if p.Services().Get(service.GossipKey) == nil {
return nil, ErrNoGossip
}
// wait for the connection
connected := <-t.acceptPeer(p)
if connected.err != nil {
return nil, connected.err
}
t.log.Debugw("connected", "id", p.ID(), "addr", connected.c.conn.RemoteAddr(), "direction", "in")
return connected.c, nil
}
func (t *TransportTCP) acceptPeer(p *peer.Peer) <-chan connect {
connected := make(chan connect, 1)
// add the matcher
select {
case t.addAcceptMatcher <- &acceptMatcher{peer: p, connected: connected}:
case <-t.closing:
connected <- connect{nil, ErrClosed}
}
return connected
}
func (t *TransportTCP) closeConnection(c net.Conn) {
if err := c.Close(); err != nil {
t.log.Warnw("close error", "err", err)
}
}
func (t *TransportTCP) run() {
defer t.wg.Done()
var (
mlist = list.New()
timeout = time.NewTimer(0)
)
defer timeout.Stop()
<-timeout.C // ignore first timeout
for {
// Set the timer so that it fires when the next accept expires
if el := mlist.Front(); el != nil {
// the first element always has the closest deadline
m := el.Value.(*acceptMatcher)
timeout.Reset(time.Until(m.deadline))
} else {
timeout.Stop()
}
select {
// add a new matcher to the list
case m := <-t.addAcceptMatcher:
m.deadline = time.Now().Add(connectionTimeout)
mlist.PushBack(m)
// on accept received, check all matchers for a fit
case a := <-t.acceptReceived:
matched := false
for el := mlist.Front(); el != nil; el = el.Next() {
m := el.Value.(*acceptMatcher)
if m.peer.ID() == a.fromID {
matched = true
mlist.Remove(el)
// finish the handshake
go t.matchAccept(m, a.req, a.conn)
}
}
// close the connection if not matched
if !matched {
t.log.Debugw("unexpected connection", "id", a.fromID, "addr", a.conn.RemoteAddr())
t.closeConnection(a.conn)
}
// on timeout, check for expired matchers
case <-timeout.C:
now := time.Now()
// notify and remove any expired matchers
for el := mlist.Front(); el != nil; el = el.Next() {
m := el.Value.(*acceptMatcher)
if now.After(m.deadline) || now.Equal(m.deadline) {
m.connected <- connect{nil, ErrTimeout}
mlist.Remove(el)
}
}
// on close, notify all the matchers
case <-t.closing:
for el := mlist.Front(); el != nil; el = el.Next() {
el.Value.(*acceptMatcher).connected <- connect{nil, ErrClosed}
}
return
}
}
}
func (t *TransportTCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) {
t.wg.Add(1)
defer t.wg.Done()
if err := t.writeHandshakeResponse(req, conn); err != nil {
t.log.Warnw("failed handshake", "addr", conn.RemoteAddr(), "err", err)
m.connected <- connect{nil, err}
t.closeConnection(conn)
return
}
m.connected <- connect{newConnection(m.peer, conn), nil}
}
func (t *TransportTCP) listenLoop() {
defer t.wg.Done()
for {
conn, err := t.listener.AcceptTCP()
if err, ok := err.(net.Error); ok && err.Temporary() {
t.log.Debugw("temporary read error", "err", err)
continue
} else if err != nil {
// return from the loop on all other errors
t.log.Warnw("read error", "err", err)
return
}
key, req, err := t.readHandshakeRequest(conn)
if err != nil {
t.log.Warnw("failed handshake", "addr", conn.RemoteAddr(), "err", err)
t.closeConnection(conn)
continue
}
select {
case t.acceptReceived <- accept{
fromID: key.ID(),
req: req,
conn: conn,
}:
case <-t.closing:
t.closeConnection(conn)
return
}
}
}
func (t *TransportTCP) doHandshake(key peer.PublicKey, remoteAddr string, conn net.Conn) error {
reqData, err := newHandshakeRequest(conn.LocalAddr().String(), remoteAddr)
if err != nil {
return err
}
pkt := &pb.Packet{
PublicKey: t.local.PublicKey(),
Signature: t.local.Sign(reqData),
Data: reqData,
}
b, err := proto.Marshal(pkt)
if err != nil {
return err
}
if err := conn.SetWriteDeadline(time.Now().Add(handshakeTimeout)); err != nil {
return err
}
_, err = conn.Write(b)
if err != nil {
return err
}
if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
return err
}
b = make([]byte, MaxPacketSize)
n, err := conn.Read(b)
if err != nil {
return err
}
pkt = new(pb.Packet)
if err := proto.Unmarshal(b[:n], pkt); err != nil {
return err
}
signer, err := peer.RecoverKeyFromSignedData(pkt)
if err != nil {
return err
}
if !bytes.Equal(key, signer) {
return errors.New("invalid key")
}
if !t.validateHandshakeResponse(pkt.GetData(), reqData) {
return ErrInvalidHandshake
}
return nil
}
func (t *TransportTCP) readHandshakeRequest(conn net.Conn) (peer.PublicKey, []byte, error) {
if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
return nil, nil, err
}
b := make([]byte, MaxPacketSize)
n, err := conn.Read(b)
if err != nil {
return nil, nil, err
}
pkt := new(pb.Packet)
if err := proto.Unmarshal(b[:n], pkt); err != nil {
return nil, nil, err
}
key, err := peer.RecoverKeyFromSignedData(pkt)
if err != nil {
return nil, nil, err
}
if !t.validateHandshakeRequest(pkt.GetData(), conn.RemoteAddr().String()) {
return nil, nil, ErrInvalidHandshake
}
return key, pkt.GetData(), nil
}
func (t *TransportTCP) writeHandshakeResponse(reqData []byte, conn net.Conn) error {
data, err := newHandshakeResponse(reqData)
if err != nil {
return err
}
pkt := &pb.Packet{
PublicKey: t.local.PublicKey(),
Signature: t.local.Sign(data),
Data: data,
}
b, err := proto.Marshal(pkt)
if err != nil {
return err
}
if err := conn.SetWriteDeadline(time.Now().Add(handshakeTimeout)); err != nil {
return err
}
_, err = conn.Write(b)
if err != nil {
return err
}
return nil
}
package transport
import (
"log"
"net"
"sync"
"testing"
"time"
"github.com/iotaledger/autopeering-sim/peer"
"github.com/iotaledger/autopeering-sim/peer/service"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
const graceTime = 5 * time.Millisecond
var logger *zap.SugaredLogger
func init() {
l, err := zap.NewDevelopment()
if err != nil {
log.Fatalf("cannot initialize logger: %v", err)
}
logger = l.Sugar()
}
func newTest(t require.TestingT, name string) (*TransportTCP, func()) {
l := logger.Named(name)
db := peer.NewMemoryDB(l.Named("db"))
local, err := peer.NewLocal("peering", name, db)
require.NoError(t, err)
// enable TCP gossipping
require.NoError(t, local.UpdateService(service.GossipKey, "tcp", ":0"))
trans, err := Listen(local, l)
require.NoError(t, err)
// update the service with the actual address
require.NoError(t, local.UpdateService(service.GossipKey, trans.LocalAddr().Network(), trans.LocalAddr().String()))
teardown := func() {
trans.Close()
db.Close()
}
return trans, teardown
}
func getPeer(t *TransportTCP) *peer.Peer {
return &t.local.Peer
}
func TestClose(t *testing.T) {
_, teardown := newTest(t, "A")
teardown()
}
func TestUnansweredAccept(t *testing.T) {
transA, closeA := newTest(t, "A")
defer closeA()
_, err := transA.AcceptPeer(getPeer(transA))
assert.Error(t, err)
}
func TestCloseWhileAccepting(t *testing.T) {
transA, closeA := newTest(t, "A")
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := transA.AcceptPeer(getPeer(transA))
assert.Error(t, err)
}()
time.Sleep(graceTime)
closeA()
wg.Wait()
}
func TestUnansweredDial(t *testing.T) {
transA, closeA := newTest(t, "A")
defer closeA()
// create peer with invalid gossip address
services := getPeer(transA).Services().CreateRecord()
services.Update(service.GossipKey, "tcp", ":0")
unreachablePeer := peer.NewPeer(getPeer(transA).PublicKey(), services)
_, err := transA.DialPeer(unreachablePeer)
assert.Error(t, err)
}
func TestNoHandshakeResponse(t *testing.T) {
transA, closeA := newTest(t, "A")
defer closeA()
// accept and read incoming connections
lis, err := net.Listen("tcp", ":0")
require.NoError(t, err)
go func() {
conn, err := lis.Accept()
require.NoError(t, err)
n, _ := conn.Read(make([]byte, MaxPacketSize))
assert.NotZero(t, n)
_ = conn.Close()
_ = lis.Close()
}()
// create peer for the listener
services := getPeer(transA).Services().CreateRecord()
services.Update(service.GossipKey, lis.Addr().Network(), lis.Addr().String())
p := peer.NewPeer(getPeer(transA).PublicKey(), services)
_, err = transA.DialPeer(p)
assert.Error(t, err)
}
func TestNoHandshakeRequest(t *testing.T) {
transA, closeA := newTest(t, "A")
defer closeA()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := transA.AcceptPeer(getPeer(transA))
assert.Error(t, err)
}()
time.Sleep(graceTime)
conn, err := net.Dial(transA.LocalAddr().Network(), transA.LocalAddr().String())
require.NoError(t, err)
time.Sleep(handshakeTimeout)
_ = conn.Close()
wg.Wait()
}
func TestConnect(t *testing.T) {
transA, closeA := newTest(t, "A")
defer closeA()
transB, closeB := newTest(t, "B")
defer closeB()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c, err := transA.AcceptPeer(getPeer(transB))
assert.NoError(t, err)
if assert.NotNil(t, c) {
c.Close()
}
}()
time.Sleep(graceTime)
go func() {
defer wg.Done()
c, err := transB.DialPeer(getPeer(transA))
assert.NoError(t, err)
if assert.NotNil(t, c) {
c.Close()
}
}()
wg.Wait()
}
func TestWrongConnect(t *testing.T) {
transA, closeA := newTest(t, "A")
defer closeA()
transB, closeB := newTest(t, "B")
defer closeB()
transC, closeC := newTest(t, "C")
defer closeC()
var wg sync.WaitGroup
wg.Add(2)
// a expects connection from B, but C is connecting
go func() {
defer wg.Done()
_, err := transA.AcceptPeer(getPeer(transB))
assert.Error(t, err)
}()
go func() {
defer wg.Done()
_, err := transC.DialPeer(getPeer(transA))
assert.Error(t, err)
}()
wg.Wait()
}
package autopeering
import (
"net"
"github.com/iotaledger/autopeering-sim/discover"
"github.com/iotaledger/autopeering-sim/selection"
"github.com/iotaledger/goshimmer/packages/gossip/neighbor"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
......@@ -28,26 +26,26 @@ func run(plugin *node.Plugin) {
}
func configureLogging(plugin *node.Plugin) {
gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Neighbor) {
gossip.Events.DropNeighbor.Attach(events.NewClosure(func(peer *neighbor.Neighbor) {
Selection.DropPeer(peer.Peer)
}))
selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
log.Debug("neighbor removed: " + ev.DroppedID.String())
gossip.RemoveNeighbor(ev.DroppedID.String())
}))
selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
log.Debug("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
address, port, _ := net.SplitHostPort(ev.Services["gossip"].Address)
gossip.AddNeighbor(gossip.NewNeighbor(ev.Peer, address, port))
}))
selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
log.Debug("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
address, port, _ := net.SplitHostPort(ev.Services["gossip"].Address)
gossip.AddNeighbor(gossip.NewNeighbor(ev.Peer, address, port))
}))
// selection.Events.Dropped.Attach(events.NewClosure(func(ev *selection.DroppedEvent) {
// log.Debug("neighbor removed: " + ev.DroppedID.String())
// gossip.RemoveNeighbor(ev.DroppedID.String())
// }))
// selection.Events.IncomingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
// log.Debug("accepted neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
// address, port, _ := net.SplitHostPort(ev.Services["gossip"].Address)
// gossip.AddNeighbor(gossip.NewNeighbor(ev.Peer, address, port))
// }))
// selection.Events.OutgoingPeering.Attach(events.NewClosure(func(ev *selection.PeeringEvent) {
// log.Debug("chosen neighbor added: " + ev.Peer.Address() + " / " + ev.Peer.String())
// address, port, _ := net.SplitHostPort(ev.Services["gossip"].Address)
// gossip.AddNeighbor(gossip.NewNeighbor(ev.Peer, address, port))
// }))
discover.Events.PeerDiscovered.Attach(events.NewClosure(func(ev *discover.DiscoveredEvent) {
log.Info("new peer discovered: " + ev.Peer.Address() + " / " + ev.Peer.ID().String())
......
package gossip_on_solidification
import (
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/node"
)
var PLUGIN = node.NewPlugin("Gossip On Solidification", node.Enabled, func(plugin *node.Plugin) {
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
gossip.SendTransaction(tx.MetaTransaction)
}))
})
package gossip
import "github.com/iotaledger/goshimmer/packages/errors"
var (
ErrConnectionFailed = errors.Wrap(errors.New("connection error"), "could not connect to neighbor")
ErrInvalidAuthenticationMessage = errors.Wrap(errors.New("protocol error"), "invalid authentication message")
ErrInvalidIdentity = errors.Wrap(errors.New("protocol error"), "invalid identity message")
ErrInvalidStateTransition = errors.New("protocol error: invalid state transition message")
ErrSendFailed = errors.Wrap(errors.New("protocol error"), "failed to send message")
ErrInvalidSendParam = errors.New("invalid parameter passed to send")
)
package gossip
import (
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/model/meta_transaction"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/hive.go/events"
)
var Events = pluginEvents{
// neighbor events
AddNeighbor: events.NewEvent(neighborCaller),
UpdateNeighbor: events.NewEvent(neighborCaller),
RemoveNeighbor: events.NewEvent(neighborCaller),
// low level network events
IncomingConnection: events.NewEvent(connectionCaller),
// high level protocol events
DropNeighbor: events.NewEvent(neighborCaller),
SendTransaction: events.NewEvent(transactionCaller),
SendTransactionRequest: events.NewEvent(transactionCaller), // TODO
ReceiveTransaction: events.NewEvent(transactionCaller),
ReceiveTransactionRequest: events.NewEvent(transactionCaller), // TODO
ProtocolError: events.NewEvent(transactionCaller), // TODO
// generic events
Error: events.NewEvent(errorCaller),
}
type pluginEvents struct {
// neighbor events
AddNeighbor *events.Event
UpdateNeighbor *events.Event
RemoveNeighbor *events.Event
// low level network events
IncomingConnection *events.Event
// high level protocol events
DropNeighbor *events.Event
SendTransaction *events.Event
SendTransactionRequest *events.Event
ReceiveTransaction *events.Event
ReceiveTransactionRequest *events.Event
ProtocolError *events.Event
// generic events
Error *events.Event
}
type protocolEvents struct {
ReceiveVersion *events.Event
ReceiveIdentification *events.Event
ReceiveConnectionAccepted *events.Event
ReceiveConnectionRejected *events.Event
ReceiveDropConnection *events.Event
ReceiveTransactionData *events.Event
ReceiveRequestData *events.Event
HandshakeCompleted *events.Event
Error *events.Event
}
type neighborEvents struct {
ProtocolConnectionEstablished *events.Event
}
func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
func identityCaller(handler interface{}, params ...interface{}) {
handler.(func(*identity.Identity))(params[0].(*identity.Identity))
}
func connectionCaller(handler interface{}, params ...interface{}) {
handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection))
}
func protocolCaller(handler interface{}, params ...interface{}) {
handler.(func(*protocol))(params[0].(*protocol))
}
func neighborCaller(handler interface{}, params ...interface{}) {
handler.(func(*Neighbor))(params[0].(*Neighbor))
}
func errorCaller(handler interface{}, params ...interface{}) {
handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError))
}
func dataCaller(handler interface{}, params ...interface{}) {
handler.(func([]byte))(params[0].([]byte))
}
func transactionCaller(handler interface{}, params ...interface{}) {
handler.(func(*meta_transaction.MetaTransaction))(params[0].(*meta_transaction.MetaTransaction))
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment