Skip to content
Snippets Groups Projects
Commit 182ea59d authored by Hans Moog's avatar Hans Moog
Browse files

Feat: extended transaction processor -> gossip.SendTransaction

parent 64aa79c0
No related branches found
No related tags found
No related merge requests found
package main package main
import ( import (
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/transaction"
"github.com/iotaledger/goshimmer/plugins/analysis" "github.com/iotaledger/goshimmer/plugins/analysis"
"github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/cli"
...@@ -22,9 +19,4 @@ func main() { ...@@ -22,9 +19,4 @@ func main() {
statusscreen.PLUGIN, statusscreen.PLUGIN,
gracefulshutdown.PLUGIN, gracefulshutdown.PLUGIN,
) )
db, _ := database.Get("transactions")
gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(tx *transaction.Transaction) {
db.Set(tx.Hash.ToBytes(), tx.Bytes)
}))
} }
...@@ -39,6 +39,15 @@ func (this *ManagedConnection) Read(receiveBuffer []byte) (n int, err error) { ...@@ -39,6 +39,15 @@ func (this *ManagedConnection) Read(receiveBuffer []byte) (n int, err error) {
} }
byteCount, err := this.Conn.Read(receiveBuffer) byteCount, err := this.Conn.Read(receiveBuffer)
if byteCount > 0 {
totalReadBytes += byteCount
receivedData := make([]byte, byteCount)
copy(receivedData, receiveBuffer)
this.Events.ReceiveData.Trigger(receivedData)
}
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
this.Events.Error.Trigger(err) this.Events.Error.Trigger(err)
...@@ -46,12 +55,6 @@ func (this *ManagedConnection) Read(receiveBuffer []byte) (n int, err error) { ...@@ -46,12 +55,6 @@ func (this *ManagedConnection) Read(receiveBuffer []byte) (n int, err error) {
return totalReadBytes, err return totalReadBytes, err
} }
totalReadBytes += byteCount
receivedData := make([]byte, byteCount)
copy(receivedData, receiveBuffer)
this.Events.ReceiveData.Trigger(receivedData)
} }
} }
......
...@@ -49,7 +49,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) { ...@@ -49,7 +49,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) {
failedConnectionAttempts := 0 failedConnectionAttempts := 0
for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; { for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; {
conn, dialed, err := neighbor.Connect() protocol, dialed, err := neighbor.Connect()
if err != nil { if err != nil {
failedConnectionAttempts++ failedConnectionAttempts++
...@@ -68,13 +68,13 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) { ...@@ -68,13 +68,13 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) {
failedConnectionAttempts = 0 failedConnectionAttempts = 0
disconnectChan := make(chan int, 1) disconnectSignal := make(chan int, 1)
conn.Events.Close.Attach(events.NewClosure(func() { protocol.Conn.Events.Close.Attach(events.NewClosure(func() {
close(disconnectChan) close(disconnectSignal)
})) }))
if dialed { if dialed {
go newProtocol(conn).Init() go protocol.Init()
} }
// wait for shutdown or // wait for shutdown or
...@@ -82,7 +82,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) { ...@@ -82,7 +82,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) {
case <-daemon.ShutdownSignal: case <-daemon.ShutdownSignal:
return return
case <-disconnectChan: case <-disconnectSignal:
break break
} }
} }
...@@ -92,37 +92,37 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) { ...@@ -92,37 +92,37 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) {
} }
type Peer struct { type Peer struct {
Identity *identity.Identity Identity *identity.Identity
Address net.IP Address net.IP
Port uint16 Port uint16
InitiatedConn *network.ManagedConnection InitiatedProtocol *protocol
AcceptedConn *network.ManagedConnection AcceptedProtocol *protocol
initiatedConnMutex sync.RWMutex initiatedProtocolMutex sync.RWMutex
acceptedConnMutex sync.RWMutex acceptedProtocolMutex sync.RWMutex
} }
func UnmarshalPeer(data []byte) (*Peer, error) { func UnmarshalPeer(data []byte) (*Peer, error) {
return &Peer{}, nil return &Peer{}, nil
} }
func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.IdentifiableError) { func (peer *Peer) Connect() (*protocol, bool, errors.IdentifiableError) {
peer.initiatedConnMutex.Lock() peer.initiatedProtocolMutex.Lock()
defer peer.initiatedConnMutex.Unlock() defer peer.initiatedProtocolMutex.Unlock()
// return existing connections first // return existing connections first
if peer.InitiatedConn != nil { if peer.InitiatedProtocol != nil {
return peer.InitiatedConn, false, nil return peer.InitiatedProtocol, false, nil
} }
// if we already have an accepted connection -> use it instead // if we already have an accepted connection -> use it instead
if peer.AcceptedConn != nil { if peer.AcceptedProtocol != nil {
peer.acceptedConnMutex.RLock() peer.acceptedProtocolMutex.RLock()
if peer.AcceptedConn != nil { if peer.AcceptedProtocol != nil {
defer peer.acceptedConnMutex.RUnlock() defer peer.acceptedProtocolMutex.RUnlock()
return peer.AcceptedConn, false, nil return peer.AcceptedProtocol, false, nil
} }
peer.acceptedConnMutex.RUnlock() peer.acceptedProtocolMutex.RUnlock()
} }
// otherwise try to dial // otherwise try to dial
...@@ -132,16 +132,16 @@ func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.Identifiab ...@@ -132,16 +132,16 @@ func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.Identifiab
peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port))) peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port)))
} }
peer.InitiatedConn = network.NewManagedConnection(conn) peer.InitiatedProtocol = newProtocol(network.NewManagedConnection(conn))
peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() { peer.InitiatedProtocol.Conn.Events.Close.Attach(events.NewClosure(func() {
peer.initiatedConnMutex.Lock() peer.initiatedProtocolMutex.Lock()
defer peer.initiatedConnMutex.Unlock() defer peer.initiatedProtocolMutex.Unlock()
peer.InitiatedConn = nil peer.InitiatedProtocol = nil
})) }))
return peer.InitiatedConn, true, nil return peer.InitiatedProtocol, true, nil
} }
func (peer *Peer) Marshal() []byte { func (peer *Peer) Marshal() []byte {
...@@ -212,4 +212,5 @@ const ( ...@@ -212,4 +212,5 @@ const (
) )
var neighbors = make(map[string]*Peer) var neighbors = make(map[string]*Peer)
var neighborLock sync.RWMutex var neighborLock sync.RWMutex
...@@ -11,6 +11,7 @@ var PLUGIN = node.NewPlugin("Gossip", configure, run) ...@@ -11,6 +11,7 @@ var PLUGIN = node.NewPlugin("Gossip", configure, run)
func configure(plugin *node.Plugin) { func configure(plugin *node.Plugin) {
configureNeighbors(plugin) configureNeighbors(plugin)
configureServer(plugin) configureServer(plugin)
configureTransactionProcessor(plugin)
Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) { Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) {
...@@ -20,4 +21,5 @@ func configure(plugin *node.Plugin) { ...@@ -20,4 +21,5 @@ func configure(plugin *node.Plugin) {
func run(plugin *node.Plugin) { func run(plugin *node.Plugin) {
runNeighbors(plugin) runNeighbors(plugin)
runServer(plugin) runServer(plugin)
runTransactionProcessor(plugin)
} }
...@@ -5,12 +5,13 @@ import ( ...@@ -5,12 +5,13 @@ import (
"github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/network"
"strconv" "strconv"
"sync"
) )
// region constants and variables ////////////////////////////////////////////////////////////////////////////////////// // region constants and variables //////////////////////////////////////////////////////////////////////////////////////
var DEFAULT_PROTOCOL = protocolDefinition{ var DEFAULT_PROTOCOL = protocolDefinition{
version: 1, version: VERSION_1,
initializer: protocolV1, initializer: protocolV1,
} }
...@@ -21,10 +22,11 @@ var DEFAULT_PROTOCOL = protocolDefinition{ ...@@ -21,10 +22,11 @@ var DEFAULT_PROTOCOL = protocolDefinition{
type protocol struct { type protocol struct {
Conn *network.ManagedConnection Conn *network.ManagedConnection
Neighbor *Peer Neighbor *Peer
Version int Version byte
SendState protocolState SendState protocolState
ReceivingState protocolState ReceivingState protocolState
Events protocolEvents Events protocolEvents
sendMutex sync.Mutex
} }
func newProtocol(conn *network.ManagedConnection) *protocol { func newProtocol(conn *network.ManagedConnection) *protocol {
...@@ -95,6 +97,13 @@ func (protocol *protocol) Receive(data []byte) { ...@@ -95,6 +97,13 @@ func (protocol *protocol) Receive(data []byte) {
} }
func (protocol *protocol) Send(data interface{}) errors.IdentifiableError { func (protocol *protocol) Send(data interface{}) errors.IdentifiableError {
protocol.sendMutex.Lock()
defer protocol.sendMutex.Unlock()
return protocol.send(data)
}
func (protocol *protocol) send(data interface{}) errors.IdentifiableError {
if protocol.SendState != nil { if protocol.SendState != nil {
if err := protocol.SendState.Send(data); err != nil { if err := protocol.SendState.Send(data); err != nil {
protocol.SendState = nil protocol.SendState = nil
......
...@@ -35,6 +35,20 @@ func protocolV1(protocol *protocol) errors.IdentifiableError { ...@@ -35,6 +35,20 @@ func protocolV1(protocol *protocol) errors.IdentifiableError {
return nil return nil
} }
func sendTransactionV1(protocol *protocol, tx *transaction.Transaction) {
if _, ok := protocol.SendState.(*dispatchStateV1); ok {
protocol.sendMutex.Lock()
defer protocol.sendMutex.Unlock()
if err := protocol.send(DISPATCH_TRANSACTION); err != nil {
return
}
if err := protocol.send(tx); err != nil {
return
}
}
}
// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// // endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region indentificationStateV1 /////////////////////////////////////////////////////////////////////////////////////// // region indentificationStateV1 ///////////////////////////////////////////////////////////////////////////////////////
...@@ -283,7 +297,7 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i ...@@ -283,7 +297,7 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i
protocol.Events.ReceiveTransactionData.Trigger(transactionData) protocol.Events.ReceiveTransactionData.Trigger(transactionData)
go processTransactionData(transactionData) go processIncomingTransactionData(transactionData)
protocol.ReceivingState = newDispatchStateV1(protocol) protocol.ReceivingState = newDispatchStateV1(protocol)
state.offset = 0 state.offset = 0
......
...@@ -25,18 +25,18 @@ func configureServer(plugin *node.Plugin) { ...@@ -25,18 +25,18 @@ func configureServer(plugin *node.Plugin) {
// store connection in neighbor if its a neighbor calling // store connection in neighbor if its a neighbor calling
protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) { protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) {
if protocol.Neighbor != nil { if protocol.Neighbor != nil {
protocol.Neighbor.acceptedConnMutex.Lock() protocol.Neighbor.acceptedProtocolMutex.Lock()
if protocol.Neighbor.AcceptedConn == nil { if protocol.Neighbor.AcceptedProtocol == nil {
protocol.Neighbor.AcceptedConn = protocol.Conn protocol.Neighbor.AcceptedProtocol = protocol
protocol.Neighbor.AcceptedConn.Events.Close.Attach(events.NewClosure(func() { protocol.Conn.Events.Close.Attach(events.NewClosure(func() {
protocol.Neighbor.acceptedConnMutex.Lock() protocol.Neighbor.acceptedProtocolMutex.Lock()
defer protocol.Neighbor.acceptedConnMutex.Unlock() defer protocol.Neighbor.acceptedProtocolMutex.Unlock()
protocol.Neighbor.AcceptedConn = nil protocol.Neighbor.AcceptedProtocol = nil
})) }))
} }
protocol.Neighbor.acceptedConnMutex.Unlock() protocol.Neighbor.acceptedProtocolMutex.Unlock()
} }
})) }))
......
package gossip package gossip
import ( import (
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/filter" "github.com/iotaledger/goshimmer/packages/filter"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/transaction" "github.com/iotaledger/goshimmer/packages/transaction"
) )
var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE) var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE)
func processTransactionData(transactionData []byte) { var sendQueue = make(chan *transaction.Transaction, TRANSACTION_SEND_QUEUE_SIZE)
func SendTransaction(transaction *transaction.Transaction) {
sendQueue <- transaction
}
func SendTransactionToNeighbor(neighbor *Peer, transaction *transaction.Transaction) {
switch neighbor.AcceptedProtocol.Version {
case VERSION_1:
sendTransactionV1(neighbor.AcceptedProtocol, transaction)
}
}
func processIncomingTransactionData(transactionData []byte) {
if transactionFilter.Add(transactionData) { if transactionFilter.Add(transactionData) {
Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData)) Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
} }
} }
func configureTransactionProcessor(plugin *node.Plugin) {
daemon.Events.Shutdown.Attach(events.NewClosure(func() {
plugin.LogInfo("Stopping Transaction Processor ...")
}))
}
func runTransactionProcessor(plugin *node.Plugin) {
plugin.LogInfo("Starting Transaction Processor ...")
daemon.BackgroundWorker(func() {
plugin.LogSuccess("Starting Transaction Processor ... done")
for {
select {
case <- daemon.ShutdownSignal:
plugin.LogSuccess("Stopping Transaction Processor ... done")
return
case tx := <-sendQueue:
for _, neighbor := range GetNeighbors() {
SendTransactionToNeighbor(neighbor, tx)
}
}
}
})
}
const ( const (
TRANSACTION_FILTER_SIZE = 5000 TRANSACTION_FILTER_SIZE = 5000
TRANSACTION_SEND_QUEUE_SIZE = 500
) )
...@@ -13,7 +13,7 @@ func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) { ...@@ -13,7 +13,7 @@ func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
processTransactionData(byteArray) processIncomingTransactionData(byteArray)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment