Skip to content
Snippets Groups Projects
Commit cb919b95 authored by Hans Moog's avatar Hans Moog
Browse files

Feat: intermediary commit gossip protocol

parent 17f603fd
No related branches found
No related tags found
No related merge requests found
......@@ -6,6 +6,7 @@ import (
"github.com/iotaledger/goshimmer/plugins/autopeering/parameters"
"github.com/iotaledger/goshimmer/plugins/autopeering/types/peer"
"github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager"
"github.com/iotaledger/goshimmer/plugins/gossip"
"net"
)
......@@ -15,7 +16,7 @@ func Configure(plugin *node.Plugin) {
INSTANCE = &peer.Peer{
Identity: accountability.OWN_ID,
PeeringPort: uint16(*parameters.PORT.Value),
GossipPort: uint16(*parameters.PORT.Value),
GossipPort: uint16(*gossip.PORT.Value),
Address: net.IPv4(0, 0, 0, 0),
Salt: saltmanager.PUBLIC_SALT,
}
......
......@@ -12,6 +12,7 @@ import (
"github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager"
"github.com/iotaledger/goshimmer/plugins/autopeering/server"
"github.com/iotaledger/goshimmer/plugins/autopeering/types/peer"
"github.com/iotaledger/goshimmer/plugins/gossip"
)
var PLUGIN = node.NewPlugin("Auto Peering", configure, run)
......@@ -37,23 +38,55 @@ func run(plugin *node.Plugin) {
func configureLogging(plugin *node.Plugin) {
acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogDebug("accepted neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
gossip.AddNeighbor(&gossip.Peer{
Identity: p.Identity,
Address: p.Address,
Port: p.GossipPort,
})
}))
acceptedneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogDebug("accepted neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
gossip.RemoveNeighbor(p.Identity.StringIdentifier)
}))
chosenneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogSuccess("neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogDebug("chosen neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
gossip.AddNeighbor(&gossip.Peer{
Identity: p.Identity,
Address: p.Address,
Port: p.GossipPort,
})
}))
chosenneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogSuccess("neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogDebug("chosen neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
gossip.RemoveNeighbor(p.Identity.StringIdentifier)
}))
knownpeers.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogInfo("peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogInfo("new peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists {
gossip.AddNeighbor(&gossip.Peer{
Identity: p.Identity,
Address: p.Address,
Port: p.GossipPort,
})
}
}))
knownpeers.INSTANCE.Events.Update.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("peer updated: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists {
gossip.AddNeighbor(&gossip.Peer{
Identity: p.Identity,
Address: p.Address,
Port: p.GossipPort,
})
}
}))
}
......@@ -3,6 +3,8 @@ package gossip
import "github.com/iotaledger/goshimmer/packages/errors"
var (
ErrConnectionFailed = errors.Wrap(errors.New("connection error"), "could not connect to neighbor")
ErrInvalidAuthenticationMessage = errors.Wrap(errors.New("protocol error"), "invalid authentication message")
ErrInvalidIdentity = errors.Wrap(errors.New("protocol error"), "invalid identity message")
ErrInvalidStateTransition = errors.New("protocol error: invalid state transition message")
)
......@@ -2,17 +2,29 @@ package gossip
import (
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/transaction"
)
var Events = pluginEvents{
AddNeighbor: events.NewEvent(neighborCaller),
UpdateNeighbor: events.NewEvent(neighborCaller),
RemoveNeighbor: events.NewEvent(neighborCaller),
DropNeighbor: events.NewEvent(neighborCaller),
IncomingConnection: events.NewEvent(errorCaller),
ReceiveTransaction: events.NewEvent(transactionCaller),
Error: events.NewEvent(errorCaller),
// neighbor events
AddNeighbor: events.NewEvent(peerCaller),
UpdateNeighbor: events.NewEvent(peerCaller),
RemoveNeighbor: events.NewEvent(peerCaller),
// low level network events
IncomingConnection: events.NewEvent(connectionCaller),
// high level protocol events
DropNeighbor: events.NewEvent(peerCaller),
SendTransaction: events.NewEvent(transactionCaller),
SendTransactionRequest: events.NewEvent(transactionCaller), // TODO
ReceiveTransaction: events.NewEvent(transactionCaller),
ReceiveTransactionRequest: events.NewEvent(transactionCaller), // TODO
ProtocolError: events.NewEvent(transactionCaller), // TODO
// generic events
Error: events.NewEvent(errorCaller),
}
type pluginEvents struct {
......@@ -49,7 +61,9 @@ type protocolEvents struct {
func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
func neighborCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) }
func connectionCaller(handler interface{}, params ...interface{}) { handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) }
func peerCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) }
func errorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) }
......
package gossip
import (
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/node"
"net"
"strconv"
"sync"
)
func configureNeighbors(plugin *node.Plugin) {
Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
plugin.LogSuccess("new neighbor added " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
}))
Events.UpdateNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
plugin.LogSuccess("existing neighbor updated " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
}))
Events.RemoveNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
plugin.LogSuccess("existing neighbor removed " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
}))
}
func runNeighbors(plugin *node.Plugin) {
plugin.LogInfo("Starting Neighbor Connection Manager ...")
neighborLock.RLock()
neighborLock.RUnlock()
Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
manageConnection(plugin, neighbor)
}))
plugin.LogSuccess("Starting Neighbor Connection Manager ... done")
}
func manageConnection(plugin *node.Plugin, neighbor *Peer, initiated bool) {
daemon.BackgroundWorker(func() {
failedConnectionAttempts := 0
for failedConnectionAttempts < MAX_CONNECTION_ATTEMPTS {
if neighbor, exists := GetNeighbor(neighbor.Identity.StringIdentifier); !exists {
return
} else {
if conn, newConnection, err := neighbor.Connect(); err != nil {
failedConnectionAttempts++
plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(MAX_CONNECTION_ATTEMPTS) + "] " + err.Error())
} else {
if newConnection {
newProtocol(conn).init()
}
}
}
}
RemoveNeighbor(neighbor.Identity.StringIdentifier)
})
}
type Peer struct {
Identity identity.Identity
Address net.IP
Port uint16
Conn *network.ManagedConnection
Identity *identity.Identity
Address net.IP
Port uint16
InitiatedConn *network.ManagedConnection
AcceptedConn *network.ManagedConnection
initiatedConnMutex sync.Mutex
acceptedConnMutex sync.Mutex
}
func UnmarshalPeer(data []byte) (*Peer, error) {
return &Peer{}, nil
}
func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.IdentifiableError) {
// if we already have an accepted connection -> use it instead
if peer.AcceptedConn != nil {
peer.acceptedConnMutex.Lock()
if peer.AcceptedConn != nil {
defer peer.acceptedConnMutex.Unlock()
return peer.AcceptedConn, false, nil
}
peer.acceptedConnMutex.Unlock()
}
// otherwise try to dial
peer.initiatedConnMutex.Lock()
defer peer.initiatedConnMutex.Unlock()
if peer.InitiatedConn != nil {
return peer.InitiatedConn, false, nil
} else {
conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
if err != nil {
return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
}
peer.InitiatedConn = network.NewManagedConnection(conn)
peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() {
peer.initiatedConnMutex.Lock()
defer peer.initiatedConnMutex.Unlock()
peer.InitiatedConn = nil
}))
return peer.InitiatedConn, true, nil
}
}
func (peer *Peer) Marshal() []byte {
return nil
}
......@@ -29,7 +126,7 @@ func (peer *Peer) Equals(other *Peer) bool {
func AddNeighbor(newNeighbor *Peer) {
neighborLock.Lock()
defer neighborLock.Lock()
defer neighborLock.Unlock()
if neighbor, exists := neighbors[newNeighbor.Identity.StringIdentifier]; !exists {
neighbors[newNeighbor.Identity.StringIdentifier] = newNeighbor
......@@ -57,11 +154,11 @@ func RemoveNeighbor(identifier string) {
}
}
func GetNeighbor(neighbor *Peer) (*Peer, bool) {
func GetNeighbor(identifier string) (*Peer, bool) {
neighborLock.RLock()
defer neighborLock.RUnlock()
neighbor, exists := neighbors[neighbor.Identity.StringIdentifier]
neighbor, exists := neighbors[identifier]
return neighbor, exists
}
......@@ -79,6 +176,7 @@ func GetNeighbors() map[string]*Peer {
}
const (
MAX_CONNECTION_ATTEMPTS = 5
MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1
)
......
......@@ -5,9 +5,11 @@ import "github.com/iotaledger/goshimmer/packages/node"
var PLUGIN = node.NewPlugin("Gossip", configure, run)
func configure(plugin *node.Plugin) {
configureNeighbors(plugin)
configureServer(plugin)
}
func run(plugin *node.Plugin) {
runNeighbors(plugin)
runServer(plugin)
}
package gossip
import (
"fmt"
"github.com/iotaledger/goshimmer/packages/accountability"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/network"
"strconv"
)
......@@ -17,31 +20,69 @@ type protocolState interface {
// region protocol /////////////////////////////////////////////////////////////////////////////////////////////////////
type protocol struct {
Conn *network.ManagedConnection
Neighbor *Peer
Version int
CurrentState protocolState
Events protocolEvents
neighbor *Peer
currentState protocolState
}
func newProtocol(neighbor *Peer) *protocol {
func newProtocol(conn *network.ManagedConnection) *protocol {
protocol := &protocol{
Conn: conn,
CurrentState: &versionState{},
Events: protocolEvents{
ReceiveVersion: events.NewEvent(intCaller),
ReceiveVersion: events.NewEvent(intCaller),
ReceiveIdentification: events.NewEvent(peerCaller),
},
neighbor: neighbor,
currentState: &versionState{},
}
return protocol
}
func (protocol *protocol) init() {
var onClose, onReceiveData *events.Closure
fmt.Println("INIT")
onReceiveData = events.NewClosure(protocol.parseData)
onClose = events.NewClosure(func() {
protocol.Conn.Events.ReceiveData.Detach(onReceiveData)
protocol.Conn.Events.Close.Detach(onClose)
})
protocol.Conn.Events.ReceiveData.Attach(onReceiveData)
protocol.Conn.Events.Close.Attach(onClose)
protocol.Events.ReceiveVersion.Attach(events.NewClosure(func(version int) {
fmt.Println(version)
}))
protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(neighbor *Peer) {
fmt.Println(neighbor)
}))
protocol.Conn.Write([]byte{1})
fmt.Println("SENT VERSION")
protocol.Conn.Write(accountability.OWN_ID.Identifier)
fmt.Println(len(accountability.OWN_ID.Identifier))
if signature, err := accountability.OWN_ID.Sign(accountability.OWN_ID.Identifier); err == nil {
protocol.Conn.Write(signature)
fmt.Println(len(signature))
}
fmt.Println("SENTSIGNATURE")
go protocol.Conn.Read(make([]byte, 1000))
}
func (protocol *protocol) parseData(data []byte) {
offset := 0
length := len(data)
for offset < length && protocol.currentState != nil {
if readBytes, err := protocol.currentState.Consume(protocol, data, offset, length); err != nil {
for offset < length && protocol.CurrentState != nil {
if readBytes, err := protocol.CurrentState.Consume(protocol, data, offset, length); err != nil {
Events.Error.Trigger(err)
protocol.neighbor.Conn.Close()
protocol.Neighbor.InitiatedConn.Close()
return
} else {
......@@ -59,9 +100,10 @@ type versionState struct{}
func (state *versionState) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) {
switch data[offset] {
case 1:
protocol.Version = 1
protocol.Events.ReceiveVersion.Trigger(1)
protocol.currentState = newIndentificationStateV1()
protocol.CurrentState = newIndentificationStateV1()
return 1, nil
......
package gossip
import (
"bytes"
"github.com/iotaledger/goshimmer/packages/byteutils"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/transaction"
"strconv"
)
......@@ -16,7 +18,7 @@ type indentificationStateV1 struct {
func newIndentificationStateV1() *indentificationStateV1 {
return &indentificationStateV1{
buffer: make([]byte, MARSHALLED_NEIGHBOR_TOTAL_SIZE),
buffer: make([]byte, MARSHALLED_IDENTITY_TOTAL_SIZE),
offset: 0,
}
}
......@@ -25,16 +27,25 @@ func (state *indentificationStateV1) Consume(protocol *protocol, data []byte, of
bytesRead := byteutils.ReadAvailableBytesToBuffer(state.buffer, state.offset, data, offset, length)
state.offset += bytesRead
if state.offset == MARSHALLED_NEIGHBOR_TOTAL_SIZE {
if unmarshalledNeighbor, err := UnmarshalPeer(state.buffer); err != nil {
if state.offset == MARSHALLED_IDENTITY_TOTAL_SIZE {
if receivedIdentity, err := unmarshalIdentity(state.buffer); err != nil {
return bytesRead, ErrInvalidAuthenticationMessage.Derive(err, "invalid authentication message")
} else {
protocol.neighbor.Identity = unmarshalledNeighbor.Identity
protocol.neighbor.Port = unmarshalledNeighbor.Port
if neighbor, exists := GetNeighbor(receivedIdentity.StringIdentifier); exists {
neighbor.initiatedConnMutex.Lock()
if neighbor.InitiatedConn == nil {
neighbor.InitiatedConn = protocol.Conn
}
neighbor.initiatedConnMutex.Unlock()
protocol.Events.ReceiveIdentification.Trigger(protocol.neighbor)
protocol.Neighbor = neighbor
} else {
protocol.Neighbor = nil
}
protocol.currentState = newacceptanceStateV1()
protocol.Events.ReceiveIdentification.Trigger(protocol.Neighbor)
protocol.CurrentState = newacceptanceStateV1()
state.offset = 0
}
}
......@@ -42,6 +53,20 @@ func (state *indentificationStateV1) Consume(protocol *protocol, data []byte, of
return bytesRead, nil
}
func unmarshalIdentity(data []byte) (*identity.Identity, error) {
identifier := data[MARSHALLED_IDENTITY_START:MARSHALLED_IDENTITY_END]
if restoredIdentity, err := identity.FromSignedData(identifier, data[MARSHALLED_IDENTITY_SIGNATURE_START:MARSHALLED_IDENTITY_SIGNATURE_END]); err != nil {
return nil, err
} else {
if bytes.Equal(identifier, restoredIdentity.Identifier) {
return restoredIdentity, nil
} else {
return nil, errors.New("signature does not match claimed identity")
}
}
}
//endregion ////////////////////////////////////////////////////////////////////////////////////////////////////////////
//region acceptanceStateV1 /////////////////////////////////////////////////////////////////////////////////////////////
......@@ -57,14 +82,16 @@ func (state *acceptanceStateV1) Consume(protocol *protocol, data []byte, offset
case 0:
protocol.Events.RejectConnection.Trigger()
protocol.neighbor.Conn.Close()
protocol.currentState = nil
RemoveNeighbor(protocol.Neighbor.Identity.StringIdentifier)
protocol.Neighbor.InitiatedConn.Close()
protocol.CurrentState = nil
break
case 1:
protocol.Events.AcceptConnection.Trigger()
protocol.currentState = newDispatchStateV1()
protocol.CurrentState = newDispatchStateV1()
break
default:
......@@ -89,15 +116,15 @@ func (state *dispatchStateV1) Consume(protocol *protocol, data []byte, offset in
case 0:
protocol.Events.RejectConnection.Trigger()
protocol.neighbor.Conn.Close()
protocol.currentState = nil
protocol.Neighbor.InitiatedConn.Close()
protocol.CurrentState = nil
case 1:
protocol.currentState = newTransactionStateV1()
protocol.CurrentState = newTransactionStateV1()
break
case 2:
protocol.currentState = newRequestStateV1()
protocol.CurrentState = newRequestStateV1()
break
default:
......@@ -136,7 +163,7 @@ func (state *transactionStateV1) Consume(protocol *protocol, data []byte, offset
Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
}()
protocol.currentState = newDispatchStateV1()
protocol.CurrentState = newDispatchStateV1()
state.offset = 0
}
......@@ -164,3 +191,16 @@ func (state *requestStateV1) Consume(protocol *protocol, data []byte, offset int
}
//endregion ////////////////////////////////////////////////////////////////////////////////////////////////////////////
const (
MARSHALLED_IDENTITY_START = 0
MARSHALLED_IDENTITY_SIGNATURE_START = MARSHALLED_IDENTITY_END
MARSHALLED_IDENTITY_SIZE = 20
MARSHALLED_IDENTITY_SIGNATURE_SIZE = 65
MARSHALLED_IDENTITY_END = MARSHALLED_IDENTITY_START + MARSHALLED_IDENTITY_SIZE
MARSHALLED_IDENTITY_SIGNATURE_END = MARSHALLED_IDENTITY_SIGNATURE_START + MARSHALLED_IDENTITY_SIGNATURE_SIZE
MARSHALLED_IDENTITY_TOTAL_SIZE = MARSHALLED_IDENTITY_SIGNATURE_END
)
\ No newline at end of file
......@@ -6,7 +6,6 @@ import (
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/network/tcp"
"github.com/iotaledger/goshimmer/packages/node"
"net"
"strconv"
)
......@@ -14,26 +13,7 @@ var TCPServer = tcp.NewServer()
func configureServer(plugin *node.Plugin) {
TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) {
neighbor := &Peer{
Address: conn.RemoteAddr().(*net.TCPAddr).IP,
}
protocol := newProtocol(neighbor)
var onClose, onReceiveData *events.Closure
onReceiveData = events.NewClosure(func(data []byte) {
protocol.parseData(data)
})
onClose = events.NewClosure(func() {
conn.Events.ReceiveData.Detach(onReceiveData)
conn.Events.Close.Detach(onClose)
})
conn.Events.ReceiveData.Attach(onReceiveData)
conn.Events.Close.Attach(onClose)
go conn.Read(make([]byte, 1000))
newProtocol(conn).init()
}))
daemon.Events.Shutdown.Attach(events.NewClosure(func() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment