diff --git a/main.go b/main.go index 9d80fd890b2a9530e533e80e89e4947d404064be..bbbed59c3fea26cbc043d54ea39efe8273aad224 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/gracefulshutdown" "github.com/iotaledger/goshimmer/plugins/statusscreen" + "github.com/iotaledger/goshimmer/plugins/tangle" ) func main() { @@ -15,6 +16,7 @@ func main() { cli.PLUGIN, autopeering.PLUGIN, gossip.PLUGIN, + tangle.PLUGIN, analysis.PLUGIN, statusscreen.PLUGIN, gracefulshutdown.PLUGIN, diff --git a/packages/errors/errors.go b/packages/errors/errors.go index 6ca17ec9589582f6b52fbaa7b3c660a7444964b6..4be504c850bd54efeebc72fa53152f648d15103e 100644 --- a/packages/errors/errors.go +++ b/packages/errors/errors.go @@ -107,7 +107,7 @@ func New(message string) *fundamental { return &fundamental{ id: idCounter, msg: message, - stack: callers(), + stack: Callers(), } } @@ -120,7 +120,7 @@ func Errorf(format string, args ...interface{}) IdentifiableError { return &fundamental{ id: idCounter, msg: fmt.Sprintf(format, args...), - stack: callers(), + stack: Callers(), } } @@ -135,7 +135,7 @@ func (f *fundamental) Derive(msg string) *fundamental { return &fundamental{ id: f.id, msg: msg, - stack: callers(), + stack: Callers(), } } @@ -177,7 +177,7 @@ func WithStack(err error) IdentifiableError { return &withStack{ idCounter, err, - callers(), + Callers(), } } @@ -205,7 +205,7 @@ func (w *withStack) Derive(err error, message string) *withStack { cause: err, msg: message, }, - callers(), + Callers(), } } @@ -244,7 +244,7 @@ func Wrap(err error, message string) *withStack { return &withStack{ idCounter, err, - callers(), + Callers(), } } @@ -265,7 +265,7 @@ func Wrapf(err error, format string, args ...interface{}) IdentifiableError { return &withStack{ idCounter, err, - callers(), + Callers(), } } diff --git a/packages/errors/stack.go b/packages/errors/stack.go index 54b202a1ff86c400582dce25df159af3f6bd58c1..c9ca045690e254be760e181d75e14609185e1b38 100644 --- a/packages/errors/stack.go +++ b/packages/errors/stack.go @@ -160,7 +160,7 @@ func (s *stack) StackTrace() StackTrace { return f } -func callers() *stack { +func Callers() *stack { const depth = 32 var pcs [depth]uintptr n := runtime.Callers(3, pcs[:]) diff --git a/plugins/gossip/events.go b/plugins/gossip/events.go index b354280f579f233ad6a308ec400d901b6a534191..130da4f8cb91c19a0708dac91c9bdaa7a35ed73b 100644 --- a/plugins/gossip/events.go +++ b/plugins/gossip/events.go @@ -78,4 +78,6 @@ func neighborCaller(handler interface{}, params ...interface{}) { handler.(func( func errorCaller(handler interface{}, params ...interface{}) { handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError)) } +func dataCaller(handler interface{}, params ...interface{}) { handler.(func([]byte))(params[0].([]byte)) } + func transactionCaller(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) } diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go index bfa3e055de36af4e97d80b7bbd8089fc3a1d4e81..47f0028c0027ab37f7ec8d3cb8e8d52eac5ebcbd 100644 --- a/plugins/gossip/neighbors.go +++ b/plugins/gossip/neighbors.go @@ -84,7 +84,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Neighbor) { return case <-disconnectSignal: - break + continue } } diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go index 5262b9e4c09cdf73c1f7f6232d21507b6c5f2f0b..33b8524770a8b434d5cbe03954b4b6b35a87bafd 100644 --- a/plugins/gossip/plugin.go +++ b/plugins/gossip/plugin.go @@ -12,7 +12,6 @@ func configure(plugin *node.Plugin) { configureNeighbors(plugin) configureServer(plugin) configureSendQueue(plugin) - configureTransactionProcessor(plugin) Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) { @@ -23,5 +22,4 @@ func run(plugin *node.Plugin) { runNeighbors(plugin) runServer(plugin) runSendQueue(plugin) - runTransactionProcessor(plugin) } diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go index f05e6c8c69b8f332a04d32030ef29d2bbafbb5d0..bc9f1e695581ba43402452d6f7f3acbed64423e5 100644 --- a/plugins/gossip/protocol.go +++ b/plugins/gossip/protocol.go @@ -40,6 +40,7 @@ func newProtocol(conn *network.ManagedConnection) *protocol { ReceiveIdentification: events.NewEvent(identityCaller), ReceiveConnectionAccepted: events.NewEvent(events.CallbackCaller), ReceiveConnectionRejected: events.NewEvent(events.CallbackCaller), + ReceiveTransactionData: events.NewEvent(dataCaller), HandshakeCompleted: events.NewEvent(events.CallbackCaller), Error: events.NewEvent(errorCaller), }, diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go index 1b841f6bddad2a149c91499d41a9ddf59c181855..7e4bc5e405dbd5c4fbbf370ace459abbb2ac4222 100644 --- a/plugins/gossip/protocol_v1.go +++ b/plugins/gossip/protocol_v1.go @@ -7,6 +7,7 @@ import ( "github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/packages/ternary" "github.com/iotaledger/goshimmer/packages/transaction" "strconv" ) @@ -219,7 +220,7 @@ func newDispatchStateV1(protocol *protocol) *dispatchStateV1 { func (state *dispatchStateV1) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) { switch data[0] { - case 0: + case DISPATCH_DROP: protocol := state.protocol protocol.Events.ReceiveConnectionRejected.Trigger() @@ -228,12 +229,12 @@ func (state *dispatchStateV1) Receive(data []byte, offset int, length int) (int, protocol.ReceivingState = nil - case 1: + case DISPATCH_TRANSACTION: protocol := state.protocol protocol.ReceivingState = newTransactionStateV1(protocol) - case 2: + case DISPATCH_REQUEST: protocol := state.protocol protocol.ReceivingState = newRequestStateV1(protocol) @@ -251,6 +252,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { case DISPATCH_DROP: protocol := state.protocol + if _, err := protocol.Conn.Write([]byte{DISPATCH_DROP}); err != nil { + return ErrSendFailed.Derive(err, "failed to send drop message") + } + _ = protocol.Conn.Close() protocol.SendState = nil @@ -260,6 +265,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { case DISPATCH_TRANSACTION: protocol := state.protocol + if _, err := protocol.Conn.Write([]byte{DISPATCH_TRANSACTION}); err != nil { + return ErrSendFailed.Derive(err, "failed to send transaction dispatch byte") + } + protocol.SendState = newTransactionStateV1(protocol) return nil @@ -267,6 +276,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { case DISPATCH_REQUEST: protocol := state.protocol + if _, err := protocol.Conn.Write([]byte{DISPATCH_REQUEST}); err != nil { + return ErrSendFailed.Derive(err, "failed to send request dispatch byte") + } + protocol.SendState = newTransactionStateV1(protocol) return nil @@ -288,8 +301,9 @@ type transactionStateV1 struct { func newTransactionStateV1(protocol *protocol) *transactionStateV1 { return &transactionStateV1{ - buffer: make([]byte, transaction.MARSHALLED_TOTAL_SIZE), - offset: 0, + protocol: protocol, + buffer: make([]byte, transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE), + offset: 0, } } @@ -297,15 +311,15 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i bytesRead := byteutils.ReadAvailableBytesToBuffer(state.buffer, state.offset, data, offset, length) state.offset += bytesRead - if state.offset == transaction.MARSHALLED_TOTAL_SIZE { + if state.offset == transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE { protocol := state.protocol - transactionData := make([]byte, transaction.MARSHALLED_TOTAL_SIZE) + transactionData := make([]byte, transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE) copy(transactionData, state.buffer) protocol.Events.ReceiveTransactionData.Trigger(transactionData) - go processIncomingTransactionData(transactionData) + go ProcessReceivedTransactionData(transactionData) protocol.ReceivingState = newDispatchStateV1(protocol) state.offset = 0 @@ -315,7 +329,19 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i } func (state *transactionStateV1) Send(param interface{}) errors.IdentifiableError { - return nil + if tx, ok := param.(*transaction.Transaction); ok { + protocol := state.protocol + + if _, err := protocol.Conn.Write(tx.Bytes); err != nil { + return ErrSendFailed.Derive(err, "failed to send transaction") + } + + protocol.SendState = newDispatchStateV1(protocol) + + return nil + } + + return ErrInvalidSendParam.Derive("passed in parameter is not a valid transaction") } // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/gossip/send_queue.go b/plugins/gossip/send_queue.go index 23443a1da5a0602f087fb71fd89638367c210240..736c32bea3f0103f9f27e12eb3836c9c83a612f2 100644 --- a/plugins/gossip/send_queue.go +++ b/plugins/gossip/send_queue.go @@ -40,10 +40,10 @@ func runSendQueue(plugin *node.Plugin) { for _, neighborQueue := range neighborQueues { select { case neighborQueue.queue <- tx: - return + // log sth default: - return + // log sth } } connectedNeighborsMutex.RUnlock() diff --git a/plugins/gossip/transaction_processor.go b/plugins/gossip/transaction_processor.go index 2dae505000adb650bdc186ea063bdb70947a371a..269f493b9f38ac82761202edc71a7a76b1b433b4 100644 --- a/plugins/gossip/transaction_processor.go +++ b/plugins/gossip/transaction_processor.go @@ -2,26 +2,25 @@ package gossip import ( "github.com/iotaledger/goshimmer/packages/filter" - "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/transaction" ) -var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE) +// region public api /////////////////////////////////////////////////////////////////////////////////////////////////// -func processIncomingTransactionData(transactionData []byte) { +func ProcessReceivedTransactionData(transactionData []byte) { if transactionFilter.Add(transactionData) { Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData)) } } +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// +// region constants and variables ////////////////////////////////////////////////////////////////////////////////////// -func configureTransactionProcessor(plugin *node.Plugin) { -} - -func runTransactionProcessor(plugin *node.Plugin) { -} +var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE) const ( TRANSACTION_FILTER_SIZE = 5000 ) + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/gossip/transaction_processor_test.go b/plugins/gossip/transaction_processor_test.go index 30c151a321a2532aa51e8b88fabfb624708961b2..0434864165f145e3b49a38c4feda43313e3e9b83 100644 --- a/plugins/gossip/transaction_processor_test.go +++ b/plugins/gossip/transaction_processor_test.go @@ -13,7 +13,7 @@ func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - processIncomingTransactionData(byteArray) + ProcessReceivedTransactionData(byteArray) } } diff --git a/plugins/tangle/plugin.go b/plugins/tangle/plugin.go index d6a4d0e8c93846562e99d8631e5011fa8c6fab03..556dda07205e263aa036a883070e259c322be51a 100644 --- a/plugins/tangle/plugin.go +++ b/plugins/tangle/plugin.go @@ -1,6 +1,7 @@ package tangle import ( + "fmt" "github.com/iotaledger/goshimmer/packages/database" "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/node" @@ -8,14 +9,24 @@ import ( "github.com/iotaledger/goshimmer/plugins/gossip" ) +// region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// + var PLUGIN = node.NewPlugin("Tangle", configure, run) func configure(node *node.Plugin) { + if db, err := database.Get("transactions"); err != nil { + panic(err) + } else { + transactionDatabase = db + } + gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) { if transactionStoredAlready, err := transactionDatabase.Contains(transaction.Hash.ToBytes()); err != nil { panic(err) } else { if !transactionStoredAlready { + + fmt.Println(transaction.Hash.ToString()) // process transaction } } @@ -23,15 +34,31 @@ func configure(node *node.Plugin) { } func run(node *node.Plugin) { + /* + if accountability.OWN_ID.StringIdentifier == "72f84aaee02af4542672cb25aceb9d9e458ef2a3" { + go func() { + txCounter := 0 + + for { + txCounter++ + + dummyTx := make([]byte, transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE) + for i := 0; i < 1620; i++ { + dummyTx[i] = byte((i + txCounter) % 128) + } + gossip.SendTransaction(transaction.FromBytes(dummyTx)) + + <- time.After(1000 * time.Millisecond) + } + }() + }*/ } +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region constants and variables ////////////////////////////////////////////////////////////////////////////////////// + var transactionDatabase database.Database -func init() { - if db, err := database.Get("transactions"); err != nil { - panic(err) - } else { - transactionDatabase = db - } -} +// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////