Skip to content
Snippets Groups Projects
Commit 282a620b authored by Hans Moog's avatar Hans Moog
Browse files

Feat: added transaction processor and filter (250 times faster)

parent 39237b7c
No related branches found
No related tags found
No related merge requests found
Showing with 335 additions and 59 deletions
package main package main
import ( import (
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/transaction"
"github.com/iotaledger/goshimmer/plugins/analysis" "github.com/iotaledger/goshimmer/plugins/analysis"
"github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/cli" "github.com/iotaledger/goshimmer/plugins/cli"
...@@ -19,4 +22,9 @@ func main() { ...@@ -19,4 +22,9 @@ func main() {
statusscreen.PLUGIN, statusscreen.PLUGIN,
gracefulshutdown.PLUGIN, gracefulshutdown.PLUGIN,
) )
db, _ := database.Get("transactions")
gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(tx *transaction.Transaction) {
db.Set(tx.Hash.ToBytes(), tx.Bytes)
}))
} }
...@@ -76,6 +76,21 @@ func (this *databaseImpl) Set(key []byte, value []byte) error { ...@@ -76,6 +76,21 @@ func (this *databaseImpl) Set(key []byte, value []byte) error {
return nil return nil
} }
func (this *databaseImpl) Contains(key []byte) (bool, error) {
if err := this.db.View(func(txn *badger.Txn) error {
_, err := txn.Get(key)
if err != nil {
return err
}
return nil
}); err == ErrKeyNotFound {
return false, nil
} else {
return err == nil, err
}
}
func (this *databaseImpl) Get(key []byte) ([]byte, error) { func (this *databaseImpl) Get(key []byte) ([]byte, error) {
var result []byte = nil var result []byte = nil
var err error = nil var err error = nil
......
...@@ -4,5 +4,6 @@ type Database interface { ...@@ -4,5 +4,6 @@ type Database interface {
Open() error Open() error
Set(key []byte, value []byte) error Set(key []byte, value []byte) error
Get(key []byte) ([]byte, error) Get(key []byte) ([]byte, error)
Contains(key []byte) (bool, error)
Close() error Close() error
} }
package filter
import (
"github.com/iotaledger/goshimmer/packages/typeconversion"
"sync"
)
type ByteArrayFilter struct {
byteArrays [][]byte
byteArraysByKey map[string]bool
size int
mutex sync.RWMutex
}
func NewByteArrayFilter(size int) *ByteArrayFilter {
return &ByteArrayFilter{
byteArrays: make([][]byte, 0, size),
byteArraysByKey: make(map[string]bool, size),
size: size,
}
}
func (filter *ByteArrayFilter) Contains(byteArray []byte) bool {
filter.mutex.RLock()
defer filter.mutex.RUnlock()
_, exists := filter.byteArraysByKey[typeconversion.BytesToString(byteArray)]
return exists
}
func (filter *ByteArrayFilter) Add(byteArray []byte) bool {
key := typeconversion.BytesToString(byteArray)
filter.mutex.Lock()
defer filter.mutex.Unlock()
if _, exists := filter.byteArraysByKey[key]; !exists {
if len(filter.byteArrays) == filter.size {
delete(filter.byteArraysByKey, typeconversion.BytesToString(filter.byteArrays[0]))
filter.byteArrays = append(filter.byteArrays[1:], byteArray)
} else {
filter.byteArrays = append(filter.byteArrays, byteArray)
}
filter.byteArraysByKey[key] = true
return true
} else {
return false
}
}
package filter
import "testing"
func BenchmarkAdd(b *testing.B) {
filter, byteArray := setupFilter(15000, 1604)
b.ResetTimer()
for i := 0; i < b.N; i++ {
filter.Add(byteArray)
}
}
func BenchmarkContains(b *testing.B) {
filter, byteArray := setupFilter(15000, 1604)
b.ResetTimer()
for i := 0; i < b.N; i++ {
filter.Contains(byteArray)
}
}
func setupFilter(filterSize int, byteArraySize int) (*ByteArrayFilter, []byte) {
filter := NewByteArrayFilter(filterSize)
for j := 0; j < filterSize; j++ {
byteArray := make([]byte, byteArraySize)
for i := 0; i < len(byteArray); i++ {
byteArray[(i + j) % byteArraySize] = byte((i + j) % 128)
}
filter.Add(byteArray)
}
byteArray := make([]byte, byteArraySize)
for i := 0; i < len(byteArray); i++ {
byteArray[i] = byte(i % 128)
}
return filter, byteArray
}
package typeconversion
import (
"reflect"
"unsafe"
)
func BytesToString(b []byte) string {
bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
return *(*string)(unsafe.Pointer(&reflect.StringHeader{bh.Data, bh.Len}))
}
\ No newline at end of file
...@@ -50,14 +50,14 @@ type pluginEvents struct { ...@@ -50,14 +50,14 @@ type pluginEvents struct {
} }
type protocolEvents struct { type protocolEvents struct {
ReceiveVersion *events.Event ReceiveVersion *events.Event
ReceiveIdentification *events.Event ReceiveIdentification *events.Event
AcceptConnection *events.Event ReceiveConnectionAccepted *events.Event
RejectConnection *events.Event ReceiveConnectionRejected *events.Event
DropConnection *events.Event ReceiveDropConnection *events.Event
ReceiveTransactionData *events.Event ReceiveTransactionData *events.Event
ReceiveRequestData *events.Event ReceiveRequestData *events.Event
Error *events.Event Error *events.Event
} }
func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) } func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
......
...@@ -48,45 +48,43 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) { ...@@ -48,45 +48,43 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) {
daemon.BackgroundWorker(func() { daemon.BackgroundWorker(func() {
failedConnectionAttempts := 0 failedConnectionAttempts := 0
for failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS { for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; {
if neighbor, exists := GetNeighbor(neighbor.Identity.StringIdentifier); !exists { conn, dialed, err := neighbor.Connect()
return if err != nil {
} else { failedConnectionAttempts++
if conn, newConnection, err := neighbor.Connect(); err != nil {
failedConnectionAttempts++
plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error())
if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS {
select {
case <-daemon.ShutdownSignal:
return
case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT):
// continue
}
}
} else {
failedConnectionAttempts = 0
disconnectChan := make(chan int, 1)
conn.Events.Close.Attach(events.NewClosure(func() {
close(disconnectChan)
}))
if newConnection { plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error())
go newProtocol(conn).init()
}
if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS {
select { select {
case <-daemon.ShutdownSignal: case <-daemon.ShutdownSignal:
return return
case <-disconnectChan: case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT):
break continue
} }
} }
} }
failedConnectionAttempts = 0
disconnectChan := make(chan int, 1)
conn.Events.Close.Attach(events.NewClosure(func() {
close(disconnectChan)
}))
if dialed {
go newProtocol(conn).init()
}
// wait for shutdown or
select {
case <-daemon.ShutdownSignal:
return
case <-disconnectChan:
break
}
} }
RemoveNeighbor(neighbor.Identity.StringIdentifier) RemoveNeighbor(neighbor.Identity.StringIdentifier)
...@@ -211,7 +209,6 @@ func GetNeighbors() map[string]*Peer { ...@@ -211,7 +209,6 @@ func GetNeighbors() map[string]*Peer {
const ( const (
CONNECTION_MAX_ATTEMPTS = 5 CONNECTION_MAX_ATTEMPTS = 5
CONNECTION_BASE_TIMEOUT = 10 * time.Second CONNECTION_BASE_TIMEOUT = 10 * time.Second
MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1
) )
var neighbors = make(map[string]*Peer) var neighbors = make(map[string]*Peer)
......
package gossip package gossip
import "github.com/iotaledger/goshimmer/packages/node" import (
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/transaction"
)
var PLUGIN = node.NewPlugin("Gossip", configure, run) var PLUGIN = node.NewPlugin("Gossip", configure, run)
func configure(plugin *node.Plugin) { func configure(plugin *node.Plugin) {
configureNeighbors(plugin) configureNeighbors(plugin)
configureServer(plugin) configureServer(plugin)
Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) {
}))
} }
func run(plugin *node.Plugin) { func run(plugin *node.Plugin) {
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/packages/accountability"
"github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/network" "github.com/iotaledger/goshimmer/packages/network"
"strconv" "strconv"
) )
...@@ -16,7 +17,7 @@ type protocolState interface { ...@@ -16,7 +17,7 @@ type protocolState interface {
// endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// // endregion ////////////////////////////////////////////////////////////////////////////////////////////////////////////
// region protocol ///////////////////////////////////////////////////////////////////////////////////////////////////// //region protocol //////////////////////////////////////////////////////////////////////////////////////////////////////
type protocol struct { type protocol struct {
Conn *network.ManagedConnection Conn *network.ManagedConnection
...@@ -31,33 +32,68 @@ func newProtocol(conn *network.ManagedConnection) *protocol { ...@@ -31,33 +32,68 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
Conn: conn, Conn: conn,
CurrentState: &versionState{}, CurrentState: &versionState{},
Events: protocolEvents{ Events: protocolEvents{
ReceiveVersion: events.NewEvent(intCaller), ReceiveVersion: events.NewEvent(intCaller),
ReceiveIdentification: events.NewEvent(identityCaller), ReceiveIdentification: events.NewEvent(identityCaller),
ReceiveConnectionAccepted: events.NewEvent(events.CallbackCaller),
ReceiveConnectionRejected: events.NewEvent(events.CallbackCaller),
}, },
} }
return protocol return protocol
} }
func (protocol *protocol) sendVersion() {
protocol.Conn.Write([]byte{1})
}
func (protocol *protocol) sendIdentification() {
if signature, err := accountability.OWN_ID.Sign(accountability.OWN_ID.Identifier); err == nil {
protocol.Conn.Write(accountability.OWN_ID.Identifier)
protocol.Conn.Write(signature)
}
}
func (protocol *protocol) rejectConnection() {
protocol.Conn.Write([]byte{0})
protocol.Conn.Close()
}
func (protocol *protocol) acceptConnection() {
protocol.Conn.Write([]byte{1})
}
func (protocol *protocol) init() { func (protocol *protocol) init() {
var onClose, onReceiveData *events.Closure //region setup event handlers
onReceiveIdentification := events.NewClosure(func(identity *identity.Identity) {
if protocol.Neighbor == nil {
protocol.rejectConnection()
} else {
protocol.acceptConnection()
}
})
onReceiveData := events.NewClosure(protocol.parseData)
onReceiveData = events.NewClosure(protocol.parseData) var onClose *events.Closure // define var first so we can use it in the closure
onClose = events.NewClosure(func() { onClose = events.NewClosure(func() {
protocol.Conn.Events.ReceiveData.Detach(onReceiveData) protocol.Conn.Events.ReceiveData.Detach(onReceiveData)
protocol.Conn.Events.Close.Detach(onClose) protocol.Conn.Events.Close.Detach(onClose)
}) })
//endregion
//region register event handlers
protocol.Events.ReceiveIdentification.Attach(onReceiveIdentification)
protocol.Conn.Events.ReceiveData.Attach(onReceiveData) protocol.Conn.Events.ReceiveData.Attach(onReceiveData)
protocol.Conn.Events.Close.Attach(onClose) protocol.Conn.Events.Close.Attach(onClose)
//endregion
protocol.Conn.Write([]byte{1}) //region send initial handshake
protocol.Conn.Write(accountability.OWN_ID.Identifier) protocol.sendVersion()
protocol.sendIdentification()
if signature, err := accountability.OWN_ID.Sign(accountability.OWN_ID.Identifier); err == nil { //endregion
protocol.Conn.Write(signature)
}
// start reading from the connection
protocol.Conn.Read(make([]byte, 1000)) protocol.Conn.Read(make([]byte, 1000))
} }
......
...@@ -74,16 +74,15 @@ func newacceptanceStateV1() *acceptanceStateV1 { ...@@ -74,16 +74,15 @@ func newacceptanceStateV1() *acceptanceStateV1 {
func (state *acceptanceStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) { func (state *acceptanceStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) {
switch data[offset] { switch data[offset] {
case 0: case 0:
protocol.Events.RejectConnection.Trigger() protocol.Events.ReceiveConnectionRejected.Trigger()
RemoveNeighbor(protocol.Neighbor.Identity.StringIdentifier) protocol.Conn.Close()
protocol.Neighbor.InitiatedConn.Close()
protocol.CurrentState = nil protocol.CurrentState = nil
break break
case 1: case 1:
protocol.Events.AcceptConnection.Trigger() protocol.Events.ReceiveConnectionAccepted.Trigger()
protocol.CurrentState = newDispatchStateV1() protocol.CurrentState = newDispatchStateV1()
break break
...@@ -108,7 +107,7 @@ func newDispatchStateV1() *dispatchStateV1 { ...@@ -108,7 +107,7 @@ func newDispatchStateV1() *dispatchStateV1 {
func (state *dispatchStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) { func (state *dispatchStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) {
switch data[0] { switch data[0] {
case 0: case 0:
protocol.Events.RejectConnection.Trigger() protocol.Events.ReceiveConnectionRejected.Trigger()
protocol.Neighbor.InitiatedConn.Close() protocol.Neighbor.InitiatedConn.Close()
protocol.CurrentState = nil protocol.CurrentState = nil
...@@ -153,9 +152,7 @@ func (state *transactionStateV1) Consume(protocol *protocol, data []byte, offset ...@@ -153,9 +152,7 @@ func (state *transactionStateV1) Consume(protocol *protocol, data []byte, offset
protocol.Events.ReceiveTransactionData.Trigger(transactionData) protocol.Events.ReceiveTransactionData.Trigger(transactionData)
go func() { go processTransactionData(transactionData)
Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
}()
protocol.CurrentState = newDispatchStateV1() protocol.CurrentState = newDispatchStateV1()
state.offset = 0 state.offset = 0
......
...@@ -16,6 +16,7 @@ func configureServer(plugin *node.Plugin) { ...@@ -16,6 +16,7 @@ func configureServer(plugin *node.Plugin) {
TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) { TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) {
protocol := newProtocol(conn) protocol := newProtocol(conn)
// store connection in neighbor if its a neighbor calling
protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) { protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) {
if protocol.Neighbor != nil { if protocol.Neighbor != nil {
protocol.Neighbor.acceptedConnMutex.Lock() protocol.Neighbor.acceptedConnMutex.Lock()
......
package gossip
import (
"github.com/iotaledger/goshimmer/packages/filter"
"github.com/iotaledger/goshimmer/packages/transaction"
)
var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE)
func processTransactionData(transactionData []byte) {
if transactionFilter.Add(transactionData) {
Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
}
}
const (
TRANSACTION_FILTER_SIZE = 5000
)
package gossip
import (
"github.com/iotaledger/goshimmer/packages/ternary"
"github.com/iotaledger/goshimmer/packages/transaction"
"sync"
"testing"
)
func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) {
byteArray := setupTransaction(transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE)
b.ResetTimer()
for i := 0; i < b.N; i++ {
processTransactionData(byteArray)
}
}
func BenchmarkProcessSimilarTransactionsUnfiltered(b *testing.B) {
byteArray := setupTransaction(transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE)
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
Events.ReceiveTransaction.Trigger(transaction.FromBytes(byteArray))
wg.Done()
}()
}
wg.Wait()
}
func setupTransaction(byteArraySize int) []byte {
byteArray := make([]byte, byteArraySize)
for i := 0; i < len(byteArray); i++ {
byteArray[i] = byte(i % 128)
}
return byteArray
}
package tangle
import (
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/packages/transaction"
"github.com/iotaledger/goshimmer/plugins/gossip"
)
var PLUGIN = node.NewPlugin("Tangle", configure, run)
func configure(node *node.Plugin) {
gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) {
if transactionStoredAlready, err := transactionDatabase.Contains(transaction.Hash.ToBytes()); err != nil {
panic(err)
} else {
if !transactionStoredAlready {
// process transaction
}
}
}))
}
func run(node *node.Plugin) {
}
var transactionDatabase database.Database
func init() {
if db, err := database.Get("transactions"); err != nil {
panic(err)
} else {
transactionDatabase = db
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment