From 36dd741d23c7ee4a84297d8c09dacc4524c0e5f1 Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Tue, 21 May 2019 17:37:43 +0200 Subject: [PATCH] Fix: fixed gossip protocol gossip.SendTransaction() now works --- main.go | 2 + packages/errors/errors.go | 14 +++---- packages/errors/stack.go | 2 +- plugins/gossip/events.go | 2 + plugins/gossip/neighbors.go | 2 +- plugins/gossip/plugin.go | 2 - plugins/gossip/protocol.go | 1 + plugins/gossip/protocol_v1.go | 44 ++++++++++++++++---- plugins/gossip/send_queue.go | 4 +- plugins/gossip/transaction_processor.go | 15 ++++--- plugins/gossip/transaction_processor_test.go | 2 +- plugins/tangle/plugin.go | 41 ++++++++++++++---- 12 files changed, 93 insertions(+), 38 deletions(-) diff --git a/main.go b/main.go index 9d80fd89..bbbed59c 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/gracefulshutdown" "github.com/iotaledger/goshimmer/plugins/statusscreen" + "github.com/iotaledger/goshimmer/plugins/tangle" ) func main() { @@ -15,6 +16,7 @@ func main() { cli.PLUGIN, autopeering.PLUGIN, gossip.PLUGIN, + tangle.PLUGIN, analysis.PLUGIN, statusscreen.PLUGIN, gracefulshutdown.PLUGIN, diff --git a/packages/errors/errors.go b/packages/errors/errors.go index 6ca17ec9..4be504c8 100644 --- a/packages/errors/errors.go +++ b/packages/errors/errors.go @@ -107,7 +107,7 @@ func New(message string) *fundamental { return &fundamental{ id: idCounter, msg: message, - stack: callers(), + stack: Callers(), } } @@ -120,7 +120,7 @@ func Errorf(format string, args ...interface{}) IdentifiableError { return &fundamental{ id: idCounter, msg: fmt.Sprintf(format, args...), - stack: callers(), + stack: Callers(), } } @@ -135,7 +135,7 @@ func (f *fundamental) Derive(msg string) *fundamental { return &fundamental{ id: f.id, msg: msg, - stack: callers(), + stack: Callers(), } } @@ -177,7 +177,7 @@ func WithStack(err error) IdentifiableError { return &withStack{ idCounter, err, - callers(), + Callers(), } } @@ -205,7 +205,7 @@ func (w *withStack) Derive(err error, message string) *withStack { cause: err, msg: message, }, - callers(), + Callers(), } } @@ -244,7 +244,7 @@ func Wrap(err error, message string) *withStack { return &withStack{ idCounter, err, - callers(), + Callers(), } } @@ -265,7 +265,7 @@ func Wrapf(err error, format string, args ...interface{}) IdentifiableError { return &withStack{ idCounter, err, - callers(), + Callers(), } } diff --git a/packages/errors/stack.go b/packages/errors/stack.go index 54b202a1..c9ca0456 100644 --- a/packages/errors/stack.go +++ b/packages/errors/stack.go @@ -160,7 +160,7 @@ func (s *stack) StackTrace() StackTrace { return f } -func callers() *stack { +func Callers() *stack { const depth = 32 var pcs [depth]uintptr n := runtime.Callers(3, pcs[:]) diff --git a/plugins/gossip/events.go b/plugins/gossip/events.go index b354280f..130da4f8 100644 --- a/plugins/gossip/events.go +++ b/plugins/gossip/events.go @@ -78,4 +78,6 @@ func neighborCaller(handler interface{}, params ...interface{}) { handler.(func( func errorCaller(handler interface{}, params ...interface{}) { handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError)) } +func dataCaller(handler interface{}, params ...interface{}) { handler.(func([]byte))(params[0].([]byte)) } + func transactionCaller(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) } diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go index bfa3e055..47f0028c 100644 --- a/plugins/gossip/neighbors.go +++ b/plugins/gossip/neighbors.go @@ -84,7 +84,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Neighbor) { return case <-disconnectSignal: - break + continue } } diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 5262b9e4..33b85247 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -12,7 +12,6 @@ func configure(plugin *node.Plugin) { configureNeighbors(plugin) configureServer(plugin) configureSendQueue(plugin) - configureTransactionProcessor(plugin) Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) { @@ -23,5 +22,4 @@ func run(plugin *node.Plugin) { runNeighbors(plugin) runServer(plugin) runSendQueue(plugin) - runTransactionProcessor(plugin) } diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go index f05e6c8c..bc9f1e69 100644 --- a/plugins/gossip/protocol.go +++ b/plugins/gossip/protocol.go @@ -40,6 +40,7 @@ func newProtocol(conn *network.ManagedConnection) *protocol { ReceiveIdentification: events.NewEvent(identityCaller), ReceiveConnectionAccepted: events.NewEvent(events.CallbackCaller), ReceiveConnectionRejected: events.NewEvent(events.CallbackCaller), + ReceiveTransactionData: events.NewEvent(dataCaller), HandshakeCompleted: events.NewEvent(events.CallbackCaller), Error: events.NewEvent(errorCaller), }, diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go index 1b841f6b..7e4bc5e4 100644 --- a/plugins/gossip/protocol_v1.go +++ b/plugins/gossip/protocol_v1.go @@ -7,6 +7,7 @@ import ( "github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/packages/ternary" "github.com/iotaledger/goshimmer/packages/transaction" "strconv" ) @@ -219,7 +220,7 @@ func newDispatchStateV1(protocol *protocol) *dispatchStateV1 { func (state *dispatchStateV1) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) { switch data[0] { - case 0: + case DISPATCH_DROP: protocol := state.protocol protocol.Events.ReceiveConnectionRejected.Trigger() @@ -228,12 +229,12 @@ func (state *dispatchStateV1) Receive(data []byte, offset int, length int) (int, protocol.ReceivingState = nil - case 1: + case DISPATCH_TRANSACTION: protocol := state.protocol protocol.ReceivingState = newTransactionStateV1(protocol) - case 2: + case DISPATCH_REQUEST: protocol := state.protocol protocol.ReceivingState = newRequestStateV1(protocol) @@ -251,6 +252,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { case DISPATCH_DROP: protocol := state.protocol + if _, err := protocol.Conn.Write([]byte{DISPATCH_DROP}); err != nil { + return ErrSendFailed.Derive(err, "failed to send drop message") + } + _ = protocol.Conn.Close() protocol.SendState = nil @@ -260,6 +265,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { case DISPATCH_TRANSACTION: protocol := state.protocol + if _, err := protocol.Conn.Write([]byte{DISPATCH_TRANSACTION}); err != nil { + return ErrSendFailed.Derive(err, "failed to send transaction dispatch byte") + } + protocol.SendState = newTransactionStateV1(protocol) return nil @@ -267,6 +276,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { case DISPATCH_REQUEST: protocol := state.protocol + if _, err := protocol.Conn.Write([]byte{DISPATCH_REQUEST}); err != nil { + return ErrSendFailed.Derive(err, "failed to send request dispatch byte") + } + protocol.SendState = newTransactionStateV1(protocol) return nil @@ -288,8 +301,9 @@ type transactionStateV1 struct { func newTransactionStateV1(protocol *protocol) *transactionStateV1 { return &transactionStateV1{ - buffer: make([]byte, transaction.MARSHALLED_TOTAL_SIZE), - offset: 0, + protocol: protocol, + buffer: make([]byte, transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE), + offset: 0, } } @@ -297,15 +311,15 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i bytesRead := byteutils.ReadAvailableBytesToBuffer(state.buffer, state.offset, data, offset, length) state.offset += bytesRead - if state.offset == transaction.MARSHALLED_TOTAL_SIZE { + if state.offset == transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE { protocol := state.protocol - transactionData := make([]byte, transaction.MARSHALLED_TOTAL_SIZE) + transactionData := make([]byte, transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE) copy(transactionData, state.buffer) protocol.Events.ReceiveTransactionData.Trigger(transactionData) - go processIncomingTransactionData(transactionData) + go ProcessReceivedTransactionData(transactionData) protocol.ReceivingState = newDispatchStateV1(protocol) state.offset = 0 @@ -315,7 +329,19 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i } func (state *transactionStateV1) Send(param interface{}) errors.IdentifiableError { - return nil + if tx, ok := param.(*transaction.Transaction); ok { + protocol := state.protocol + + if _, err := protocol.Conn.Write(tx.Bytes); err != nil { + return ErrSendFailed.Derive(err, "failed to send transaction") + } + + protocol.SendState = newDispatchStateV1(protocol) + + return nil + } + + return ErrInvalidSendParam.Derive("passed in parameter is not a valid transaction") } // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/gossip/send_queue.go b/plugins/gossip/send_queue.go index 23443a1d..736c32be 100644 --- a/plugins/gossip/send_queue.go +++ b/plugins/gossip/send_queue.go @@ -40,10 +40,10 @@ func runSendQueue(plugin *node.Plugin) { for _, neighborQueue := range neighborQueues { select { case neighborQueue.queue <- tx: - return + // log sth default: - return + // log sth } } connectedNeighborsMutex.RUnlock() diff --git a/plugins/gossip/transaction_processor.go b/plugins/gossip/transaction_processor.go index 2dae5050..269f493b 100644 --- a/plugins/gossip/transaction_processor.go +++ b/plugins/gossip/transaction_processor.go @@ -2,26 +2,25 @@ package gossip import ( "github.com/iotaledger/goshimmer/packages/filter" - "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/transaction" ) -var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE) +// region public api /////////////////////////////////////////////////////////////////////////////////////////////////// -func processIncomingTransactionData(transactionData []byte) { +func ProcessReceivedTransactionData(transactionData []byte) { if transactionFilter.Add(transactionData) { Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData)) } } +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// +// region constants and variables ////////////////////////////////////////////////////////////////////////////////////// -func configureTransactionProcessor(plugin *node.Plugin) { -} - -func runTransactionProcessor(plugin *node.Plugin) { -} +var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE) const ( TRANSACTION_FILTER_SIZE = 5000 ) + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/gossip/transaction_processor_test.go b/plugins/gossip/transaction_processor_test.go index 30c151a3..04348641 100644 --- a/plugins/gossip/transaction_processor_test.go +++ b/plugins/gossip/transaction_processor_test.go @@ -13,7 +13,7 @@ func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - processIncomingTransactionData(byteArray) + ProcessReceivedTransactionData(byteArray) } } diff --git a/plugins/tangle/plugin.go b/plugins/tangle/plugin.go index d6a4d0e8..556dda07 100644 --- a/plugins/tangle/plugin.go +++ b/plugins/tangle/plugin.go @@ -1,6 +1,7 @@ package tangle import ( + "fmt" "github.com/iotaledger/goshimmer/packages/database" "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" @@ -8,14 +9,24 @@ import ( "github.com/iotaledger/goshimmer/plugins/gossip" ) +// region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// + var PLUGIN = node.NewPlugin("Tangle", configure, run) func configure(node *node.Plugin) { + if db, err := database.Get("transactions"); err != nil { + panic(err) + } else { + transactionDatabase = db + } + gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) { if transactionStoredAlready, err := transactionDatabase.Contains(transaction.Hash.ToBytes()); err != nil { panic(err) } else { if !transactionStoredAlready { + + fmt.Println(transaction.Hash.ToString()) // process transaction } } @@ -23,15 +34,31 @@ func configure(node *node.Plugin) { } func run(node *node.Plugin) { + /* + if accountability.OWN_ID.StringIdentifier == "72f84aaee02af4542672cb25aceb9d9e458ef2a3" { + go func() { + txCounter := 0 + + for { + txCounter++ + + dummyTx := make([]byte, transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE) + for i := 0; i < 1620; i++ { + dummyTx[i] = byte((i + txCounter) % 128) + } + gossip.SendTransaction(transaction.FromBytes(dummyTx)) + + <- time.After(1000 * time.Millisecond) + } + }() + }*/ } +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region constants and variables ////////////////////////////////////////////////////////////////////////////////////// + var transactionDatabase database.Database -func init() { - if db, err := database.Get("transactions"); err != nil { - panic(err) - } else { - transactionDatabase = db - } -} +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// -- GitLab