Skip to content
Snippets Groups Projects
Commit e07fd604 authored by Hans Moog's avatar Hans Moog
Browse files

Feat: added a sendqueue to the gossip module

parent 1a4797b2
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,7 @@ func run(plugin *node.Plugin) {
}
func configureLogging(plugin *node.Plugin) {
gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Peer) {
gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Neighbor) {
chosenneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
acceptedneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
}))
......@@ -45,11 +45,7 @@ func configureLogging(plugin *node.Plugin) {
acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("accepted neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
gossip.AddNeighbor(&gossip.Peer{
Identity: p.Identity,
Address: p.Address,
Port: p.GossipPort,
})
gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
}))
acceptedneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("accepted neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
......@@ -60,11 +56,7 @@ func configureLogging(plugin *node.Plugin) {
chosenneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("chosen neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
gossip.AddNeighbor(&gossip.Peer{
Identity: p.Identity,
Address: p.Address,
Port: p.GossipPort,
})
gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
}))
chosenneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("chosen neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
......@@ -76,22 +68,14 @@ func configureLogging(plugin *node.Plugin) {
plugin.LogInfo("new peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists {
gossip.AddNeighbor(&gossip.Peer{
Identity: p.Identity,
Address: p.Address,
Port: p.GossipPort,
})
gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
}
}))
knownpeers.INSTANCE.Events.Update.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("peer updated: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists {
gossip.AddNeighbor(&gossip.Peer{
Identity: p.Identity,
Address: p.Address,
Port: p.GossipPort,
})
gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
}
}))
}
......@@ -10,15 +10,15 @@ import (
var Events = pluginEvents{
// neighbor events
AddNeighbor: events.NewEvent(peerCaller),
UpdateNeighbor: events.NewEvent(peerCaller),
RemoveNeighbor: events.NewEvent(peerCaller),
AddNeighbor: events.NewEvent(neighborCaller),
UpdateNeighbor: events.NewEvent(neighborCaller),
RemoveNeighbor: events.NewEvent(neighborCaller),
// low level network events
IncomingConnection: events.NewEvent(connectionCaller),
// high level protocol events
DropNeighbor: events.NewEvent(peerCaller),
DropNeighbor: events.NewEvent(neighborCaller),
SendTransaction: events.NewEvent(transactionCaller),
SendTransactionRequest: events.NewEvent(transactionCaller), // TODO
ReceiveTransaction: events.NewEvent(transactionCaller),
......@@ -58,16 +58,23 @@ type protocolEvents struct {
ReceiveDropConnection *events.Event
ReceiveTransactionData *events.Event
ReceiveRequestData *events.Event
HandshakeCompleted *events.Event
Error *events.Event
}
type neighborEvents struct {
ProtocolConnectionEstablished *events.Event
}
func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
func identityCaller(handler interface{}, params ...interface{}) { handler.(func(*identity.Identity))(params[0].(*identity.Identity)) }
func connectionCaller(handler interface{}, params ...interface{}) { handler.(func(*network.ManagedConnection))(params[0].(*network.ManagedConnection)) }
func peerCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) }
func protocolCaller(handler interface{}, params ...interface{}) { handler.(func(*protocol))(params[0].(*protocol)) }
func neighborCaller(handler interface{}, params ...interface{}) { handler.(func(*Neighbor))(params[0].(*Neighbor)) }
func errorCaller(handler interface{}, params ...interface{}) { handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError)) }
......
package gossip
import (
"github.com/iotaledger/goshimmer/packages/accountability"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events"
......@@ -15,15 +16,15 @@ import (
)
func configureNeighbors(plugin *node.Plugin) {
Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
plugin.LogSuccess("new neighbor added " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
}))
Events.UpdateNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
Events.UpdateNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
plugin.LogSuccess("existing neighbor updated " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
}))
Events.RemoveNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
Events.RemoveNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
plugin.LogSuccess("existing neighbor removed " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
}))
}
......@@ -37,14 +38,14 @@ func runNeighbors(plugin *node.Plugin) {
}
neighborLock.RUnlock()
Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Peer) {
Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
manageConnection(plugin, neighbor)
}))
plugin.LogSuccess("Starting Neighbor Connection Manager ... done")
}
func manageConnection(plugin *node.Plugin, neighbor *Peer) {
func manageConnection(plugin *node.Plugin, neighbor *Neighbor) {
daemon.BackgroundWorker(func() {
failedConnectionAttempts := 0
......@@ -91,69 +92,99 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) {
})
}
type Peer struct {
type Neighbor struct {
Identity *identity.Identity
Address net.IP
Port uint16
InitiatedProtocol *protocol
AcceptedProtocol *protocol
Events neighborEvents
initiatedProtocolMutex sync.RWMutex
acceptedProtocolMutex sync.RWMutex
}
func UnmarshalPeer(data []byte) (*Peer, error) {
return &Peer{}, nil
func NewNeighbor(identity *identity.Identity, address net.IP, port uint16) *Neighbor {
return &Neighbor{
Identity: identity,
Address: address,
Port: port,
Events: neighborEvents{
ProtocolConnectionEstablished: events.NewEvent(protocolCaller),
},
}
}
func UnmarshalPeer(data []byte) (*Neighbor, error) {
return &Neighbor{}, nil
}
func (peer *Peer) Connect() (*protocol, bool, errors.IdentifiableError) {
peer.initiatedProtocolMutex.Lock()
defer peer.initiatedProtocolMutex.Unlock()
func (neighbor *Neighbor) Connect() (*protocol, bool, errors.IdentifiableError) {
neighbor.initiatedProtocolMutex.Lock()
defer neighbor.initiatedProtocolMutex.Unlock()
// return existing connections first
if peer.InitiatedProtocol != nil {
return peer.InitiatedProtocol, false, nil
if neighbor.InitiatedProtocol != nil {
return neighbor.InitiatedProtocol, false, nil
}
// if we already have an accepted connection -> use it instead
if peer.AcceptedProtocol != nil {
peer.acceptedProtocolMutex.RLock()
if peer.AcceptedProtocol != nil {
defer peer.acceptedProtocolMutex.RUnlock()
if neighbor.AcceptedProtocol != nil {
neighbor.acceptedProtocolMutex.RLock()
if neighbor.AcceptedProtocol != nil {
defer neighbor.acceptedProtocolMutex.RUnlock()
return peer.AcceptedProtocol, false, nil
return neighbor.AcceptedProtocol, false, nil
}
peer.acceptedProtocolMutex.RUnlock()
neighbor.acceptedProtocolMutex.RUnlock()
}
// otherwise try to dial
conn, err := net.Dial("tcp", peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
conn, err := net.Dial("tcp", neighbor.Address.String()+":"+strconv.Itoa(int(neighbor.Port)))
if err != nil {
return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
neighbor.Identity.StringIdentifier+"@"+neighbor.Address.String()+":"+strconv.Itoa(int(neighbor.Port)))
}
peer.InitiatedProtocol = newProtocol(network.NewManagedConnection(conn))
neighbor.InitiatedProtocol = newProtocol(network.NewManagedConnection(conn))
neighbor.InitiatedProtocol.Conn.Events.Close.Attach(events.NewClosure(func() {
neighbor.initiatedProtocolMutex.Lock()
defer neighbor.initiatedProtocolMutex.Unlock()
neighbor.InitiatedProtocol = nil
}))
// drop the "secondary" connection upon successful handshake
neighbor.InitiatedProtocol.Events.HandshakeCompleted.Attach(events.NewClosure(func() {
if accountability.OWN_ID.StringIdentifier <= neighbor.Identity.StringIdentifier {
neighbor.acceptedProtocolMutex.Lock()
var acceptedProtocolConn *network.ManagedConnection
if neighbor.AcceptedProtocol != nil {
acceptedProtocolConn = neighbor.AcceptedProtocol.Conn
}
neighbor.acceptedProtocolMutex.Unlock()
peer.InitiatedProtocol.Conn.Events.Close.Attach(events.NewClosure(func() {
peer.initiatedProtocolMutex.Lock()
defer peer.initiatedProtocolMutex.Unlock()
if acceptedProtocolConn != nil {
_ = acceptedProtocolConn.Close()
}
}
peer.InitiatedProtocol = nil
neighbor.Events.ProtocolConnectionEstablished.Trigger(neighbor.InitiatedProtocol)
}))
return peer.InitiatedProtocol, true, nil
return neighbor.InitiatedProtocol, true, nil
}
func (peer *Peer) Marshal() []byte {
func (neighbor *Neighbor) Marshal() []byte {
return nil
}
func (peer *Peer) Equals(other *Peer) bool {
return peer.Identity.StringIdentifier == peer.Identity.StringIdentifier &&
peer.Port == other.Port && peer.Address.String() == other.Address.String()
func (neighbor *Neighbor) Equals(other *Neighbor) bool {
return neighbor.Identity.StringIdentifier == neighbor.Identity.StringIdentifier &&
neighbor.Port == other.Port && neighbor.Address.String() == other.Address.String()
}
func AddNeighbor(newNeighbor *Peer) {
func AddNeighbor(newNeighbor *Neighbor) {
neighborLock.Lock()
defer neighborLock.Unlock()
......@@ -185,7 +216,7 @@ func RemoveNeighbor(identifier string) {
}
}
func GetNeighbor(identifier string) (*Peer, bool) {
func GetNeighbor(identifier string) (*Neighbor, bool) {
neighborLock.RLock()
defer neighborLock.RUnlock()
......@@ -194,11 +225,11 @@ func GetNeighbor(identifier string) (*Peer, bool) {
return neighbor, exists
}
func GetNeighbors() map[string]*Peer {
func GetNeighbors() map[string]*Neighbor {
neighborLock.RLock()
defer neighborLock.RUnlock()
result := make(map[string]*Peer)
result := make(map[string]*Neighbor)
for id, neighbor := range neighbors {
result[id] = neighbor
}
......@@ -211,6 +242,6 @@ const (
CONNECTION_BASE_TIMEOUT = 10 * time.Second
)
var neighbors = make(map[string]*Peer)
var neighbors = make(map[string]*Neighbor)
var neighborLock sync.RWMutex
......@@ -11,6 +11,7 @@ var PLUGIN = node.NewPlugin("Gossip", configure, run)
func configure(plugin *node.Plugin) {
configureNeighbors(plugin)
configureServer(plugin)
configureSendQueue(plugin)
configureTransactionProcessor(plugin)
Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) {
......@@ -21,5 +22,6 @@ func configure(plugin *node.Plugin) {
func run(plugin *node.Plugin) {
runNeighbors(plugin)
runServer(plugin)
runSendQueue(plugin)
runTransactionProcessor(plugin)
}
......@@ -21,12 +21,15 @@ var DEFAULT_PROTOCOL = protocolDefinition{
type protocol struct {
Conn *network.ManagedConnection
Neighbor *Peer
Neighbor *Neighbor
Version byte
sendHandshakeCompleted bool
receiveHandshakeCompleted bool
SendState protocolState
ReceivingState protocolState
Events protocolEvents
sendMutex sync.Mutex
handshakeMutex sync.Mutex
}
func newProtocol(conn *network.ManagedConnection) *protocol {
......@@ -37,8 +40,11 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
ReceiveIdentification: events.NewEvent(identityCaller),
ReceiveConnectionAccepted: events.NewEvent(events.CallbackCaller),
ReceiveConnectionRejected: events.NewEvent(events.CallbackCaller),
HandshakeCompleted: events.NewEvent(events.CallbackCaller),
Error: events.NewEvent(errorCaller),
},
sendHandshakeCompleted: false,
receiveHandshakeCompleted: false,
}
protocol.SendState = &versionState{protocol: protocol}
......@@ -50,15 +56,26 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
func (protocol *protocol) Init() {
// setup event handlers
onReceiveData := events.NewClosure(protocol.Receive)
onConnectionAccepted := events.NewClosure(func() {
protocol.handshakeMutex.Lock()
defer protocol.handshakeMutex.Unlock()
protocol.receiveHandshakeCompleted = true
if protocol.sendHandshakeCompleted {
protocol.Events.HandshakeCompleted.Trigger()
}
})
var onClose *events.Closure
onClose = events.NewClosure(func() {
protocol.Conn.Events.ReceiveData.Detach(onReceiveData)
protocol.Conn.Events.Close.Detach(onClose)
protocol.Events.ReceiveConnectionAccepted.Detach(onConnectionAccepted)
})
// region register event handlers
protocol.Conn.Events.ReceiveData.Attach(onReceiveData)
protocol.Conn.Events.Close.Attach(onClose)
protocol.Events.ReceiveConnectionAccepted.Attach(onConnectionAccepted)
// send protocol version
if err := protocol.Send(DEFAULT_PROTOCOL.version); err != nil {
......
......@@ -27,6 +27,14 @@ func protocolV1(protocol *protocol) errors.IdentifiableError {
if err := protocol.Send(CONNECTION_ACCEPT); err != nil {
return
}
protocol.handshakeMutex.Lock()
defer protocol.handshakeMutex.Unlock()
protocol.sendHandshakeCompleted = true
if protocol.receiveHandshakeCompleted {
protocol.Events.HandshakeCompleted.Trigger()
}
}
})
......
package gossip
import (
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/transaction"
"sync"
)
// region plugin module setup //////////////////////////////////////////////////////////////////////////////////////////
func configureSendQueue(plugin *node.Plugin) {
for _, neighbor := range GetNeighbors() {
setupEventHandlers(neighbor)
}
Events.AddNeighbor.Attach(events.NewClosure(setupEventHandlers))
daemon.Events.Shutdown.Attach(events.NewClosure(func() {
plugin.LogInfo("Stopping Send Queue Dispatcher ...")
}))
}
func runSendQueue(plugin *node.Plugin) {
plugin.LogInfo("Starting Send Queue Dispatcher ...")
daemon.BackgroundWorker(func() {
plugin.LogSuccess("Starting Send Queue Dispatcher ... done")
for {
select {
case <-daemon.ShutdownSignal:
plugin.LogSuccess("Stopping Send Queue Dispatcher ... done")
return
case tx := <-sendQueue:
connectedNeighborsMutex.RLock()
for _, neighborQueue := range neighborQueues {
select {
case neighborQueue.queue <- tx:
return
default:
return
}
}
connectedNeighborsMutex.RUnlock()
}
}
})
connectedNeighborsMutex.Lock()
for _, neighborQueue := range neighborQueues {
startNeighborSendQueue(neighborQueue)
}
connectedNeighborsMutex.Unlock()
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region public api ///////////////////////////////////////////////////////////////////////////////////////////////////
func SendTransaction(transaction *transaction.Transaction) {
sendQueue <- transaction
}
func (neighbor *Neighbor) SendTransaction(transaction *transaction.Transaction) {
if queue, exists := neighborQueues[neighbor.Identity.StringIdentifier]; exists {
select {
case queue.queue <- transaction:
return
default:
return
}
}
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region utility methods //////////////////////////////////////////////////////////////////////////////////////////////
func setupEventHandlers(neighbor *Neighbor) {
neighbor.Events.ProtocolConnectionEstablished.Attach(events.NewClosure(func(protocol *protocol) {
queue := &neighborQueue{
protocol: protocol,
queue: make(chan *transaction.Transaction, SEND_QUEUE_SIZE),
disconnectChan: make(chan int, 1),
}
connectedNeighborsMutex.Lock()
neighborQueues[neighbor.Identity.StringIdentifier] = queue
connectedNeighborsMutex.Unlock()
protocol.Conn.Events.Close.Attach(events.NewClosure(func() {
close(queue.disconnectChan)
connectedNeighborsMutex.Lock()
delete(neighborQueues, neighbor.Identity.StringIdentifier)
connectedNeighborsMutex.Unlock()
}))
if(daemon.IsRunning()) {
startNeighborSendQueue(queue)
}
}))
}
func startNeighborSendQueue(neighborQueue *neighborQueue) {
daemon.BackgroundWorker(func() {
for {
select {
case <-daemon.ShutdownSignal:
return
case <-neighborQueue.disconnectChan:
return
case tx := <-neighborQueue.queue:
switch neighborQueue.protocol.Version {
case VERSION_1:
sendTransactionV1(neighborQueue.protocol, tx)
}
}
}
})
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region types and interfaces /////////////////////////////////////////////////////////////////////////////////////////
type neighborQueue struct {
protocol *protocol
queue chan *transaction.Transaction
disconnectChan chan int
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region constants and variables //////////////////////////////////////////////////////////////////////////////////////
var neighborQueues = make(map[string]*neighborQueue)
var connectedNeighborsMutex sync.RWMutex
var sendQueue = make(chan *transaction.Transaction, SEND_QUEUE_SIZE)
const (
SEND_QUEUE_SIZE = 500
)
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
package gossip
import (
"github.com/iotaledger/goshimmer/packages/accountability"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events"
......@@ -22,7 +23,7 @@ func configureServer(plugin *node.Plugin) {
plugin.LogFailure(err.Error())
}))
// store connection in neighbor if its a neighbor calling
// store protocol in neighbor if its a neighbor calling
protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) {
if protocol.Neighbor != nil {
protocol.Neighbor.acceptedProtocolMutex.Lock()
......@@ -40,6 +41,24 @@ func configureServer(plugin *node.Plugin) {
}
}))
// drop the "secondary" connection upon successful handshake
protocol.Events.HandshakeCompleted.Attach(events.NewClosure(func() {
if protocol.Neighbor.Identity.StringIdentifier <= accountability.OWN_ID.StringIdentifier {
protocol.Neighbor.initiatedProtocolMutex.Lock()
var initiatedProtocolConn *network.ManagedConnection
if protocol.Neighbor.InitiatedProtocol != nil {
initiatedProtocolConn = protocol.Neighbor.InitiatedProtocol.Conn
}
protocol.Neighbor.initiatedProtocolMutex.Unlock()
if initiatedProtocolConn != nil {
_ = initiatedProtocolConn.Close()
}
}
protocol.Neighbor.Events.ProtocolConnectionEstablished.Trigger(protocol)
}))
go protocol.Init()
}))
......
package gossip
import (
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/filter"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/transaction"
......@@ -10,55 +8,20 @@ import (
var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE)
var sendQueue = make(chan *transaction.Transaction, TRANSACTION_SEND_QUEUE_SIZE)
func SendTransaction(transaction *transaction.Transaction) {
sendQueue <- transaction
}
func SendTransactionToNeighbor(neighbor *Peer, transaction *transaction.Transaction) {
switch neighbor.AcceptedProtocol.Version {
case VERSION_1:
sendTransactionV1(neighbor.AcceptedProtocol, transaction)
}
}
func processIncomingTransactionData(transactionData []byte) {
if transactionFilter.Add(transactionData) {
Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
}
}
func configureTransactionProcessor(plugin *node.Plugin) {
daemon.Events.Shutdown.Attach(events.NewClosure(func() {
plugin.LogInfo("Stopping Transaction Processor ...")
}))
}
func runTransactionProcessor(plugin *node.Plugin) {
plugin.LogInfo("Starting Transaction Processor ...")
daemon.BackgroundWorker(func() {
plugin.LogSuccess("Starting Transaction Processor ... done")
for {
select {
case <- daemon.ShutdownSignal:
plugin.LogSuccess("Stopping Transaction Processor ... done")
return
case tx := <-sendQueue:
for _, neighbor := range GetNeighbors() {
SendTransactionToNeighbor(neighbor, tx)
}
}
}
})
}
const (
TRANSACTION_FILTER_SIZE = 5000
TRANSACTION_SEND_QUEUE_SIZE = 500
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment