Skip to content
Snippets Groups Projects
Unverified Commit 215fcaf6 authored by Hans Moog's avatar Hans Moog Committed by GitHub
Browse files

Merge pull request #30 from iotaledger/feat/zmq

Send stored transactions via ZMQ
parents aed32723 e8440576
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,7 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/ethereum/go-ethereum v1.8.27
github.com/gdamore/tcell v1.1.2
github.com/go-zeromq/zmq4 v0.4.0
github.com/golang/protobuf v1.3.1 // indirect
github.com/google/open-location-code/go v0.0.0-20190603181809-cf814bded323
github.com/labstack/echo v3.3.10+incompatible
......@@ -16,6 +17,7 @@ require (
github.com/magiconair/properties v1.8.1
github.com/pkg/errors v0.8.1
github.com/rivo/tview v0.0.0-20190609162513-b62197ade412
github.com/zeromq/goczmq v4.1.0+incompatible // indirect
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5
golang.org/x/net v0.0.0-20190611141213-3f473d35a33a
)
......@@ -18,6 +18,8 @@ github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdk
github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg=
github.com/gdamore/tcell v1.1.2 h1:Afe8cU6SECC06UmvaJ55Jr3Eh0tz/ywLjqWYqjGZp3s=
github.com/gdamore/tcell v1.1.2/go.mod h1:h3kq4HO9l2On+V9ed8w8ewqQEmGCSSHOgQ+2h8uzurE=
github.com/go-zeromq/zmq4 v0.4.0 h1:/TAahvAez7b6ycAyrxwN6Q36zy2OU8++uRLt6CwC74g=
github.com/go-zeromq/zmq4 v0.4.0/go.mod h1:Sm+6QJXXpXzdGVlOTEvV/6SDt+koNyLZwxe7sORh7FI=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/open-location-code/go v0.0.0-20190603181809-cf814bded323 h1:s+TV4ZPsmXNvFxr9gzRTOlqf7DT862ieiOGRWA9M3oM=
......@@ -36,6 +38,7 @@ github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
......@@ -52,12 +55,17 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/zeromq/goczmq v4.1.0+incompatible h1:cGVQaU6kIwwrGso0Pgbl84tzAz/h7FJ3wYQjSonjFFc=
github.com/zeromq/goczmq v4.1.0+incompatible/go.mod h1:1uZybAJoSRCvZMH2rZxEwWBSmC4T7CB/xQOfChwPEzg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180629035331-4cb1c02c05b0/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190611141213-3f473d35a33a h1:+KkCgOMgnKSgenxTBoiwkMqTiouMIy/3o8RLdmSbGoY=
golang.org/x/net v0.0.0-20190611141213-3f473d35a33a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
......
......@@ -15,6 +15,7 @@ import (
"github.com/iotaledger/goshimmer/plugins/webapi"
webapi_gtta "github.com/iotaledger/goshimmer/plugins/webapi-gtta"
webapi_spammer "github.com/iotaledger/goshimmer/plugins/webapi-spammer"
"github.com/iotaledger/goshimmer/plugins/zeromq"
)
func main() {
......@@ -27,6 +28,7 @@ func main() {
analysis.PLUGIN,
gracefulshutdown.PLUGIN,
tipselection.PLUGIN,
zeromq.PLUGIN,
statusscreen.PLUGIN,
statusscreen_tps.PLUGIN,
......
package node
import (
"fmt"
"strings"
"sync"
......@@ -15,21 +14,9 @@ type Node struct {
logLevel int
}
var disabledPlugins = make(map[string]bool)
var DisabledPlugins = make(map[string]bool)
func Load(plugins ...*Plugin) *Node {
for _, disabledPlugin := range strings.Fields(*DISABLE_PLUGINS.Value) {
disabledPlugins[strings.ToLower(disabledPlugin)] = true
}
fmt.Println(" _____ _ _ ________ ______ ___ ___________ ")
fmt.Println(" / ___| | | |_ _| \\/ || \\/ || ___| ___ \\")
fmt.Println(" \\ `--.| |_| | | | | . . || . . || |__ | |_/ /")
fmt.Println(" `--. \\ _ | | | | |\\/| || |\\/| || __|| / ")
fmt.Println(" /\\__/ / | | |_| |_| | | || | | || |___| |\\ \\ ")
fmt.Println(" \\____/\\_| |_/\\___/\\_| |_/\\_| |_/\\____/\\_| \\_| fullnode 1.0")
fmt.Println()
node := &Node{
logLevel: *LOG_LEVEL.Value,
loggers: make([]*Logger, 0),
......@@ -105,11 +92,9 @@ func (node *Node) LogFailure(pluginName string, message string) {
}
func (node *Node) Load(plugins ...*Plugin) {
node.LogInfo("Node", "Loading plugins ...")
if len(plugins) >= 1 {
for _, plugin := range plugins {
if _, exists := disabledPlugins[strings.ToLower(strings.Replace(plugin.Name, " ", "", -1))]; !exists {
if _, exists := DisabledPlugins[strings.ToLower(strings.Replace(plugin.Name, " ", "", -1))]; !exists {
plugin.wg = node.wg
plugin.Node = node
......
......@@ -30,6 +30,10 @@ func (trytes Trytes) ToTrits() Trits {
return trits
}
func (trytes Trytes) ToString() string {
return string(trytes)
}
func (this Trits) ToBytes() []byte {
tritsLength := len(this)
bytesLength := (tritsLength + NUMBER_OF_TRITS_IN_A_BYTE - 1) / NUMBER_OF_TRITS_IN_A_BYTE
......
......@@ -2,6 +2,7 @@ package cli
import (
"flag"
"fmt"
"strings"
"github.com/iotaledger/goshimmer/packages/events"
......@@ -34,10 +35,26 @@ func init() {
parameter.Events.AddString.Attach(events.NewClosure(onAddStringParameter))
flag.Usage = printUsage
}
func configure(ctx *node.Plugin) {
flag.Parse()
}
func configure(ctx *node.Plugin) {}
for _, disabledPlugin := range strings.Fields(*node.DISABLE_PLUGINS.Value) {
node.DisabledPlugins[strings.ToLower(disabledPlugin)] = true
}
fmt.Println(" _____ _ _ ________ ______ ___ ___________ ")
fmt.Println(" / ___| | | |_ _| \\/ || \\/ || ___| ___ \\")
fmt.Println(" \\ `--.| |_| | | | | . . || . . || |__ | |_/ /")
fmt.Println(" `--. \\ _ | | | | |\\/| || |\\/| || __|| / ")
fmt.Println(" /\\__/ / | | |_| |_| | | || | | || |___| |\\ \\ ")
fmt.Println(" \\____/\\_| |_/\\___/\\_| |_/\\_| |_/\\____/\\_| \\_| fullnode 1.0")
fmt.Println()
ctx.Node.LogInfo("Node", "Loading plugins ...")
}
var PLUGIN = node.NewPlugin("CLI", configure)
var PLUGIN = node.NewPlugin("CLI", configure, func(plugin *node.Plugin) {
})
package zeromq
import "github.com/iotaledger/goshimmer/packages/parameter"
var (
PORT = parameter.AddInt("ZEROMQ/PORT", 5556, "tcp port used to connect to the zmq feed")
)
package zeromq
import (
"strconv"
"strings"
"time"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/tangle"
)
var PLUGIN = node.NewPlugin("ZeroMQ", configure, run)
var publisher *Publisher
var emptyTag = strings.Repeat("9", 27)
// Configure the zeromq plugin
func configure(plugin *node.Plugin) {
daemon.Events.Shutdown.Attach(events.NewClosure(func() {
plugin.LogInfo("Stopping ZeroMQ Publisher ...")
if err := publisher.Shutdown(); err != nil {
plugin.LogFailure("Stopping ZeroMQ Publisher: " + err.Error())
} else {
plugin.LogSuccess("Stopping ZeroMQ Publisher ... done")
}
}))
// TODO: should be tangle.Events.TransactionStored
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
// create goroutine for every event
go func() {
if err := publishTx(tx); err != nil {
plugin.LogFailure(err.Error())
}
}()
}))
}
// Start the zeromq plugin
func run(plugin *node.Plugin) {
plugin.LogInfo("Starting ZeroMQ Publisher (port " + strconv.Itoa(*PORT.Value) + ") ...")
daemon.BackgroundWorker(func() {
if err := startPublisher(plugin); err != nil {
plugin.LogFailure("Stopping ZeroMQ Publisher: " + err.Error())
} else {
plugin.LogSuccess("Starting ZeroMQ Publisher (port " + strconv.Itoa(*PORT.Value) + ") ... done")
}
})
}
// Start the zmq publisher.
func startPublisher(plugin *node.Plugin) error {
pub, err := NewPublisher()
if err != nil {
return err
}
publisher = pub
return publisher.Start(*PORT.Value)
}
// Publish a transaction that has recently been added to the ledger
func publishTx(tx *value_transaction.ValueTransaction) error {
hash := tx.MetaTransaction.GetHash()
address := tx.GetAddress()
value := tx.GetValue()
timestamp := int64(tx.GetTimestamp())
trunk := tx.MetaTransaction.GetTrunkTransactionHash()
branch := tx.MetaTransaction.GetBranchTransactionHash()
stored := time.Now().Unix()
messages := []string{
"tx", // ZMQ event
hash.ToString(), // Transaction hash
address.ToString(), // Address
strconv.FormatInt(value, 10), // Value
emptyTag, // Obsolete tag
strconv.FormatInt(timestamp, 10), // Timestamp
"0", // Index of the transaction in the bundle
"0", // Last transaction index of the bundle
hash.ToString(), // Bundle hash
trunk.ToString(), // Trunk transaction hash
branch.ToString(), // Branch transaction hash
strconv.FormatInt(stored, 10), // Unix timestamp for when the transaction was received
emptyTag, // Tag
}
return publisher.Send(messages)
}
package zeromq
import (
"context"
"strconv"
"strings"
zmq "github.com/go-zeromq/zmq4"
)
// Simple zmq publisher abstraction
type Publisher struct {
socket zmq.Socket
}
// Create a new publisher.
func NewPublisher() (*Publisher, error) {
socket := zmq.NewPub(context.Background())
return &Publisher{
socket: socket,
}, nil
}
// Start the publisher on the given port.
func (pub *Publisher) Start(port int) error {
return pub.socket.Listen("tcp://*:" + strconv.Itoa(port))
}
// Stop the publisher.
func (pub *Publisher) Shutdown() error {
return pub.socket.Close()
}
// Publish a new list of messages.
func (pub *Publisher) Send(messages []string) error {
if len(messages) == 0 || len(messages[0]) == 0 {
panic("zmq: invalid messages")
}
data := strings.Join(messages, " ")
msg := zmq.NewMsgString(data)
return pub.socket.Send(msg)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment