Skip to content
Snippets Groups Projects
Commit 39237b7c authored by Hans Moog's avatar Hans Moog
Browse files

Feat: gossip protocol nearly done

parent cb919b95
Branches
Tags
No related merge requests found
......@@ -61,7 +61,7 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch
// define hooks ////////////////////////////////////////////////////////////////////////////////////////////////////
onDiscoverPeer := events.NewClosure(func(p *peer.Peer) {
eventDispatchers.AddNode(p.Identity.Identifier)
go eventDispatchers.AddNode(p.Identity.Identifier)
})
onAddAcceptedNeighbor := events.NewClosure(func(p *peer.Peer) {
......
......@@ -37,6 +37,11 @@ func run(plugin *node.Plugin) {
}
func configureLogging(plugin *node.Plugin) {
gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Peer) {
chosenneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
acceptedneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
}))
acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("accepted neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
......
......@@ -62,14 +62,20 @@ func (this *PeerRegister) Lock() func() {
}
func (this *PeerRegister) Remove(key string, lock... bool) {
if len(lock) == 0 || lock[0] {
defer this.Lock()()
}
if peerEntry, exists := this.Peers[key]; exists {
delete(this.Peers, key)
if len(lock) == 0 || lock[0] {
defer this.Lock()()
if peerEntry, exists := this.Peers[key]; exists {
delete(this.Peers, key)
this.Events.Remove.Trigger(peerEntry)
}
} else {
delete(this.Peers, key)
this.Events.Remove.Trigger(peerEntry)
this.Events.Remove.Trigger(peerEntry)
}
}
}
......
......@@ -2,6 +2,7 @@ package gossip
import (
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/transaction"
)
......@@ -61,6 +62,8 @@ type protocolEvents struct {
func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
func identityCaller(handler interface{}, params ...interface{}) { handler.(func(*identity.Identity))(params[0].(*identity.Identity)) }
func connectionCaller(handler interface{}, params ...interface{}) { handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) }
func peerCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) }
......
......@@ -7,9 +7,11 @@ import (
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/node"
"math"
"net"
"strconv"
"sync"
"time"
)
func configureNeighbors(plugin *node.Plugin) {
......@@ -30,6 +32,9 @@ func runNeighbors(plugin *node.Plugin) {
plugin.LogInfo("Starting Neighbor Connection Manager ...")
neighborLock.RLock()
for _, neighbor := range GetNeighbors() {
manageConnection(plugin, neighbor)
}
neighborLock.RUnlock()
Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
......@@ -39,21 +44,46 @@ func runNeighbors(plugin *node.Plugin) {
plugin.LogSuccess("Starting Neighbor Connection Manager ... done")
}
func manageConnection(plugin *node.Plugin, neighbor *Peer, initiated bool) {
func manageConnection(plugin *node.Plugin, neighbor *Peer) {
daemon.BackgroundWorker(func() {
failedConnectionAttempts := 0
for failedConnectionAttempts < MAX_CONNECTION_ATTEMPTS {
for failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS {
if neighbor, exists := GetNeighbor(neighbor.Identity.StringIdentifier); !exists {
return
} else {
if conn, newConnection, err := neighbor.Connect(); err != nil {
failedConnectionAttempts++
plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(MAX_CONNECTION_ATTEMPTS) + "] " + err.Error())
plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error())
if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS {
select {
case <-daemon.ShutdownSignal:
return
case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT):
// continue
}
}
} else {
failedConnectionAttempts = 0
disconnectChan := make(chan int, 1)
conn.Events.Close.Attach(events.NewClosure(func() {
close(disconnectChan)
}))
if newConnection {
newProtocol(conn).init()
go newProtocol(conn).init()
}
select {
case <-daemon.ShutdownSignal:
return
case <-disconnectChan:
break
}
}
}
......@@ -69,8 +99,8 @@ type Peer struct {
Port uint16
InitiatedConn *network.ManagedConnection
AcceptedConn *network.ManagedConnection
initiatedConnMutex sync.Mutex
acceptedConnMutex sync.Mutex
initiatedConnMutex sync.RWMutex
acceptedConnMutex sync.RWMutex
}
func UnmarshalPeer(data []byte) (*Peer, error) {
......@@ -78,41 +108,42 @@ func UnmarshalPeer(data []byte) (*Peer, error) {
}
func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.IdentifiableError) {
peer.initiatedConnMutex.Lock()
defer peer.initiatedConnMutex.Unlock()
// return existing connections first
if peer.InitiatedConn != nil {
return peer.InitiatedConn, false, nil
}
// if we already have an accepted connection -> use it instead
if peer.AcceptedConn != nil {
peer.acceptedConnMutex.Lock()
peer.acceptedConnMutex.RLock()
if peer.AcceptedConn != nil {
defer peer.acceptedConnMutex.Unlock()
defer peer.acceptedConnMutex.RUnlock()
return peer.AcceptedConn, false, nil
}
peer.acceptedConnMutex.Unlock()
peer.acceptedConnMutex.RUnlock()
}
// otherwise try to dial
peer.initiatedConnMutex.Lock()
defer peer.initiatedConnMutex.Unlock()
if peer.InitiatedConn != nil {
return peer.InitiatedConn, false, nil
} else {
conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
if err != nil {
return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
}
conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
if err != nil {
return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
}
peer.InitiatedConn = network.NewManagedConnection(conn)
peer.InitiatedConn = network.NewManagedConnection(conn)
peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() {
peer.initiatedConnMutex.Lock()
defer peer.initiatedConnMutex.Unlock()
peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() {
peer.initiatedConnMutex.Lock()
defer peer.initiatedConnMutex.Unlock()
peer.InitiatedConn = nil
}))
peer.InitiatedConn = nil
}))
return peer.InitiatedConn, true, nil
}
return peer.InitiatedConn, true, nil
}
func (peer *Peer) Marshal() []byte {
......@@ -144,13 +175,15 @@ func AddNeighbor(newNeighbor *Peer) {
}
func RemoveNeighbor(identifier string) {
neighborLock.Lock()
defer neighborLock.Lock()
if _, exists := neighbors[identifier]; exists {
neighborLock.Lock()
defer neighborLock.Unlock()
if neighbor, exists := neighbors[identifier]; exists {
delete(neighbors, identifier)
if neighbor, exists := neighbors[identifier]; exists {
delete(neighbors, identifier)
Events.RemoveNeighbor.Trigger(neighbor)
Events.RemoveNeighbor.Trigger(neighbor)
}
}
}
......@@ -176,7 +209,8 @@ func GetNeighbors() map[string]*Peer {
}
const (
MAX_CONNECTION_ATTEMPTS = 5
CONNECTION_MAX_ATTEMPTS = 5
CONNECTION_BASE_TIMEOUT = 10 * time.Second
MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1
)
......
package gossip
import (
"fmt"
"github.com/iotaledger/goshimmer/packages/accountability"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events"
......@@ -33,7 +32,7 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
CurrentState: &versionState{},
Events: protocolEvents{
ReceiveVersion: events.NewEvent(intCaller),
ReceiveIdentification: events.NewEvent(peerCaller),
ReceiveIdentification: events.NewEvent(identityCaller),
},
}
......@@ -43,8 +42,6 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
func (protocol *protocol) init() {
var onClose, onReceiveData *events.Closure
fmt.Println("INIT")
onReceiveData = events.NewClosure(protocol.parseData)
onClose = events.NewClosure(func() {
protocol.Conn.Events.ReceiveData.Detach(onReceiveData)
......@@ -53,26 +50,15 @@ func (protocol *protocol) init() {
protocol.Conn.Events.ReceiveData.Attach(onReceiveData)
protocol.Conn.Events.Close.Attach(onClose)
protocol.Events.ReceiveVersion.Attach(events.NewClosure(func(version int) {
fmt.Println(version)
}))
protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(neighbor *Peer) {
fmt.Println(neighbor)
}))
protocol.Conn.Write([]byte{1})
fmt.Println("SENT VERSION")
protocol.Conn.Write(accountability.OWN_ID.Identifier)
fmt.Println(len(accountability.OWN_ID.Identifier))
if signature, err := accountability.OWN_ID.Sign(accountability.OWN_ID.Identifier); err == nil {
protocol.Conn.Write(signature)
fmt.Println(len(signature))
}
fmt.Println("SENTSIGNATURE")
go protocol.Conn.Read(make([]byte, 1000))
protocol.Conn.Read(make([]byte, 1000))
}
func (protocol *protocol) parseData(data []byte) {
......
......@@ -32,18 +32,12 @@ func (state *indentificationStateV1) Consume(protocol *protocol, data []byte, of
return bytesRead, ErrInvalidAuthenticationMessage.Derive(err, "invalid authentication message")
} else {
if neighbor, exists := GetNeighbor(receivedIdentity.StringIdentifier); exists {
neighbor.initiatedConnMutex.Lock()
if neighbor.InitiatedConn == nil {
neighbor.InitiatedConn = protocol.Conn
}
neighbor.initiatedConnMutex.Unlock()
protocol.Neighbor = neighbor
} else {
protocol.Neighbor = nil
}
protocol.Events.ReceiveIdentification.Trigger(protocol.Neighbor)
protocol.Events.ReceiveIdentification.Trigger(receivedIdentity)
protocol.CurrentState = newacceptanceStateV1()
state.offset = 0
......
......@@ -3,6 +3,7 @@ package gossip
import (
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/network/tcp"
"github.com/iotaledger/goshimmer/packages/node"
......@@ -13,7 +14,26 @@ var TCPServer = tcp.NewServer()
func configureServer(plugin *node.Plugin) {
TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) {
newProtocol(conn).init()
protocol := newProtocol(conn)
protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) {
if protocol.Neighbor != nil {
protocol.Neighbor.acceptedConnMutex.Lock()
if protocol.Neighbor.AcceptedConn == nil {
protocol.Neighbor.AcceptedConn = protocol.Conn
protocol.Neighbor.AcceptedConn.Events.Close.Attach(events.NewClosure(func() {
protocol.Neighbor.acceptedConnMutex.Lock()
defer protocol.Neighbor.acceptedConnMutex.Unlock()
protocol.Neighbor.AcceptedConn = nil
}))
}
protocol.Neighbor.acceptedConnMutex.Unlock()
}
}))
go protocol.init()
}))
daemon.Events.Shutdown.Attach(events.NewClosure(func() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment