diff --git a/main.go b/main.go index 62b4183b237ff3b54c679b3fa0741df4fead66bb..9d80fd890b2a9530e533e80e89e4947d404064be 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,7 @@ package main import ( - "github.com/iotaledger/goshimmer/packages/database" - "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" - "github.com/iotaledger/goshimmer/packages/transaction" "github.com/iotaledger/goshimmer/plugins/analysis" "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/cli" @@ -22,9 +19,4 @@ func main() { statusscreen.PLUGIN, gracefulshutdown.PLUGIN, ) - - db, _ := database.Get("transactions") - gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(tx *transaction.Transaction) { - db.Set(tx.Hash.ToBytes(), tx.Bytes) - })) } diff --git a/packages/network/managed_connection.go b/packages/network/managed_connection.go index cb70a9b82d8b355740ecae19bba270ebb2430d6c..9fb42fd072db76f28d6ca3ffe0f9645f8d34e689 100644 --- a/packages/network/managed_connection.go +++ b/packages/network/managed_connection.go @@ -39,6 +39,15 @@ func (this *ManagedConnection) Read(receiveBuffer []byte) (n int, err error) { } byteCount, err := this.Conn.Read(receiveBuffer) + if byteCount > 0 { + totalReadBytes += byteCount + + receivedData := make([]byte, byteCount) + copy(receivedData, receiveBuffer) + + this.Events.ReceiveData.Trigger(receivedData) + } + if err != nil { if err != io.EOF { this.Events.Error.Trigger(err) @@ -46,12 +55,6 @@ func (this *ManagedConnection) Read(receiveBuffer []byte) (n int, err error) { return totalReadBytes, err } - totalReadBytes += byteCount - - receivedData := make([]byte, byteCount) - copy(receivedData, receiveBuffer) - - this.Events.ReceiveData.Trigger(receivedData) } } diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go index ee0c669a1303d8ef95082fe086ea36a92dd9e0e3..97a5d6f257137f258404f5953152f33c4a8cdbc0 100644 --- a/plugins/gossip/neighbors.go +++ b/plugins/gossip/neighbors.go @@ -49,7 +49,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) { failedConnectionAttempts := 0 for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; { - conn, dialed, err := neighbor.Connect() + protocol, dialed, err := neighbor.Connect() if err != nil { failedConnectionAttempts++ @@ -68,13 +68,13 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) { failedConnectionAttempts = 0 - disconnectChan := make(chan int, 1) - conn.Events.Close.Attach(events.NewClosure(func() { - close(disconnectChan) + disconnectSignal := make(chan int, 1) + protocol.Conn.Events.Close.Attach(events.NewClosure(func() { + close(disconnectSignal) })) if dialed { - go newProtocol(conn).Init() + go protocol.Init() } // wait for shutdown or @@ -82,7 +82,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) { case <-daemon.ShutdownSignal: return - case <-disconnectChan: + case <-disconnectSignal: break } } @@ -92,37 +92,37 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) { } type Peer struct { - Identity *identity.Identity - Address net.IP - Port uint16 - InitiatedConn *network.ManagedConnection - AcceptedConn *network.ManagedConnection - initiatedConnMutex sync.RWMutex - acceptedConnMutex sync.RWMutex + Identity *identity.Identity + Address net.IP + Port uint16 + InitiatedProtocol *protocol + AcceptedProtocol *protocol + initiatedProtocolMutex sync.RWMutex + acceptedProtocolMutex sync.RWMutex } func UnmarshalPeer(data []byte) (*Peer, error) { return &Peer{}, nil } -func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.IdentifiableError) { - peer.initiatedConnMutex.Lock() - defer peer.initiatedConnMutex.Unlock() +func (peer *Peer) Connect() (*protocol, bool, errors.IdentifiableError) { + peer.initiatedProtocolMutex.Lock() + defer peer.initiatedProtocolMutex.Unlock() // return existing connections first - if peer.InitiatedConn != nil { - return peer.InitiatedConn, false, nil + if peer.InitiatedProtocol != nil { + return peer.InitiatedProtocol, false, nil } // if we already have an accepted connection -> use it instead - if peer.AcceptedConn != nil { - peer.acceptedConnMutex.RLock() - if peer.AcceptedConn != nil { - defer peer.acceptedConnMutex.RUnlock() + if peer.AcceptedProtocol != nil { + peer.acceptedProtocolMutex.RLock() + if peer.AcceptedProtocol != nil { + defer peer.acceptedProtocolMutex.RUnlock() - return peer.AcceptedConn, false, nil + return peer.AcceptedProtocol, false, nil } - peer.acceptedConnMutex.RUnlock() + peer.acceptedProtocolMutex.RUnlock() } // otherwise try to dial @@ -132,16 +132,16 @@ func (peer *Peer) Connect() (*network.ManagedConnection, bool, errors.Identifiab peer.Identity.StringIdentifier+"@"+peer.Address.String()+":"+strconv.Itoa(int(peer.Port))) } - peer.InitiatedConn = network.NewManagedConnection(conn) + peer.InitiatedProtocol = newProtocol(network.NewManagedConnection(conn)) - peer.InitiatedConn.Events.Close.Attach(events.NewClosure(func() { - peer.initiatedConnMutex.Lock() - defer peer.initiatedConnMutex.Unlock() + peer.InitiatedProtocol.Conn.Events.Close.Attach(events.NewClosure(func() { + peer.initiatedProtocolMutex.Lock() + defer peer.initiatedProtocolMutex.Unlock() - peer.InitiatedConn = nil + peer.InitiatedProtocol = nil })) - return peer.InitiatedConn, true, nil + return peer.InitiatedProtocol, true, nil } func (peer *Peer) Marshal() []byte { @@ -212,4 +212,5 @@ const ( ) var neighbors = make(map[string]*Peer) + var neighborLock sync.RWMutex diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 2ef6867c0e246b8d2c53f30febba5eeb2b08b616..73488f03dc7dadbdbf648811bf5a9db90821d12e 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -11,6 +11,7 @@ var PLUGIN = node.NewPlugin("Gossip", configure, run) func configure(plugin *node.Plugin) { configureNeighbors(plugin) configureServer(plugin) + configureTransactionProcessor(plugin) Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) { @@ -20,4 +21,5 @@ func configure(plugin *node.Plugin) { func run(plugin *node.Plugin) { runNeighbors(plugin) runServer(plugin) + runTransactionProcessor(plugin) } diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go index 2bc3f4b23096c3c7df558dd425f640360f83415f..e8b9d56cf80cea85eacf9b4b1e63cbb1d54e5a88 100644 --- a/plugins/gossip/protocol.go +++ b/plugins/gossip/protocol.go @@ -5,12 +5,13 @@ import ( "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/network" "strconv" + "sync" ) // region constants and variables ////////////////////////////////////////////////////////////////////////////////////// var DEFAULT_PROTOCOL = protocolDefinition{ - version: 1, + version: VERSION_1, initializer: protocolV1, } @@ -21,10 +22,11 @@ var DEFAULT_PROTOCOL = protocolDefinition{ type protocol struct { Conn *network.ManagedConnection Neighbor *Peer - Version int + Version byte SendState protocolState ReceivingState protocolState Events protocolEvents + sendMutex sync.Mutex } func newProtocol(conn *network.ManagedConnection) *protocol { @@ -95,6 +97,13 @@ func (protocol *protocol) Receive(data []byte) { } func (protocol *protocol) Send(data interface{}) errors.IdentifiableError { + protocol.sendMutex.Lock() + defer protocol.sendMutex.Unlock() + + return protocol.send(data) +} + +func (protocol *protocol) send(data interface{}) errors.IdentifiableError { if protocol.SendState != nil { if err := protocol.SendState.Send(data); err != nil { protocol.SendState = nil diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go index 43c260a59cf4c0c646bcb618ad2a8edc5a890c4a..bfd3f32d6f07e5c673d12dc235c5cfa982dac276 100644 --- a/plugins/gossip/protocol_v1.go +++ b/plugins/gossip/protocol_v1.go @@ -35,6 +35,20 @@ func protocolV1(protocol *protocol) errors.IdentifiableError { return nil } +func sendTransactionV1(protocol *protocol, tx *transaction.Transaction) { + if _, ok := protocol.SendState.(*dispatchStateV1); ok { + protocol.sendMutex.Lock() + defer protocol.sendMutex.Unlock() + + if err := protocol.send(DISPATCH_TRANSACTION); err != nil { + return + } + if err := protocol.send(tx); err != nil { + return + } + } +} + // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// // region indentificationStateV1 /////////////////////////////////////////////////////////////////////////////////////// @@ -283,7 +297,7 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i protocol.Events.ReceiveTransactionData.Trigger(transactionData) - go processTransactionData(transactionData) + go processIncomingTransactionData(transactionData) protocol.ReceivingState = newDispatchStateV1(protocol) state.offset = 0 diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go index 7c58c317a22167bb2e14d47b3b2ee07967a5afd9..1d9f08e17156ac3b94a539597a5c762dd7b42857 100644 --- a/plugins/gossip/server.go +++ b/plugins/gossip/server.go @@ -25,18 +25,18 @@ func configureServer(plugin *node.Plugin) { // store connection in neighbor if its a neighbor calling protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) { if protocol.Neighbor != nil { - protocol.Neighbor.acceptedConnMutex.Lock() - if protocol.Neighbor.AcceptedConn == nil { - protocol.Neighbor.AcceptedConn = protocol.Conn + protocol.Neighbor.acceptedProtocolMutex.Lock() + if protocol.Neighbor.AcceptedProtocol == nil { + protocol.Neighbor.AcceptedProtocol = protocol - protocol.Neighbor.AcceptedConn.Events.Close.Attach(events.NewClosure(func() { - protocol.Neighbor.acceptedConnMutex.Lock() - defer protocol.Neighbor.acceptedConnMutex.Unlock() + protocol.Conn.Events.Close.Attach(events.NewClosure(func() { + protocol.Neighbor.acceptedProtocolMutex.Lock() + defer protocol.Neighbor.acceptedProtocolMutex.Unlock() - protocol.Neighbor.AcceptedConn = nil + protocol.Neighbor.AcceptedProtocol = nil })) } - protocol.Neighbor.acceptedConnMutex.Unlock() + protocol.Neighbor.acceptedProtocolMutex.Unlock() } })) diff --git a/plugins/gossip/transaction_processor.go b/plugins/gossip/transaction_processor.go index 49f4f7df94cdbddfcab5830c40e54ed880e35d2d..aae8362aa342c136cc4a7e189214e925e0f9c8f4 100644 --- a/plugins/gossip/transaction_processor.go +++ b/plugins/gossip/transaction_processor.go @@ -1,18 +1,64 @@ package gossip import ( + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/filter" + "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/transaction" ) var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE) -func processTransactionData(transactionData []byte) { +var sendQueue = make(chan *transaction.Transaction, TRANSACTION_SEND_QUEUE_SIZE) + +func SendTransaction(transaction *transaction.Transaction) { + sendQueue <- transaction +} + +func SendTransactionToNeighbor(neighbor *Peer, transaction *transaction.Transaction) { + switch neighbor.AcceptedProtocol.Version { + case VERSION_1: + sendTransactionV1(neighbor.AcceptedProtocol, transaction) + } +} + +func processIncomingTransactionData(transactionData []byte) { if transactionFilter.Add(transactionData) { Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData)) } } +func configureTransactionProcessor(plugin *node.Plugin) { + daemon.Events.Shutdown.Attach(events.NewClosure(func() { + plugin.LogInfo("Stopping Transaction Processor ...") + })) +} + +func runTransactionProcessor(plugin *node.Plugin) { + plugin.LogInfo("Starting Transaction Processor ...") + + daemon.BackgroundWorker(func() { + plugin.LogSuccess("Starting Transaction Processor ... done") + + for { + select { + case <- daemon.ShutdownSignal: + plugin.LogSuccess("Stopping Transaction Processor ... done") + + return + + case tx := <-sendQueue: + for _, neighbor := range GetNeighbors() { + SendTransactionToNeighbor(neighbor, tx) + } + } + } + }) +} + const ( TRANSACTION_FILTER_SIZE = 5000 + + TRANSACTION_SEND_QUEUE_SIZE = 500 ) diff --git a/plugins/gossip/transaction_processor_test.go b/plugins/gossip/transaction_processor_test.go index 1e567f60a1469dde338773a20c7d7efec54d76bf..30c151a321a2532aa51e8b88fabfb624708961b2 100644 --- a/plugins/gossip/transaction_processor_test.go +++ b/plugins/gossip/transaction_processor_test.go @@ -13,7 +13,7 @@ func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - processTransactionData(byteArray) + processIncomingTransactionData(byteArray) } }