Skip to content
Snippets Groups Projects
Commit 40ef9dda authored by Wolfgang Welz's avatar Wolfgang Welz
Browse files

Add initial ZeroMQ support for incomming transactions

parent fdb3349b
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,7 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/ethereum/go-ethereum v1.8.27
github.com/gdamore/tcell v1.1.2
github.com/go-zeromq/zmq4 v0.4.0
github.com/golang/protobuf v1.3.1 // indirect
github.com/google/open-location-code/go v0.0.0-20190603181809-cf814bded323
github.com/labstack/echo v3.3.10+incompatible
......@@ -16,6 +17,7 @@ require (
github.com/magiconair/properties v1.8.1
github.com/pkg/errors v0.8.1
github.com/rivo/tview v0.0.0-20190609162513-b62197ade412
github.com/zeromq/goczmq v4.1.0+incompatible // indirect
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5
golang.org/x/net v0.0.0-20190611141213-3f473d35a33a
)
......@@ -18,6 +18,8 @@ github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdk
github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg=
github.com/gdamore/tcell v1.1.2 h1:Afe8cU6SECC06UmvaJ55Jr3Eh0tz/ywLjqWYqjGZp3s=
github.com/gdamore/tcell v1.1.2/go.mod h1:h3kq4HO9l2On+V9ed8w8ewqQEmGCSSHOgQ+2h8uzurE=
github.com/go-zeromq/zmq4 v0.4.0 h1:/TAahvAez7b6ycAyrxwN6Q36zy2OU8++uRLt6CwC74g=
github.com/go-zeromq/zmq4 v0.4.0/go.mod h1:Sm+6QJXXpXzdGVlOTEvV/6SDt+koNyLZwxe7sORh7FI=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/open-location-code/go v0.0.0-20190603181809-cf814bded323 h1:s+TV4ZPsmXNvFxr9gzRTOlqf7DT862ieiOGRWA9M3oM=
......@@ -36,6 +38,7 @@ github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
......@@ -52,12 +55,17 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/zeromq/goczmq v4.1.0+incompatible h1:cGVQaU6kIwwrGso0Pgbl84tzAz/h7FJ3wYQjSonjFFc=
github.com/zeromq/goczmq v4.1.0+incompatible/go.mod h1:1uZybAJoSRCvZMH2rZxEwWBSmC4T7CB/xQOfChwPEzg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180629035331-4cb1c02c05b0/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190611141213-3f473d35a33a h1:+KkCgOMgnKSgenxTBoiwkMqTiouMIy/3o8RLdmSbGoY=
golang.org/x/net v0.0.0-20190611141213-3f473d35a33a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
......
......@@ -15,6 +15,7 @@ import (
"github.com/iotaledger/goshimmer/plugins/webapi"
webapi_gtta "github.com/iotaledger/goshimmer/plugins/webapi-gtta"
webapi_spammer "github.com/iotaledger/goshimmer/plugins/webapi-spammer"
"github.com/iotaledger/goshimmer/plugins/zeromq"
)
func main() {
......@@ -27,6 +28,7 @@ func main() {
analysis.PLUGIN,
gracefulshutdown.PLUGIN,
tipselection.PLUGIN,
zeromq.PLUGIN,
statusscreen.PLUGIN,
statusscreen_tps.PLUGIN,
......
package zeromq
import "github.com/iotaledger/goshimmer/packages/parameter"
var (
PORT = parameter.AddInt("ZEROMQ/PORT", 5556, "tcp port used to connect to the zmq feed")
)
package zeromq
import (
"strconv"
"strings"
"time"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/tangle"
)
var PLUGIN = node.NewPlugin("ZeroMQ", configure, run)
var publisher *Publisher
// Configure the zeromq plugin
func configure(plugin *node.Plugin) {
daemon.Events.Shutdown.Attach(events.NewClosure(func() {
plugin.LogInfo("Stopping ZeroMQ Publisher ...")
if err := publisher.Shutdown(); err != nil {
plugin.LogFailure("Stopping ZeroMQ Publisher: " + err.Error())
} else {
plugin.LogSuccess("Stopping ZeroMQ Publisher ... done")
}
}))
// TODO: should be tangle.Events.TransactionStored
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
if err := publishTx(tx); err != nil {
plugin.LogFailure(err.Error())
}
}))
}
// Start the zeromq plugin
func run(plugin *node.Plugin) {
plugin.LogInfo("Starting ZeroMQ Publisher (port " + strconv.Itoa(*PORT.Value) + ") ...")
daemon.BackgroundWorker(func() {
if err := startPublisher(plugin); err != nil {
plugin.LogFailure("Stopping ZeroMQ Publisher: " + err.Error())
} else {
plugin.LogSuccess("Starting ZeroMQ Publisher (port " + strconv.Itoa(*PORT.Value) + ") ... done")
}
})
}
// Start the zmq publisher.
func startPublisher(plugin *node.Plugin) error {
pub, err := NewPublisher()
if err != nil {
return err
}
publisher = pub
return publisher.Start(*PORT.Value)
}
// Publish a transaction that has recently been added to the ledger
func publishTx(tx *value_transaction.ValueTransaction) error {
hash := tx.MetaTransaction.GetHash()
address := tx.GetAddress()
value := tx.GetValue()
timestamp := int64(tx.GetTimestamp())
trunk := tx.MetaTransaction.GetTrunkTransactionHash()
branch := tx.MetaTransaction.GetBranchTransactionHash()
stored := time.Now().Unix()
tag := strings.Repeat("9", 27) // use empty tag
messages := []string{
"tx", // ZMQ event
hash.ToString(), // Transaction hash
address.ToString(), // Address
strconv.FormatInt(value, 10), // Value
tag, // Obsolete tag
strconv.FormatInt(timestamp, 10), // Timestamp
"0", // Index of the transaction in the bundle
"0", // Last transaction index of the bundle
hash.ToString(), // Bundle hash
trunk.ToString(), // Trunk transaction hash
branch.ToString(), // Branch transaction hash
strconv.FormatInt(stored, 10), // Unix timestamp for when the transaction was received
tag, // Tag
}
return publisher.Send(messages)
}
package zeromq
import (
"context"
"strconv"
"strings"
zmq "github.com/go-zeromq/zmq4"
)
// Simple zmq publisher abstraction
type Publisher struct {
socket zmq.Socket
}
// Create a new publisher.
func NewPublisher() (*Publisher, error) {
socket := zmq.NewPub(context.Background())
return &Publisher{
socket: socket,
}, nil
}
// Start the publisher on the given port.
func (pub *Publisher) Start(port int) error {
return pub.socket.Listen("tcp://*:" + strconv.Itoa(port))
}
// Stop the publisher.
func (pub *Publisher) Shutdown() error {
return pub.socket.Close()
}
// Publish a new list of messages.
func (pub *Publisher) Send(messages []string) error {
if len(messages) == 0 || len(messages[0]) == 0 {
panic("zmq: invalid messages")
}
data := strings.Join(messages, " ")
msg := zmq.NewMsgString(data)
return pub.socket.Send(msg)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment