From 6b3723facddf5ee7e7a6fd44a2a9b8207f453153 Mon Sep 17 00:00:00 2001
From: Jonas Theis <mail@jonastheis.de>
Date: Sat, 28 Mar 2020 13:36:21 +0100
Subject: [PATCH] Message factory (#286)

* Added plugin structure and DB sequence number. WIP

* First version of transaction factory and builders for payloads #275

* Move marshalutil to hive.go

* Refactor existing code (before binary) to use new hive.go/identity #282

* Adjust binary code to use hive.go/identity and remove signature/ed25519 #282

* Adjust moved files from merge to use hive.go/marshalutil

* Adjust moved files from merge to use hive.go/identity and hive.go/crypto

* Adjust to merged changes

* Rename to MessageFactory

* Use local identity and tip selector

* Rename package

* Add tests

* Changed BuildMessage to receive an interface value instead of pointer

* Refactoring and comments

* Fix: fixed issues due to refactor

* Refactor: refactored the code

* Refactor: removed unnecessary messagefactory plugin

* Refactor: cleaned up code

* Feat: made spammer use messagefactory

* Refactor: refactored method name and docs

* Refactor: additional refactor

* Refactor: go mod tidy

Co-authored-by: Hans Moog <hm@mkjc.net>
---
 go.mod                                        |   1 -
 go.sum                                        |  12 --
 .../messagelayer/messagefactory/events.go     |  26 ++++
 .../messagefactory/messagefactory.go          |  71 +++++++++
 .../messagefactory/messagefactory_test.go     | 147 ++++++++++++++++++
 .../messagelayer/test/transaction_test.go     |   1 +
 packages/binary/spammer/spammer.go            |  72 +--------
 pluginmgr/core/plugins.go                     |   1 +
 plugins/messagelayer/plugin.go                |  29 +++-
 plugins/webapi/broadcastData/plugin.go        |  48 +-----
 plugins/webapi/spammer/plugin.go              |   3 +-
 plugins/webapi/spammer/webapi.go              |  10 --
 12 files changed, 282 insertions(+), 139 deletions(-)
 create mode 100644 packages/binary/messagelayer/messagefactory/events.go
 create mode 100644 packages/binary/messagelayer/messagefactory/messagefactory.go
 create mode 100644 packages/binary/messagelayer/messagefactory/messagefactory_test.go

diff --git a/go.mod b/go.mod
index ff040d9c..6e2a8f1d 100644
--- a/go.mod
+++ b/go.mod
@@ -16,7 +16,6 @@ require (
 	github.com/labstack/gommon v0.3.0 // indirect
 	github.com/magiconair/properties v1.8.1
 	github.com/mr-tron/base58 v1.1.3
-	github.com/oasislabs/ed25519 v0.0.0-20200302143042-29f6767a7c3e
 	github.com/panjf2000/ants/v2 v2.2.2
 	github.com/pkg/errors v0.9.1
 	github.com/spf13/pflag v1.0.5
diff --git a/go.sum b/go.sum
index 3e63ee75..8d885270 100644
--- a/go.sum
+++ b/go.sum
@@ -130,16 +130,6 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
-github.com/iotaledger/hive.go v0.0.0-20200325224052-ac4d38108211 h1:ckZnjlKHCqgsG6jV5EEyI4uSXElOJxLN1fwQHc4r+eM=
-github.com/iotaledger/hive.go v0.0.0-20200325224052-ac4d38108211/go.mod h1:EfH+ZcYGFJzzoFpO7NHGi2k7+Xc84ASyp1EwjhI3eJc=
-github.com/iotaledger/hive.go v0.0.0-20200326125723-9ba81bd19b75 h1:oZdDKfciKDJm/txN1/Ax61CCuFyOc7Y7mpMCrrgCVwo=
-github.com/iotaledger/hive.go v0.0.0-20200326125723-9ba81bd19b75/go.mod h1:EfH+ZcYGFJzzoFpO7NHGi2k7+Xc84ASyp1EwjhI3eJc=
-github.com/iotaledger/hive.go v0.0.0-20200326133753-889dd6edb2ee h1:/PPAbuLfHtBQhV7n9sFh+wVKTvPH7pX6pCCFixq8LXw=
-github.com/iotaledger/hive.go v0.0.0-20200326133753-889dd6edb2ee/go.mod h1:EfH+ZcYGFJzzoFpO7NHGi2k7+Xc84ASyp1EwjhI3eJc=
-github.com/iotaledger/hive.go v0.0.0-20200326142220-1a50b54482ba h1:h6ilKICR660XPJDKazAcr9BDsKE4D0RveH59McjhIL0=
-github.com/iotaledger/hive.go v0.0.0-20200326142220-1a50b54482ba/go.mod h1:EfH+ZcYGFJzzoFpO7NHGi2k7+Xc84ASyp1EwjhI3eJc=
-github.com/iotaledger/hive.go v0.0.0-20200326161423-f9e74839ddef h1:TbOHnWsk2sKDc4z92mMXMZL2FCFpexOjLTxrnyi8Nc0=
-github.com/iotaledger/hive.go v0.0.0-20200326161423-f9e74839ddef/go.mod h1:4sloxRutRhCuXgAgtOu1ZxVM95Na+ovK9MRDEQGZlzw=
 github.com/iotaledger/hive.go v0.0.0-20200326163241-4aa5b4d3b5b0 h1:ADYgndjbLPrCxxoCfHItxb6RjB3dzFxHlQ+mrqk16g8=
 github.com/iotaledger/hive.go v0.0.0-20200326163241-4aa5b4d3b5b0/go.mod h1:4sloxRutRhCuXgAgtOu1ZxVM95Na+ovK9MRDEQGZlzw=
 github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
@@ -194,8 +184,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
 github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms=
 github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
 github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
-github.com/oasislabs/ed25519 v0.0.0-20200302143042-29f6767a7c3e h1:85L+lUTJHx4O7UP9y/65XV8iq7oaA2Uqe5WiUSB8XE4=
-github.com/oasislabs/ed25519 v0.0.0-20200302143042-29f6767a7c3e/go.mod h1:xIpCyrK2ouGA4QBGbiNbkoONrvJ00u9P3QOkXSOAC0c=
 github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
diff --git a/packages/binary/messagelayer/messagefactory/events.go b/packages/binary/messagelayer/messagefactory/events.go
new file mode 100644
index 00000000..8916433f
--- /dev/null
+++ b/packages/binary/messagelayer/messagefactory/events.go
@@ -0,0 +1,26 @@
+package messagefactory
+
+import (
+	"github.com/iotaledger/hive.go/events"
+
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
+)
+
+type Events struct {
+	// A MessageConstructed event is triggered when a message is built including tips, sequence number and other metadata.
+	MessageConstructed *events.Event
+
+	// Error gets triggered when an error occurred.
+	Error *events.Event
+}
+
+func newEvents() *Events {
+	return &Events{
+		MessageConstructed: events.NewEvent(messageConstructedEvent),
+		Error:              events.NewEvent(events.ErrorCaller),
+	}
+}
+
+func messageConstructedEvent(handler interface{}, params ...interface{}) {
+	handler.(func(*message.Message))(params[0].(*message.Message))
+}
diff --git a/packages/binary/messagelayer/messagefactory/messagefactory.go b/packages/binary/messagelayer/messagefactory/messagefactory.go
new file mode 100644
index 00000000..25529c46
--- /dev/null
+++ b/packages/binary/messagelayer/messagefactory/messagefactory.go
@@ -0,0 +1,71 @@
+package messagefactory
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/dgraph-io/badger/v2"
+	"github.com/pkg/errors"
+
+	"github.com/iotaledger/hive.go/identity"
+
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector"
+)
+
+type MessageFactory struct {
+	Events        *Events
+	sequence      *badger.Sequence
+	localIdentity *identity.LocalIdentity
+	tipSelector   *tipselector.TipSelector
+}
+
+func New(db *badger.DB, localIdentity *identity.LocalIdentity, tipSelector *tipselector.TipSelector, sequenceKey []byte) *MessageFactory {
+	sequence, err := db.GetSequence(sequenceKey, 100)
+	if err != nil {
+		panic(fmt.Errorf("Could not create transaction sequence number. %v", err))
+	}
+
+	return &MessageFactory{
+		Events:        newEvents(),
+		sequence:      sequence,
+		localIdentity: localIdentity,
+		tipSelector:   tipSelector,
+	}
+}
+
+// IssuePayload creates a new message including sequence number and tip selection and returns it.
+// It also triggers the MessageConstructed event once it's done, which is for example used by the plugins to listen for
+// messages that shall be attached to the tangle.
+func (m *MessageFactory) IssuePayload(payload payload.Payload) *message.Message {
+	sequenceNumber, err := m.sequence.Next()
+	if err != nil {
+		m.Events.Error.Trigger(errors.Wrap(err, "Could not create sequence number"))
+
+		return nil
+	}
+
+	trunkTransaction, branchTransaction := m.tipSelector.GetTips()
+
+	tx := message.New(
+		trunkTransaction,
+		branchTransaction,
+		m.localIdentity.PublicKey(),
+		time.Now(),
+		sequenceNumber,
+		payload,
+		m.localIdentity,
+	)
+
+	m.Events.MessageConstructed.Trigger(tx)
+
+	return tx
+}
+
+// Shutdown closes the  messageFactory and persists the sequence number
+func (m *MessageFactory) Shutdown() {
+	if err := m.sequence.Release(); err != nil {
+		m.Events.Error.Trigger(errors.Wrap(err, "Could not release transaction sequence number."))
+	}
+}
diff --git a/packages/binary/messagelayer/messagefactory/messagefactory_test.go b/packages/binary/messagelayer/messagefactory/messagefactory_test.go
new file mode 100644
index 00000000..8e2d3cbe
--- /dev/null
+++ b/packages/binary/messagelayer/messagefactory/messagefactory_test.go
@@ -0,0 +1,147 @@
+package messagefactory
+
+import (
+	"encoding"
+	"io/ioutil"
+	"os"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector"
+	"github.com/iotaledger/goshimmer/packages/database"
+	"github.com/iotaledger/goshimmer/plugins/config"
+
+	"github.com/iotaledger/hive.go/events"
+	"github.com/iotaledger/hive.go/identity"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+const (
+	sequenceKey   = "seq"
+	totalMessages = 2000
+)
+
+func TestMessageFactory_BuildMessage(t *testing.T) {
+	// Set up DB for testing
+	dir, err := ioutil.TempDir("", t.Name())
+	require.NoError(t, err)
+	defer os.Remove(dir)
+	// use the tempdir for the database
+	config.Node.Set(database.CFG_DIRECTORY, dir)
+	db := database.GetBadgerInstance()
+
+	localIdentity := identity.GenerateLocalIdentity()
+	tipSelector := tipselector.New()
+
+	// keep track of sequence numbers
+	sequenceNumbers := sync.Map{}
+
+	msgFactory := New(db, localIdentity, tipSelector, []byte(sequenceKey))
+
+	// attach to event and count
+	countEvents := uint64(0)
+	msgFactory.Events.MessageConstructed.Attach(events.NewClosure(func(msg *message.Message) {
+		atomic.AddUint64(&countEvents, 1)
+	}))
+
+	t.Run("CheckProperties", func(t *testing.T) {
+		data := []byte("TestCheckProperties")
+		var p payload.Payload = NewMockPayload(data)
+		msg := msgFactory.IssuePayload(p)
+
+		assert.NotNil(t, msg.GetTrunkTransactionId())
+		assert.NotNil(t, msg.GetBranchTransactionId())
+
+		// time in range of 0.1 seconds
+		assert.InDelta(t, time.Now().UnixNano(), msg.IssuingTime().UnixNano(), 100000000)
+
+		// check payload
+		assert.Same(t, p, msg.GetPayload())
+		assert.Equal(t, data, msg.GetPayload().Bytes())
+
+		// check total events and sequence number
+		assert.EqualValues(t, 1, countEvents)
+		assert.EqualValues(t, 0, msg.SequenceNumber())
+
+		sequenceNumbers.Store(msg.SequenceNumber(), true)
+	})
+
+	// create messages in parallel
+	t.Run("ParallelCreation", func(t *testing.T) {
+		for i := 1; i < totalMessages; i++ {
+			t.Run("test", func(t *testing.T) {
+				t.Parallel()
+				data := []byte("TestCheckProperties")
+				var p payload.Payload = NewMockPayload(data)
+				msg := msgFactory.IssuePayload(p)
+
+				assert.NotNil(t, msg.GetTrunkTransactionId())
+				assert.NotNil(t, msg.GetBranchTransactionId())
+
+				// time in range of 0.1 seconds
+				assert.InDelta(t, time.Now().UnixNano(), msg.IssuingTime().UnixNano(), 100000000)
+
+				// check payload
+				assert.Same(t, p, msg.GetPayload())
+				assert.Equal(t, data, msg.GetPayload().Bytes())
+
+				sequenceNumbers.Store(msg.SequenceNumber(), true)
+			})
+		}
+	})
+
+	// check total events and sequence number
+	assert.EqualValues(t, totalMessages, countEvents)
+
+	max := uint64(0)
+	countSequence := 0
+	sequenceNumbers.Range(func(key, value interface{}) bool {
+		seq := key.(uint64)
+		val := value.(bool)
+		if val != true {
+			return false
+		}
+
+		// check for max sequence number
+		if seq > max {
+			max = seq
+		}
+		countSequence++
+		return true
+	})
+	assert.EqualValues(t, totalMessages-1, max)
+	assert.EqualValues(t, totalMessages, countSequence)
+
+	_ = db.Close()
+}
+
+type MockPayload struct {
+	data []byte
+	encoding.BinaryMarshaler
+	encoding.BinaryUnmarshaler
+}
+
+func NewMockPayload(data []byte) *MockPayload {
+	return &MockPayload{data: data}
+}
+
+func (m *MockPayload) Bytes() []byte {
+	return m.data
+}
+
+func (m *MockPayload) Type() payload.Type {
+	return payload.Type(0)
+}
+
+func (m *MockPayload) String() string {
+	return string(m.data)
+}
+
+func (m *MockPayload) Unmarshal(bytes []byte) error {
+	panic("implement me")
+}
diff --git a/packages/binary/messagelayer/test/transaction_test.go b/packages/binary/messagelayer/test/transaction_test.go
index e5b913cd..91532458 100644
--- a/packages/binary/messagelayer/test/transaction_test.go
+++ b/packages/binary/messagelayer/test/transaction_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/identity"
+
 	"github.com/panjf2000/ants/v2"
 
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
diff --git a/packages/binary/spammer/spammer.go b/packages/binary/spammer/spammer.go
index 525ae075..34d1b98b 100644
--- a/packages/binary/spammer/spammer.go
+++ b/packages/binary/spammer/spammer.go
@@ -4,28 +4,24 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/iotaledger/hive.go/identity"
 	"github.com/iotaledger/hive.go/types"
 
-	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagefactory"
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
-	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector"
-	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/transactionparser"
 )
 
 type Spammer struct {
-	transactionParser *transactionparser.TransactionParser
-	tipSelector       *tipselector.TipSelector
+	messageFactory *messagefactory.MessageFactory
 
 	processId      int64
 	shutdownSignal chan types.Empty
 }
 
-func New(transactionParser *transactionparser.TransactionParser, tipSelector *tipselector.TipSelector) *Spammer {
+func New(messageFactory *messagefactory.MessageFactory) *Spammer {
 	return &Spammer{
-		shutdownSignal:    make(chan types.Empty),
-		transactionParser: transactionParser,
-		tipSelector:       tipSelector,
+		messageFactory: messageFactory,
+
+		shutdownSignal: make(chan types.Empty),
 	}
 }
 
@@ -33,17 +29,11 @@ func (spammer *Spammer) Start(tps int) {
 	go spammer.run(tps, atomic.AddInt64(&spammer.processId, 1))
 }
 
-func (spammer *Spammer) Burst(transactions int) {
-	go spammer.sendBurst(transactions, atomic.AddInt64(&spammer.processId, 1))
-}
-
 func (spammer *Spammer) Shutdown() {
 	atomic.AddInt64(&spammer.processId, 1)
 }
 
 func (spammer *Spammer) run(tps int, processId int64) {
-	// TODO: this should be the local peer's identity
-	spammingIdentity := identity.GenerateLocalIdentity()
 	currentSentCounter := 0
 	start := time.Now()
 
@@ -52,19 +42,7 @@ func (spammer *Spammer) run(tps int, processId int64) {
 			return
 		}
 
-		// TODO: use transaction factory
-		trunkTransactionId, branchTransactionId := spammer.tipSelector.GetTips()
-
-		msg := message.New(
-			trunkTransactionId,
-			branchTransactionId,
-			spammingIdentity.PublicKey(),
-			time.Now(),
-			0,
-			payload.NewData([]byte("SPAM")),
-			spammingIdentity,
-		)
-		spammer.transactionParser.Parse(msg.Bytes(), nil)
+		spammer.messageFactory.IssuePayload(payload.NewData([]byte("SPAM")))
 
 		currentSentCounter++
 
@@ -80,39 +58,3 @@ func (spammer *Spammer) run(tps int, processId int64) {
 		}
 	}
 }
-
-func (spammer *Spammer) sendBurst(transactions int, processId int64) {
-	// TODO: this should be the local peer's identity
-	spammingIdentity := identity.GenerateLocalIdentity()
-
-	previousTransactionId := message.EmptyId
-
-	burstBuffer := make([][]byte, transactions)
-	for i := 0; i < transactions; i++ {
-		if atomic.LoadInt64(&spammer.processId) != processId {
-			return
-		}
-
-		// TODO: use transaction factory
-		spamTransaction := message.New(
-			previousTransactionId,
-			previousTransactionId,
-			spammingIdentity.PublicKey(),
-			time.Now(),
-			0,
-			payload.NewData([]byte("SPAM")),
-			spammingIdentity,
-		)
-
-		previousTransactionId = spamTransaction.GetId()
-		burstBuffer[i] = spamTransaction.Bytes()
-	}
-
-	for i := 0; i < transactions; i++ {
-		if atomic.LoadInt64(&spammer.processId) != processId {
-			return
-		}
-
-		spammer.transactionParser.Parse(burstBuffer[i], nil)
-	}
-}
diff --git a/pluginmgr/core/plugins.go b/pluginmgr/core/plugins.go
index 8453fb5e..a210f3d1 100644
--- a/pluginmgr/core/plugins.go
+++ b/pluginmgr/core/plugins.go
@@ -12,6 +12,7 @@ import (
 	"github.com/iotaledger/goshimmer/plugins/messagelayer"
 	"github.com/iotaledger/goshimmer/plugins/metrics"
 	"github.com/iotaledger/goshimmer/plugins/portcheck"
+
 	"github.com/iotaledger/hive.go/node"
 )
 
diff --git a/plugins/messagelayer/plugin.go b/plugins/messagelayer/plugin.go
index 51efe391..d9faebe1 100644
--- a/plugins/messagelayer/plugin.go
+++ b/plugins/messagelayer/plugin.go
@@ -3,7 +3,13 @@ package messagelayer
 import (
 	"github.com/iotaledger/hive.go/autopeering/peer"
 
+	"github.com/iotaledger/hive.go/daemon"
+	"github.com/iotaledger/hive.go/events"
+	"github.com/iotaledger/hive.go/logger"
+	"github.com/iotaledger/hive.go/node"
+
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagefactory"
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector"
 	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/transactionparser"
@@ -11,14 +17,15 @@ import (
 	"github.com/iotaledger/goshimmer/packages/binary/storageprefix"
 	"github.com/iotaledger/goshimmer/packages/database"
 	"github.com/iotaledger/goshimmer/packages/shutdown"
+	"github.com/iotaledger/goshimmer/plugins/autopeering/local"
+)
 
-	"github.com/iotaledger/hive.go/daemon"
-	"github.com/iotaledger/hive.go/events"
-	"github.com/iotaledger/hive.go/logger"
-	"github.com/iotaledger/hive.go/node"
+const (
+	PLUGIN_NAME        = "MessageLayer"
+	DB_SEQUENCE_NUMBER = "seq"
 )
 
-var PLUGIN = node.NewPlugin("Tangle", node.Enabled, configure, run)
+var PLUGIN = node.NewPlugin(PLUGIN_NAME, node.Enabled, configure, run)
 
 var TransactionParser *transactionparser.TransactionParser
 
@@ -28,10 +35,12 @@ var TipSelector *tipselector.TipSelector
 
 var Tangle *tangle.Tangle
 
+var MessageFactory *messagefactory.MessageFactory
+
 var log *logger.Logger
 
 func configure(*node.Plugin) {
-	log = logger.NewLogger("Tangle")
+	log = logger.NewLogger(PLUGIN_NAME)
 
 	// create instances
 	TransactionParser = transactionparser.New()
@@ -39,6 +48,13 @@ func configure(*node.Plugin) {
 	TipSelector = tipselector.New()
 	Tangle = tangle.New(database.GetBadgerInstance(), storageprefix.MainNet)
 
+	// Setup MessageFactory (behavior + logging))
+	MessageFactory = messagefactory.New(database.GetBadgerInstance(), local.GetInstance().LocalIdentity(), TipSelector, []byte(DB_SEQUENCE_NUMBER))
+	MessageFactory.Events.MessageConstructed.Attach(events.NewClosure(Tangle.AttachMessage))
+	MessageFactory.Events.Error.Attach(events.NewClosure(func(err error) {
+		log.Errorf("Error in MessageFactory: %v", err)
+	}))
+
 	// setup TransactionParser
 	TransactionParser.Events.TransactionParsed.Attach(events.NewClosure(func(transaction *message.Message, peer *peer.Peer) {
 		// TODO: ADD PEER
@@ -68,6 +84,7 @@ func run(*node.Plugin) {
 	_ = daemon.BackgroundWorker("Tangle", func(shutdownSignal <-chan struct{}) {
 		<-shutdownSignal
 
+		MessageFactory.Shutdown()
 		TransactionParser.Shutdown()
 		Tangle.Shutdown()
 	}, shutdown.ShutdownPriorityTangle)
diff --git a/plugins/webapi/broadcastData/plugin.go b/plugins/webapi/broadcastData/plugin.go
index 1139efbd..c0cb7a50 100644
--- a/plugins/webapi/broadcastData/plugin.go
+++ b/plugins/webapi/broadcastData/plugin.go
@@ -3,11 +3,11 @@ package broadcastData
 import (
 	"net/http"
 
-	"github.com/iotaledger/goshimmer/packages/model/value_transaction"
+	"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
+	"github.com/iotaledger/goshimmer/plugins/messagelayer"
 	"github.com/iotaledger/goshimmer/plugins/webapi"
 	"github.com/iotaledger/hive.go/logger"
 	"github.com/iotaledger/hive.go/node"
-	"github.com/iotaledger/hive.go/typeutils"
 	"github.com/labstack/echo"
 )
 
@@ -28,49 +28,9 @@ func broadcastData(c echo.Context) error {
 		return c.JSON(http.StatusBadRequest, Response{Error: err.Error()})
 	}
 
-	log.Debug("Received - address:", request.Address, " data:", request.Data)
+	tx := messagelayer.MessageFactory.IssuePayload(payload.NewData([]byte(request.Data)))
 
-	tx := value_transaction.New()
-	tx.SetHead(true)
-	tx.SetTail(true)
-
-	buffer := make([]byte, 2187)
-	if len(request.Data) > 2187 {
-		log.Warnf("data exceeds 2187 byte limit - (payload data size: %d)", len(request.Data))
-		return c.JSON(http.StatusBadRequest, Response{Error: "data exceeds 2187 byte limit"})
-	}
-
-	copy(buffer, typeutils.StringToBytes(request.Data))
-
-	// TODO: FIX FOR NEW TX LAYOUT
-
-	/*
-		trytes, err := trinary.BytesToTrytes(buffer)
-		if err != nil {
-			log.Warnf("trytes conversion failed: %s", err.Error())
-			return c.JSON(http.StatusBadRequest, Response{Error: err.Error()})
-		}
-
-		err = address.ValidAddress(request.Address)
-		if err != nil {
-			log.Warnf("invalid Address: %s", request.Address)
-			return c.JSON(http.StatusBadRequest, Response{Error: err.Error()})
-		}
-
-		tx.SetAddress(request.Address)
-		tx.SetSignatureMessageFragment(trytes)
-		tx.SetValue(0)
-		tx.SetBranchTransactionHash(tipselectionn.GetRandomTip())
-		tx.SetTrunkTransactionHash(tipselectionn.GetRandomTip(tx.GetBranchTransactionHash()))
-		tx.SetTimestamp(uint(time.Now().Unix()))
-		if err := tx.DoProofOfWork(meta_transaction.MIN_WEIGHT_MAGNITUDE); err != nil {
-			log.Warnf("PoW failed: %s", err)
-			return c.JSON(http.StatusInternalServerError, Response{Error: err.Error()})
-		}
-
-		gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Data: tx.GetBytes(), Peer: &local.GetInstance().Peer})
-	*/
-	return c.JSON(http.StatusOK, Response{Hash: tx.GetHash()})
+	return c.JSON(http.StatusOK, Response{Hash: tx.GetId().String()})
 }
 
 type Response struct {
diff --git a/plugins/webapi/spammer/plugin.go b/plugins/webapi/spammer/plugin.go
index 12de5fd2..b83f0b6e 100644
--- a/plugins/webapi/spammer/plugin.go
+++ b/plugins/webapi/spammer/plugin.go
@@ -16,7 +16,8 @@ var transactionSpammer *spammer.Spammer
 var PLUGIN = node.NewPlugin("Spammer", node.Disabled, configure, run)
 
 func configure(plugin *node.Plugin) {
-	transactionSpammer = spammer.New(messagelayer.TransactionParser, messagelayer.TipSelector)
+	transactionSpammer = spammer.New(messagelayer.MessageFactory)
+
 	webapi.Server.GET("spammer", handleRequest)
 }
 
diff --git a/plugins/webapi/spammer/webapi.go b/plugins/webapi/spammer/webapi.go
index f020f847..f85f6e62 100644
--- a/plugins/webapi/spammer/webapi.go
+++ b/plugins/webapi/spammer/webapi.go
@@ -2,7 +2,6 @@ package spammer
 
 import (
 	"net/http"
-	"strconv"
 
 	"github.com/labstack/echo"
 )
@@ -22,15 +21,6 @@ func handleRequest(c echo.Context) error {
 		transactionSpammer.Shutdown()
 		transactionSpammer.Start(request.Tps)
 		return c.JSON(http.StatusOK, Response{Message: "started spamming transactions"})
-	case "burst":
-		if request.Tps == 0 {
-			return c.JSON(http.StatusBadRequest, Response{Error: "burst requires the tps to be set"})
-		}
-
-		transactionSpammer.Shutdown()
-		transactionSpammer.Burst(request.Tps)
-
-		return c.JSON(http.StatusOK, Response{Message: "sent a burst of " + strconv.Itoa(request.Tps) + " transactions"})
 	case "stop":
 		transactionSpammer.Shutdown()
 		return c.JSON(http.StatusOK, Response{Message: "stopped spamming transactions"})
-- 
GitLab