Skip to content
Snippets Groups Projects
Commit 36dd741d authored by Hans Moog's avatar Hans Moog
Browse files

Fix: fixed gossip protocol gossip.SendTransaction() now works

parent e07fd604
Branches
Tags
No related merge requests found
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/gracefulshutdown" "github.com/iotaledger/goshimmer/plugins/gracefulshutdown"
"github.com/iotaledger/goshimmer/plugins/statusscreen" "github.com/iotaledger/goshimmer/plugins/statusscreen"
"github.com/iotaledger/goshimmer/plugins/tangle"
) )
func main() { func main() {
...@@ -15,6 +16,7 @@ func main() { ...@@ -15,6 +16,7 @@ func main() {
cli.PLUGIN, cli.PLUGIN,
autopeering.PLUGIN, autopeering.PLUGIN,
gossip.PLUGIN, gossip.PLUGIN,
tangle.PLUGIN,
analysis.PLUGIN, analysis.PLUGIN,
statusscreen.PLUGIN, statusscreen.PLUGIN,
gracefulshutdown.PLUGIN, gracefulshutdown.PLUGIN,
......
...@@ -107,7 +107,7 @@ func New(message string) *fundamental { ...@@ -107,7 +107,7 @@ func New(message string) *fundamental {
return &fundamental{ return &fundamental{
id: idCounter, id: idCounter,
msg: message, msg: message,
stack: callers(), stack: Callers(),
} }
} }
...@@ -120,7 +120,7 @@ func Errorf(format string, args ...interface{}) IdentifiableError { ...@@ -120,7 +120,7 @@ func Errorf(format string, args ...interface{}) IdentifiableError {
return &fundamental{ return &fundamental{
id: idCounter, id: idCounter,
msg: fmt.Sprintf(format, args...), msg: fmt.Sprintf(format, args...),
stack: callers(), stack: Callers(),
} }
} }
...@@ -135,7 +135,7 @@ func (f *fundamental) Derive(msg string) *fundamental { ...@@ -135,7 +135,7 @@ func (f *fundamental) Derive(msg string) *fundamental {
return &fundamental{ return &fundamental{
id: f.id, id: f.id,
msg: msg, msg: msg,
stack: callers(), stack: Callers(),
} }
} }
...@@ -177,7 +177,7 @@ func WithStack(err error) IdentifiableError { ...@@ -177,7 +177,7 @@ func WithStack(err error) IdentifiableError {
return &withStack{ return &withStack{
idCounter, idCounter,
err, err,
callers(), Callers(),
} }
} }
...@@ -205,7 +205,7 @@ func (w *withStack) Derive(err error, message string) *withStack { ...@@ -205,7 +205,7 @@ func (w *withStack) Derive(err error, message string) *withStack {
cause: err, cause: err,
msg: message, msg: message,
}, },
callers(), Callers(),
} }
} }
...@@ -244,7 +244,7 @@ func Wrap(err error, message string) *withStack { ...@@ -244,7 +244,7 @@ func Wrap(err error, message string) *withStack {
return &withStack{ return &withStack{
idCounter, idCounter,
err, err,
callers(), Callers(),
} }
} }
...@@ -265,7 +265,7 @@ func Wrapf(err error, format string, args ...interface{}) IdentifiableError { ...@@ -265,7 +265,7 @@ func Wrapf(err error, format string, args ...interface{}) IdentifiableError {
return &withStack{ return &withStack{
idCounter, idCounter,
err, err,
callers(), Callers(),
} }
} }
......
...@@ -160,7 +160,7 @@ func (s *stack) StackTrace() StackTrace { ...@@ -160,7 +160,7 @@ func (s *stack) StackTrace() StackTrace {
return f return f
} }
func callers() *stack { func Callers() *stack {
const depth = 32 const depth = 32
var pcs [depth]uintptr var pcs [depth]uintptr
n := runtime.Callers(3, pcs[:]) n := runtime.Callers(3, pcs[:])
......
...@@ -78,4 +78,6 @@ func neighborCaller(handler interface{}, params ...interface{}) { handler.(func( ...@@ -78,4 +78,6 @@ func neighborCaller(handler interface{}, params ...interface{}) { handler.(func(
func errorCaller(handler interface{}, params ...interface{}) { handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError)) } func errorCaller(handler interface{}, params ...interface{}) { handler.(func(errors.IdentifiableError))(params[0].(errors.IdentifiableError)) }
func dataCaller(handler interface{}, params ...interface{}) { handler.(func([]byte))(params[0].([]byte)) }
func transactionCaller(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) } func transactionCaller(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) }
...@@ -84,7 +84,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Neighbor) { ...@@ -84,7 +84,7 @@ func manageConnection(plugin *node.Plugin, neighbor *Neighbor) {
return return
case <-disconnectSignal: case <-disconnectSignal:
break continue
} }
} }
......
...@@ -12,7 +12,6 @@ func configure(plugin *node.Plugin) { ...@@ -12,7 +12,6 @@ func configure(plugin *node.Plugin) {
configureNeighbors(plugin) configureNeighbors(plugin)
configureServer(plugin) configureServer(plugin)
configureSendQueue(plugin) configureSendQueue(plugin)
configureTransactionProcessor(plugin)
Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) { Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) {
...@@ -23,5 +22,4 @@ func run(plugin *node.Plugin) { ...@@ -23,5 +22,4 @@ func run(plugin *node.Plugin) {
runNeighbors(plugin) runNeighbors(plugin)
runServer(plugin) runServer(plugin)
runSendQueue(plugin) runSendQueue(plugin)
runTransactionProcessor(plugin)
} }
...@@ -40,6 +40,7 @@ func newProtocol(conn *network.ManagedConnection) *protocol { ...@@ -40,6 +40,7 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
ReceiveIdentification: events.NewEvent(identityCaller), ReceiveIdentification: events.NewEvent(identityCaller),
ReceiveConnectionAccepted: events.NewEvent(events.CallbackCaller), ReceiveConnectionAccepted: events.NewEvent(events.CallbackCaller),
ReceiveConnectionRejected: events.NewEvent(events.CallbackCaller), ReceiveConnectionRejected: events.NewEvent(events.CallbackCaller),
ReceiveTransactionData: events.NewEvent(dataCaller),
HandshakeCompleted: events.NewEvent(events.CallbackCaller), HandshakeCompleted: events.NewEvent(events.CallbackCaller),
Error: events.NewEvent(errorCaller), Error: events.NewEvent(errorCaller),
}, },
......
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/ternary"
"github.com/iotaledger/goshimmer/packages/transaction" "github.com/iotaledger/goshimmer/packages/transaction"
"strconv" "strconv"
) )
...@@ -219,7 +220,7 @@ func newDispatchStateV1(protocol *protocol) *dispatchStateV1 { ...@@ -219,7 +220,7 @@ func newDispatchStateV1(protocol *protocol) *dispatchStateV1 {
func (state *dispatchStateV1) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) { func (state *dispatchStateV1) Receive(data []byte, offset int, length int) (int, errors.IdentifiableError) {
switch data[0] { switch data[0] {
case 0: case DISPATCH_DROP:
protocol := state.protocol protocol := state.protocol
protocol.Events.ReceiveConnectionRejected.Trigger() protocol.Events.ReceiveConnectionRejected.Trigger()
...@@ -228,12 +229,12 @@ func (state *dispatchStateV1) Receive(data []byte, offset int, length int) (int, ...@@ -228,12 +229,12 @@ func (state *dispatchStateV1) Receive(data []byte, offset int, length int) (int,
protocol.ReceivingState = nil protocol.ReceivingState = nil
case 1: case DISPATCH_TRANSACTION:
protocol := state.protocol protocol := state.protocol
protocol.ReceivingState = newTransactionStateV1(protocol) protocol.ReceivingState = newTransactionStateV1(protocol)
case 2: case DISPATCH_REQUEST:
protocol := state.protocol protocol := state.protocol
protocol.ReceivingState = newRequestStateV1(protocol) protocol.ReceivingState = newRequestStateV1(protocol)
...@@ -251,6 +252,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { ...@@ -251,6 +252,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError {
case DISPATCH_DROP: case DISPATCH_DROP:
protocol := state.protocol protocol := state.protocol
if _, err := protocol.Conn.Write([]byte{DISPATCH_DROP}); err != nil {
return ErrSendFailed.Derive(err, "failed to send drop message")
}
_ = protocol.Conn.Close() _ = protocol.Conn.Close()
protocol.SendState = nil protocol.SendState = nil
...@@ -260,6 +265,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { ...@@ -260,6 +265,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError {
case DISPATCH_TRANSACTION: case DISPATCH_TRANSACTION:
protocol := state.protocol protocol := state.protocol
if _, err := protocol.Conn.Write([]byte{DISPATCH_TRANSACTION}); err != nil {
return ErrSendFailed.Derive(err, "failed to send transaction dispatch byte")
}
protocol.SendState = newTransactionStateV1(protocol) protocol.SendState = newTransactionStateV1(protocol)
return nil return nil
...@@ -267,6 +276,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError { ...@@ -267,6 +276,10 @@ func (state *dispatchStateV1) Send(param interface{}) errors.IdentifiableError {
case DISPATCH_REQUEST: case DISPATCH_REQUEST:
protocol := state.protocol protocol := state.protocol
if _, err := protocol.Conn.Write([]byte{DISPATCH_REQUEST}); err != nil {
return ErrSendFailed.Derive(err, "failed to send request dispatch byte")
}
protocol.SendState = newTransactionStateV1(protocol) protocol.SendState = newTransactionStateV1(protocol)
return nil return nil
...@@ -288,8 +301,9 @@ type transactionStateV1 struct { ...@@ -288,8 +301,9 @@ type transactionStateV1 struct {
func newTransactionStateV1(protocol *protocol) *transactionStateV1 { func newTransactionStateV1(protocol *protocol) *transactionStateV1 {
return &transactionStateV1{ return &transactionStateV1{
buffer: make([]byte, transaction.MARSHALLED_TOTAL_SIZE), protocol: protocol,
offset: 0, buffer: make([]byte, transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE),
offset: 0,
} }
} }
...@@ -297,15 +311,15 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i ...@@ -297,15 +311,15 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i
bytesRead := byteutils.ReadAvailableBytesToBuffer(state.buffer, state.offset, data, offset, length) bytesRead := byteutils.ReadAvailableBytesToBuffer(state.buffer, state.offset, data, offset, length)
state.offset += bytesRead state.offset += bytesRead
if state.offset == transaction.MARSHALLED_TOTAL_SIZE { if state.offset == transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE {
protocol := state.protocol protocol := state.protocol
transactionData := make([]byte, transaction.MARSHALLED_TOTAL_SIZE) transactionData := make([]byte, transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE)
copy(transactionData, state.buffer) copy(transactionData, state.buffer)
protocol.Events.ReceiveTransactionData.Trigger(transactionData) protocol.Events.ReceiveTransactionData.Trigger(transactionData)
go processIncomingTransactionData(transactionData) go ProcessReceivedTransactionData(transactionData)
protocol.ReceivingState = newDispatchStateV1(protocol) protocol.ReceivingState = newDispatchStateV1(protocol)
state.offset = 0 state.offset = 0
...@@ -315,7 +329,19 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i ...@@ -315,7 +329,19 @@ func (state *transactionStateV1) Receive(data []byte, offset int, length int) (i
} }
func (state *transactionStateV1) Send(param interface{}) errors.IdentifiableError { func (state *transactionStateV1) Send(param interface{}) errors.IdentifiableError {
return nil if tx, ok := param.(*transaction.Transaction); ok {
protocol := state.protocol
if _, err := protocol.Conn.Write(tx.Bytes); err != nil {
return ErrSendFailed.Derive(err, "failed to send transaction")
}
protocol.SendState = newDispatchStateV1(protocol)
return nil
}
return ErrInvalidSendParam.Derive("passed in parameter is not a valid transaction")
} }
// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// // endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
......
...@@ -40,10 +40,10 @@ func runSendQueue(plugin *node.Plugin) { ...@@ -40,10 +40,10 @@ func runSendQueue(plugin *node.Plugin) {
for _, neighborQueue := range neighborQueues { for _, neighborQueue := range neighborQueues {
select { select {
case neighborQueue.queue <- tx: case neighborQueue.queue <- tx:
return // log sth
default: default:
return // log sth
} }
} }
connectedNeighborsMutex.RUnlock() connectedNeighborsMutex.RUnlock()
......
...@@ -2,26 +2,25 @@ package gossip ...@@ -2,26 +2,25 @@ package gossip
import ( import (
"github.com/iotaledger/goshimmer/packages/filter" "github.com/iotaledger/goshimmer/packages/filter"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/transaction" "github.com/iotaledger/goshimmer/packages/transaction"
) )
var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE) // region public api ///////////////////////////////////////////////////////////////////////////////////////////////////
func processIncomingTransactionData(transactionData []byte) { func ProcessReceivedTransactionData(transactionData []byte) {
if transactionFilter.Add(transactionData) { if transactionFilter.Add(transactionData) {
Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData)) Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
} }
} }
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region constants and variables //////////////////////////////////////////////////////////////////////////////////////
func configureTransactionProcessor(plugin *node.Plugin) { var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE)
}
func runTransactionProcessor(plugin *node.Plugin) {
}
const ( const (
TRANSACTION_FILTER_SIZE = 5000 TRANSACTION_FILTER_SIZE = 5000
) )
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
...@@ -13,7 +13,7 @@ func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) { ...@@ -13,7 +13,7 @@ func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
processIncomingTransactionData(byteArray) ProcessReceivedTransactionData(byteArray)
} }
} }
......
package tangle package tangle
import ( import (
"fmt"
"github.com/iotaledger/goshimmer/packages/database" "github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/node"
...@@ -8,14 +9,24 @@ import ( ...@@ -8,14 +9,24 @@ import (
"github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/gossip"
) )
// region plugin module setup //////////////////////////////////////////////////////////////////////////////////////////
var PLUGIN = node.NewPlugin("Tangle", configure, run) var PLUGIN = node.NewPlugin("Tangle", configure, run)
func configure(node *node.Plugin) { func configure(node *node.Plugin) {
if db, err := database.Get("transactions"); err != nil {
panic(err)
} else {
transactionDatabase = db
}
gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) { gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) {
if transactionStoredAlready, err := transactionDatabase.Contains(transaction.Hash.ToBytes()); err != nil { if transactionStoredAlready, err := transactionDatabase.Contains(transaction.Hash.ToBytes()); err != nil {
panic(err) panic(err)
} else { } else {
if !transactionStoredAlready { if !transactionStoredAlready {
fmt.Println(transaction.Hash.ToString())
// process transaction // process transaction
} }
} }
...@@ -23,15 +34,31 @@ func configure(node *node.Plugin) { ...@@ -23,15 +34,31 @@ func configure(node *node.Plugin) {
} }
func run(node *node.Plugin) { func run(node *node.Plugin) {
/*
if accountability.OWN_ID.StringIdentifier == "72f84aaee02af4542672cb25aceb9d9e458ef2a3" {
go func() {
txCounter := 0
for {
txCounter++
dummyTx := make([]byte, transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE)
for i := 0; i < 1620; i++ {
dummyTx[i] = byte((i + txCounter) % 128)
}
gossip.SendTransaction(transaction.FromBytes(dummyTx))
<- time.After(1000 * time.Millisecond)
}
}()
}*/
} }
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// region constants and variables //////////////////////////////////////////////////////////////////////////////////////
var transactionDatabase database.Database var transactionDatabase database.Database
func init() { // endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
if db, err := database.Get("transactions"); err != nil {
panic(err)
} else {
transactionDatabase = db
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment