From 282a620b935fce6889e57de7b3be9c8551b9a892 Mon Sep 17 00:00:00 2001
From: Hans Moog <hm@mkjc.net>
Date: Thu, 16 May 2019 12:21:50 +0200
Subject: [PATCH] Feat: added transaction processor and filter (250 times
 faster)

---
 main.go                                      |  8 +++
 packages/database/database.go                | 15 +++++
 packages/database/interfaces.go              |  1 +
 packages/filter/byte_array_filter.go         | 53 ++++++++++++++++++
 packages/filter/byte_array_filter_test.go    | 45 +++++++++++++++
 packages/typeconversion/typeconversion.go    | 12 ++++
 plugins/gossip/events.go                     | 16 +++---
 plugins/gossip/neighbors.go                  | 59 ++++++++++----------
 plugins/gossip/plugin.go                     | 10 +++-
 plugins/gossip/protocol.go                   | 58 +++++++++++++++----
 plugins/gossip/protocol_v1.go                | 13 ++---
 plugins/gossip/server.go                     |  1 +
 plugins/gossip/transaction_processor.go      | 18 ++++++
 plugins/gossip/transaction_processor_test.go | 48 ++++++++++++++++
 plugins/tangle/plugin.go                     | 37 ++++++++++++
 15 files changed, 335 insertions(+), 59 deletions(-)
 create mode 100644 packages/filter/byte_array_filter.go
 create mode 100644 packages/filter/byte_array_filter_test.go
 create mode 100644 packages/typeconversion/typeconversion.go
 create mode 100644 plugins/gossip/transaction_processor.go
 create mode 100644 plugins/gossip/transaction_processor_test.go
 create mode 100644 plugins/tangle/plugin.go

diff --git a/main.go b/main.go
index 9d80fd89..62b4183b 100644
--- a/main.go
+++ b/main.go
@@ -1,7 +1,10 @@
 package main
 
 import (
+    "github.com/iotaledger/goshimmer/packages/database"
+    "github.com/iotaledger/goshimmer/packages/events"
     "github.com/iotaledger/goshimmer/packages/node"
+    "github.com/iotaledger/goshimmer/packages/transaction"
     "github.com/iotaledger/goshimmer/plugins/analysis"
     "github.com/iotaledger/goshimmer/plugins/autopeering"
     "github.com/iotaledger/goshimmer/plugins/cli"
@@ -19,4 +22,9 @@ func main() {
         statusscreen.PLUGIN,
         gracefulshutdown.PLUGIN,
     )
+
+    db, _ := database.Get("transactions")
+    gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(tx *transaction.Transaction) {
+        db.Set(tx.Hash.ToBytes(), tx.Bytes)
+    }))
 }
diff --git a/packages/database/database.go b/packages/database/database.go
index 4dfb0803..70f1f544 100644
--- a/packages/database/database.go
+++ b/packages/database/database.go
@@ -76,6 +76,21 @@ func (this *databaseImpl) Set(key []byte, value []byte) error {
     return nil
 }
 
+func (this *databaseImpl) Contains(key []byte) (bool, error) {
+    if err := this.db.View(func(txn *badger.Txn) error {
+        _, err := txn.Get(key)
+        if err != nil {
+            return err
+        }
+
+        return nil
+    }); err == ErrKeyNotFound {
+        return false, nil
+    } else {
+        return err == nil, err
+    }
+}
+
 func (this *databaseImpl) Get(key []byte) ([]byte, error) {
     var result []byte = nil
     var err error = nil
diff --git a/packages/database/interfaces.go b/packages/database/interfaces.go
index c8fbb829..c3ff1c35 100644
--- a/packages/database/interfaces.go
+++ b/packages/database/interfaces.go
@@ -4,5 +4,6 @@ type Database interface {
     Open() error
     Set(key []byte, value []byte) error
     Get(key []byte) ([]byte, error)
+    Contains(key []byte) (bool, error)
     Close() error
 }
diff --git a/packages/filter/byte_array_filter.go b/packages/filter/byte_array_filter.go
new file mode 100644
index 00000000..c879cc88
--- /dev/null
+++ b/packages/filter/byte_array_filter.go
@@ -0,0 +1,53 @@
+package filter
+
+import (
+    "github.com/iotaledger/goshimmer/packages/typeconversion"
+    "sync"
+)
+
+type ByteArrayFilter struct {
+    byteArrays      [][]byte
+    byteArraysByKey map[string]bool
+    size            int
+    mutex           sync.RWMutex
+}
+
+func NewByteArrayFilter(size int) *ByteArrayFilter {
+    return &ByteArrayFilter{
+        byteArrays:      make([][]byte, 0, size),
+        byteArraysByKey: make(map[string]bool, size),
+        size:            size,
+    }
+}
+
+func (filter *ByteArrayFilter) Contains(byteArray []byte) bool {
+    filter.mutex.RLock()
+    defer filter.mutex.RUnlock()
+
+    _, exists := filter.byteArraysByKey[typeconversion.BytesToString(byteArray)]
+
+    return exists
+}
+
+func (filter *ByteArrayFilter) Add(byteArray []byte) bool {
+    key := typeconversion.BytesToString(byteArray)
+
+    filter.mutex.Lock()
+    defer filter.mutex.Unlock()
+
+    if _, exists := filter.byteArraysByKey[key]; !exists {
+        if len(filter.byteArrays) == filter.size {
+            delete(filter.byteArraysByKey, typeconversion.BytesToString(filter.byteArrays[0]))
+
+            filter.byteArrays = append(filter.byteArrays[1:], byteArray)
+        } else {
+            filter.byteArrays = append(filter.byteArrays, byteArray)
+        }
+
+        filter.byteArraysByKey[key] = true
+
+        return true
+    } else {
+        return false
+    }
+}
diff --git a/packages/filter/byte_array_filter_test.go b/packages/filter/byte_array_filter_test.go
new file mode 100644
index 00000000..c2031bd1
--- /dev/null
+++ b/packages/filter/byte_array_filter_test.go
@@ -0,0 +1,45 @@
+package filter
+
+import "testing"
+
+func BenchmarkAdd(b *testing.B) {
+    filter, byteArray := setupFilter(15000, 1604)
+
+    b.ResetTimer()
+
+    for i := 0; i < b.N; i++ {
+        filter.Add(byteArray)
+    }
+}
+
+func BenchmarkContains(b *testing.B) {
+    filter, byteArray := setupFilter(15000, 1604)
+
+    b.ResetTimer()
+
+    for i := 0; i < b.N; i++ {
+        filter.Contains(byteArray)
+    }
+}
+
+func setupFilter(filterSize int, byteArraySize int) (*ByteArrayFilter, []byte) {
+    filter := NewByteArrayFilter(filterSize)
+
+    for j := 0; j < filterSize; j++ {
+        byteArray := make([]byte, byteArraySize)
+
+        for i := 0; i < len(byteArray); i++ {
+            byteArray[(i + j) % byteArraySize] = byte((i + j) % 128)
+        }
+
+        filter.Add(byteArray)
+    }
+
+    byteArray := make([]byte, byteArraySize)
+
+    for i := 0; i < len(byteArray); i++ {
+        byteArray[i] = byte(i % 128)
+    }
+
+    return filter, byteArray
+}
diff --git a/packages/typeconversion/typeconversion.go b/packages/typeconversion/typeconversion.go
new file mode 100644
index 00000000..e820a9b6
--- /dev/null
+++ b/packages/typeconversion/typeconversion.go
@@ -0,0 +1,12 @@
+package typeconversion
+
+import (
+    "reflect"
+    "unsafe"
+)
+
+func BytesToString(b []byte) string {
+    bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+
+    return *(*string)(unsafe.Pointer(&reflect.StringHeader{bh.Data, bh.Len}))
+}
\ No newline at end of file
diff --git a/plugins/gossip/events.go b/plugins/gossip/events.go
index e22b0117..2e0162b5 100644
--- a/plugins/gossip/events.go
+++ b/plugins/gossip/events.go
@@ -50,14 +50,14 @@ type pluginEvents struct {
 }
 
 type protocolEvents struct {
-    ReceiveVersion         *events.Event
-    ReceiveIdentification  *events.Event
-    AcceptConnection       *events.Event
-    RejectConnection       *events.Event
-    DropConnection         *events.Event
-    ReceiveTransactionData *events.Event
-    ReceiveRequestData     *events.Event
-    Error                  *events.Event
+    ReceiveVersion            *events.Event
+    ReceiveIdentification     *events.Event
+    ReceiveConnectionAccepted *events.Event
+    ReceiveConnectionRejected *events.Event
+    ReceiveDropConnection     *events.Event
+    ReceiveTransactionData    *events.Event
+    ReceiveRequestData        *events.Event
+    Error                     *events.Event
 }
 
 func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go
index c27c5f8b..ebc42a47 100644
--- a/plugins/gossip/neighbors.go
+++ b/plugins/gossip/neighbors.go
@@ -48,45 +48,43 @@ func manageConnection(plugin *node.Plugin, neighbor *Peer) {
     daemon.BackgroundWorker(func() {
         failedConnectionAttempts := 0
 
-        for failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS {
-            if neighbor, exists := GetNeighbor(neighbor.Identity.StringIdentifier); !exists {
-                return
-            } else {
-                if conn, newConnection, err := neighbor.Connect(); err != nil {
-                    failedConnectionAttempts++
-
-                    plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error())
-
-                    if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS {
-                        select {
-                        case <-daemon.ShutdownSignal:
-                            return
-
-                        case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT):
-                            // continue
-                        }
-                    }
-                } else {
-                    failedConnectionAttempts = 0
-
-                    disconnectChan := make(chan int, 1)
-                    conn.Events.Close.Attach(events.NewClosure(func() {
-                        close(disconnectChan)
-                    }))
+        for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; {
+            conn, dialed, err := neighbor.Connect()
+            if err != nil {
+                failedConnectionAttempts++
 
-                    if newConnection {
-                        go newProtocol(conn).init()
-                    }
+                plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error())
 
+                if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS {
                     select {
                     case <-daemon.ShutdownSignal:
                         return
 
-                    case <-disconnectChan:
-                        break
+                    case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT):
+                        continue
                     }
                 }
             }
+
+            failedConnectionAttempts = 0
+
+            disconnectChan := make(chan int, 1)
+            conn.Events.Close.Attach(events.NewClosure(func() {
+                close(disconnectChan)
+            }))
+
+            if dialed {
+                go newProtocol(conn).init()
+            }
+
+            // wait for shutdown or
+            select {
+            case <-daemon.ShutdownSignal:
+                return
+
+            case <-disconnectChan:
+                break
+            }
         }
 
         RemoveNeighbor(neighbor.Identity.StringIdentifier)
@@ -211,7 +209,6 @@ func GetNeighbors() map[string]*Peer {
 const (
     CONNECTION_MAX_ATTEMPTS        = 5
     CONNECTION_BASE_TIMEOUT        = 10 * time.Second
-    MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1
 )
 
 var neighbors = make(map[string]*Peer)
diff --git a/plugins/gossip/plugin.go b/plugins/gossip/plugin.go
index 1f76cf1a..2ef6867c 100644
--- a/plugins/gossip/plugin.go
+++ b/plugins/gossip/plugin.go
@@ -1,12 +1,20 @@
 package gossip
 
-import "github.com/iotaledger/goshimmer/packages/node"
+import (
+    "github.com/iotaledger/goshimmer/packages/events"
+    "github.com/iotaledger/goshimmer/packages/node"
+    "github.com/iotaledger/goshimmer/packages/transaction"
+)
 
 var PLUGIN = node.NewPlugin("Gossip", configure, run)
 
 func configure(plugin *node.Plugin) {
     configureNeighbors(plugin)
     configureServer(plugin)
+
+    Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) {
+
+    }))
 }
 
 func run(plugin *node.Plugin) {
diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go
index 7ba2e322..e8b35fa6 100644
--- a/plugins/gossip/protocol.go
+++ b/plugins/gossip/protocol.go
@@ -4,6 +4,7 @@ import (
     "github.com/iotaledger/goshimmer/packages/accountability"
     "github.com/iotaledger/goshimmer/packages/errors"
     "github.com/iotaledger/goshimmer/packages/events"
+    "github.com/iotaledger/goshimmer/packages/identity"
     "github.com/iotaledger/goshimmer/packages/network"
     "strconv"
 )
@@ -16,7 +17,7 @@ type protocolState interface {
 
 // endregion ////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
-// region protocol /////////////////////////////////////////////////////////////////////////////////////////////////////
+//region protocol //////////////////////////////////////////////////////////////////////////////////////////////////////
 
 type protocol struct {
     Conn         *network.ManagedConnection
@@ -31,33 +32,68 @@ func newProtocol(conn *network.ManagedConnection) *protocol {
         Conn:         conn,
         CurrentState: &versionState{},
         Events: protocolEvents{
-            ReceiveVersion:        events.NewEvent(intCaller),
-            ReceiveIdentification: events.NewEvent(identityCaller),
+            ReceiveVersion:            events.NewEvent(intCaller),
+            ReceiveIdentification:     events.NewEvent(identityCaller),
+            ReceiveConnectionAccepted: events.NewEvent(events.CallbackCaller),
+            ReceiveConnectionRejected: events.NewEvent(events.CallbackCaller),
         },
     }
 
     return protocol
 }
 
+func (protocol *protocol) sendVersion() {
+    protocol.Conn.Write([]byte{1})
+}
+
+func (protocol *protocol) sendIdentification() {
+    if signature, err := accountability.OWN_ID.Sign(accountability.OWN_ID.Identifier); err == nil {
+        protocol.Conn.Write(accountability.OWN_ID.Identifier)
+        protocol.Conn.Write(signature)
+    }
+}
+
+func (protocol *protocol) rejectConnection() {
+    protocol.Conn.Write([]byte{0})
+
+    protocol.Conn.Close()
+}
+
+func (protocol *protocol) acceptConnection() {
+    protocol.Conn.Write([]byte{1})
+}
+
 func (protocol *protocol) init() {
-    var onClose, onReceiveData *events.Closure
+    //region setup event handlers
+    onReceiveIdentification := events.NewClosure(func(identity *identity.Identity) {
+        if protocol.Neighbor == nil {
+            protocol.rejectConnection()
+        } else {
+            protocol.acceptConnection()
+        }
+    })
+
+    onReceiveData := events.NewClosure(protocol.parseData)
 
-    onReceiveData = events.NewClosure(protocol.parseData)
+    var onClose *events.Closure // define var first so we can use it in the closure
     onClose = events.NewClosure(func() {
         protocol.Conn.Events.ReceiveData.Detach(onReceiveData)
         protocol.Conn.Events.Close.Detach(onClose)
     })
+    //endregion
 
+    //region register event handlers
+    protocol.Events.ReceiveIdentification.Attach(onReceiveIdentification)
     protocol.Conn.Events.ReceiveData.Attach(onReceiveData)
     protocol.Conn.Events.Close.Attach(onClose)
+    //endregion
 
-    protocol.Conn.Write([]byte{1})
-    protocol.Conn.Write(accountability.OWN_ID.Identifier)
-
-    if signature, err := accountability.OWN_ID.Sign(accountability.OWN_ID.Identifier); err == nil {
-        protocol.Conn.Write(signature)
-    }
+    //region send initial handshake
+    protocol.sendVersion()
+    protocol.sendIdentification()
+    //endregion
 
+    // start reading from the connection
     protocol.Conn.Read(make([]byte, 1000))
 }
 
diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go
index 09495280..14468a9f 100644
--- a/plugins/gossip/protocol_v1.go
+++ b/plugins/gossip/protocol_v1.go
@@ -74,16 +74,15 @@ func newacceptanceStateV1() *acceptanceStateV1 {
 func (state *acceptanceStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) {
     switch data[offset] {
         case 0:
-            protocol.Events.RejectConnection.Trigger()
+            protocol.Events.ReceiveConnectionRejected.Trigger()
 
-            RemoveNeighbor(protocol.Neighbor.Identity.StringIdentifier)
+            protocol.Conn.Close()
 
-            protocol.Neighbor.InitiatedConn.Close()
             protocol.CurrentState = nil
         break
 
         case 1:
-            protocol.Events.AcceptConnection.Trigger()
+            protocol.Events.ReceiveConnectionAccepted.Trigger()
 
             protocol.CurrentState = newDispatchStateV1()
         break
@@ -108,7 +107,7 @@ func newDispatchStateV1() *dispatchStateV1 {
 func (state *dispatchStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) {
     switch data[0] {
         case 0:
-            protocol.Events.RejectConnection.Trigger()
+            protocol.Events.ReceiveConnectionRejected.Trigger()
 
             protocol.Neighbor.InitiatedConn.Close()
             protocol.CurrentState = nil
@@ -153,9 +152,7 @@ func (state *transactionStateV1) Consume(protocol *protocol, data []byte, offset
 
         protocol.Events.ReceiveTransactionData.Trigger(transactionData)
 
-        go func() {
-            Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
-        }()
+        go processTransactionData(transactionData)
 
         protocol.CurrentState = newDispatchStateV1()
         state.offset = 0
diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go
index d323c9b6..0584b59e 100644
--- a/plugins/gossip/server.go
+++ b/plugins/gossip/server.go
@@ -16,6 +16,7 @@ func configureServer(plugin *node.Plugin) {
     TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) {
         protocol := newProtocol(conn)
 
+        // store connection in neighbor if its a neighbor calling
         protocol.Events.ReceiveIdentification.Attach(events.NewClosure(func(identity *identity.Identity) {
             if protocol.Neighbor != nil {
                 protocol.Neighbor.acceptedConnMutex.Lock()
diff --git a/plugins/gossip/transaction_processor.go b/plugins/gossip/transaction_processor.go
new file mode 100644
index 00000000..49f4f7df
--- /dev/null
+++ b/plugins/gossip/transaction_processor.go
@@ -0,0 +1,18 @@
+package gossip
+
+import (
+    "github.com/iotaledger/goshimmer/packages/filter"
+    "github.com/iotaledger/goshimmer/packages/transaction"
+)
+
+var transactionFilter = filter.NewByteArrayFilter(TRANSACTION_FILTER_SIZE)
+
+func processTransactionData(transactionData []byte) {
+    if transactionFilter.Add(transactionData) {
+        Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
+    }
+}
+
+const (
+    TRANSACTION_FILTER_SIZE = 5000
+)
diff --git a/plugins/gossip/transaction_processor_test.go b/plugins/gossip/transaction_processor_test.go
new file mode 100644
index 00000000..1e567f60
--- /dev/null
+++ b/plugins/gossip/transaction_processor_test.go
@@ -0,0 +1,48 @@
+package gossip
+
+import (
+    "github.com/iotaledger/goshimmer/packages/ternary"
+    "github.com/iotaledger/goshimmer/packages/transaction"
+    "sync"
+    "testing"
+)
+
+func BenchmarkProcessSimilarTransactionsFiltered(b *testing.B) {
+    byteArray := setupTransaction(transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE)
+
+    b.ResetTimer()
+
+    for i := 0; i < b.N; i++ {
+        processTransactionData(byteArray)
+    }
+}
+
+func BenchmarkProcessSimilarTransactionsUnfiltered(b *testing.B) {
+    byteArray := setupTransaction(transaction.MARSHALLED_TOTAL_SIZE / ternary.NUMBER_OF_TRITS_IN_A_BYTE)
+
+    b.ResetTimer()
+
+    var wg sync.WaitGroup
+
+    for i := 0; i < b.N; i++ {
+        wg.Add(1)
+
+        go func() {
+            Events.ReceiveTransaction.Trigger(transaction.FromBytes(byteArray))
+
+            wg.Done()
+        }()
+    }
+
+    wg.Wait()
+}
+
+func setupTransaction(byteArraySize int) []byte {
+    byteArray := make([]byte, byteArraySize)
+
+    for i := 0; i < len(byteArray); i++ {
+        byteArray[i] = byte(i % 128)
+    }
+
+    return byteArray
+}
diff --git a/plugins/tangle/plugin.go b/plugins/tangle/plugin.go
new file mode 100644
index 00000000..d6a4d0e8
--- /dev/null
+++ b/plugins/tangle/plugin.go
@@ -0,0 +1,37 @@
+package tangle
+
+import (
+    "github.com/iotaledger/goshimmer/packages/database"
+    "github.com/iotaledger/goshimmer/packages/events"
+    "github.com/iotaledger/goshimmer/packages/node"
+    "github.com/iotaledger/goshimmer/packages/transaction"
+    "github.com/iotaledger/goshimmer/plugins/gossip"
+)
+
+var PLUGIN = node.NewPlugin("Tangle", configure, run)
+
+func configure(node *node.Plugin) {
+    gossip.Events.ReceiveTransaction.Attach(events.NewClosure(func(transaction *transaction.Transaction) {
+        if transactionStoredAlready, err := transactionDatabase.Contains(transaction.Hash.ToBytes()); err != nil {
+            panic(err)
+        } else {
+            if !transactionStoredAlready {
+                // process transaction
+            }
+        }
+    }))
+}
+
+func run(node *node.Plugin) {
+
+}
+
+var transactionDatabase database.Database
+
+func init() {
+    if db, err := database.Get("transactions"); err != nil {
+        panic(err)
+    } else {
+        transactionDatabase = db
+    }
+}
-- 
GitLab