diff --git a/go.mod b/go.mod index a413ef26e1f17529de05b94d408f8f8903121cfb..5b1e2b90feef5adf92afd95e2d5ae0048f180fa5 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/dustin/go-humanize v1.0.0 // indirect github.com/ethereum/go-ethereum v1.8.27 github.com/gdamore/tcell v1.1.2 + github.com/go-zeromq/zmq4 v0.4.0 github.com/golang/protobuf v1.3.1 // indirect github.com/google/open-location-code/go v0.0.0-20190603181809-cf814bded323 github.com/labstack/echo v3.3.10+incompatible @@ -16,6 +17,7 @@ require ( github.com/magiconair/properties v1.8.1 github.com/pkg/errors v0.8.1 github.com/rivo/tview v0.0.0-20190609162513-b62197ade412 + github.com/zeromq/goczmq v4.1.0+incompatible // indirect golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 golang.org/x/net v0.0.0-20190611141213-3f473d35a33a ) diff --git a/go.sum b/go.sum index 6f70e39527103c6f15bb44bf3d005faf4aeac3cd..58add10c5d5ca57dc9bf817eb1409462e7f98f78 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdk github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg= github.com/gdamore/tcell v1.1.2 h1:Afe8cU6SECC06UmvaJ55Jr3Eh0tz/ywLjqWYqjGZp3s= github.com/gdamore/tcell v1.1.2/go.mod h1:h3kq4HO9l2On+V9ed8w8ewqQEmGCSSHOgQ+2h8uzurE= +github.com/go-zeromq/zmq4 v0.4.0 h1:/TAahvAez7b6ycAyrxwN6Q36zy2OU8++uRLt6CwC74g= +github.com/go-zeromq/zmq4 v0.4.0/go.mod h1:Sm+6QJXXpXzdGVlOTEvV/6SDt+koNyLZwxe7sORh7FI= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/open-location-code/go v0.0.0-20190603181809-cf814bded323 h1:s+TV4ZPsmXNvFxr9gzRTOlqf7DT862ieiOGRWA9M3oM= @@ -36,6 +38,7 @@ github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -52,12 +55,17 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +github.com/zeromq/goczmq v4.1.0+incompatible h1:cGVQaU6kIwwrGso0Pgbl84tzAz/h7FJ3wYQjSonjFFc= +github.com/zeromq/goczmq v4.1.0+incompatible/go.mod h1:1uZybAJoSRCvZMH2rZxEwWBSmC4T7CB/xQOfChwPEzg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5 h1:58fnuSXlxZmFdJyvtTFVmVhcMLU6v5fEb/ok4wyqtNU= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20180629035331-4cb1c02c05b0/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190611141213-3f473d35a33a h1:+KkCgOMgnKSgenxTBoiwkMqTiouMIy/3o8RLdmSbGoY= golang.org/x/net v0.0.0-20190611141213-3f473d35a33a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= diff --git a/main.go b/main.go index 2eb9d49f9bc77f40923a2f0f0846d38ffab50591..91ec88b8e32820b238b744768ba952c3ab681374 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "github.com/iotaledger/goshimmer/plugins/webapi" webapi_gtta "github.com/iotaledger/goshimmer/plugins/webapi-gtta" webapi_spammer "github.com/iotaledger/goshimmer/plugins/webapi-spammer" + "github.com/iotaledger/goshimmer/plugins/zeromq" ) func main() { @@ -27,6 +28,7 @@ func main() { analysis.PLUGIN, gracefulshutdown.PLUGIN, tipselection.PLUGIN, + zeromq.PLUGIN, statusscreen.PLUGIN, statusscreen_tps.PLUGIN, diff --git a/packages/node/node.go b/packages/node/node.go index 5811b8af675c87d4425dd38f72eabcddfb4e43ee..c0b99d373206f8bd6fd59a145c0a37c3d36d4ec9 100644 --- a/packages/node/node.go +++ b/packages/node/node.go @@ -1,7 +1,6 @@ package node import ( - "fmt" "strings" "sync" @@ -15,21 +14,9 @@ type Node struct { logLevel int } -var disabledPlugins = make(map[string]bool) +var DisabledPlugins = make(map[string]bool) func Load(plugins ...*Plugin) *Node { - for _, disabledPlugin := range strings.Fields(*DISABLE_PLUGINS.Value) { - disabledPlugins[strings.ToLower(disabledPlugin)] = true - } - - fmt.Println(" _____ _ _ ________ ______ ___ ___________ ") - fmt.Println(" / ___| | | |_ _| \\/ || \\/ || ___| ___ \\") - fmt.Println(" \\ `--.| |_| | | | | . . || . . || |__ | |_/ /") - fmt.Println(" `--. \\ _ | | | | |\\/| || |\\/| || __|| / ") - fmt.Println(" /\\__/ / | | |_| |_| | | || | | || |___| |\\ \\ ") - fmt.Println(" \\____/\\_| |_/\\___/\\_| |_/\\_| |_/\\____/\\_| \\_| fullnode 1.0") - fmt.Println() - node := &Node{ logLevel: *LOG_LEVEL.Value, loggers: make([]*Logger, 0), @@ -105,11 +92,9 @@ func (node *Node) LogFailure(pluginName string, message string) { } func (node *Node) Load(plugins ...*Plugin) { - node.LogInfo("Node", "Loading plugins ...") - if len(plugins) >= 1 { for _, plugin := range plugins { - if _, exists := disabledPlugins[strings.ToLower(strings.Replace(plugin.Name, " ", "", -1))]; !exists { + if _, exists := DisabledPlugins[strings.ToLower(strings.Replace(plugin.Name, " ", "", -1))]; !exists { plugin.wg = node.wg plugin.Node = node diff --git a/packages/ternary/ternary.go b/packages/ternary/ternary.go index 31ada8e4465e97a1953cee8e51f493c863bfc0cb..cedfbebd47a1c4b3f23d705a599b1bc30daf0c02 100644 --- a/packages/ternary/ternary.go +++ b/packages/ternary/ternary.go @@ -30,6 +30,10 @@ func (trytes Trytes) ToTrits() Trits { return trits } +func (trytes Trytes) ToString() string { + return string(trytes) +} + func (this Trits) ToBytes() []byte { tritsLength := len(this) bytesLength := (tritsLength + NUMBER_OF_TRITS_IN_A_BYTE - 1) / NUMBER_OF_TRITS_IN_A_BYTE diff --git a/plugins/cli/plugin.go b/plugins/cli/plugin.go index 2f73f2393320ac72375f53c686cc8550ee4ab447..6f0d789a3783d2d228c395f582a0f1bec87b790b 100644 --- a/plugins/cli/plugin.go +++ b/plugins/cli/plugin.go @@ -2,6 +2,7 @@ package cli import ( "flag" + "fmt" "strings" "github.com/iotaledger/goshimmer/packages/events" @@ -34,10 +35,26 @@ func init() { parameter.Events.AddString.Attach(events.NewClosure(onAddStringParameter)) flag.Usage = printUsage +} +func configure(ctx *node.Plugin) { flag.Parse() -} -func configure(ctx *node.Plugin) {} + for _, disabledPlugin := range strings.Fields(*node.DISABLE_PLUGINS.Value) { + node.DisabledPlugins[strings.ToLower(disabledPlugin)] = true + } + + fmt.Println(" _____ _ _ ________ ______ ___ ___________ ") + fmt.Println(" / ___| | | |_ _| \\/ || \\/ || ___| ___ \\") + fmt.Println(" \\ `--.| |_| | | | | . . || . . || |__ | |_/ /") + fmt.Println(" `--. \\ _ | | | | |\\/| || |\\/| || __|| / ") + fmt.Println(" /\\__/ / | | |_| |_| | | || | | || |___| |\\ \\ ") + fmt.Println(" \\____/\\_| |_/\\___/\\_| |_/\\_| |_/\\____/\\_| \\_| fullnode 1.0") + fmt.Println() + + ctx.Node.LogInfo("Node", "Loading plugins ...") +} -var PLUGIN = node.NewPlugin("CLI", configure) +var PLUGIN = node.NewPlugin("CLI", configure, func(plugin *node.Plugin) { + +}) diff --git a/plugins/zeromq/parameters.go b/plugins/zeromq/parameters.go new file mode 100644 index 0000000000000000000000000000000000000000..a74b4be24d4113a317d11dbc6efc15e60c711a93 --- /dev/null +++ b/plugins/zeromq/parameters.go @@ -0,0 +1,7 @@ +package zeromq + +import "github.com/iotaledger/goshimmer/packages/parameter" + +var ( + PORT = parameter.AddInt("ZEROMQ/PORT", 5556, "tcp port used to connect to the zmq feed") +) diff --git a/plugins/zeromq/plugin.go b/plugins/zeromq/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..c5299c7120cc83fd37f9f5f1b0e8ab27abcb4feb --- /dev/null +++ b/plugins/zeromq/plugin.go @@ -0,0 +1,97 @@ +package zeromq + +import ( + "strconv" + "strings" + "time" + + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/events" + "github.com/iotaledger/goshimmer/packages/model/value_transaction" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/tangle" +) + +var PLUGIN = node.NewPlugin("ZeroMQ", configure, run) + +var publisher *Publisher +var emptyTag = strings.Repeat("9", 27) + +// Configure the zeromq plugin +func configure(plugin *node.Plugin) { + + daemon.Events.Shutdown.Attach(events.NewClosure(func() { + plugin.LogInfo("Stopping ZeroMQ Publisher ...") + + if err := publisher.Shutdown(); err != nil { + plugin.LogFailure("Stopping ZeroMQ Publisher: " + err.Error()) + } else { + plugin.LogSuccess("Stopping ZeroMQ Publisher ... done") + } + })) + + // TODO: should be tangle.Events.TransactionStored + tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { + // create goroutine for every event + go func() { + if err := publishTx(tx); err != nil { + plugin.LogFailure(err.Error()) + } + }() + })) +} + +// Start the zeromq plugin +func run(plugin *node.Plugin) { + + plugin.LogInfo("Starting ZeroMQ Publisher (port " + strconv.Itoa(*PORT.Value) + ") ...") + + daemon.BackgroundWorker(func() { + if err := startPublisher(plugin); err != nil { + plugin.LogFailure("Stopping ZeroMQ Publisher: " + err.Error()) + } else { + plugin.LogSuccess("Starting ZeroMQ Publisher (port " + strconv.Itoa(*PORT.Value) + ") ... done") + } + }) +} + +// Start the zmq publisher. +func startPublisher(plugin *node.Plugin) error { + pub, err := NewPublisher() + if err != nil { + return err + } + publisher = pub + + return publisher.Start(*PORT.Value) +} + +// Publish a transaction that has recently been added to the ledger +func publishTx(tx *value_transaction.ValueTransaction) error { + + hash := tx.MetaTransaction.GetHash() + address := tx.GetAddress() + value := tx.GetValue() + timestamp := int64(tx.GetTimestamp()) + trunk := tx.MetaTransaction.GetTrunkTransactionHash() + branch := tx.MetaTransaction.GetBranchTransactionHash() + stored := time.Now().Unix() + + messages := []string{ + "tx", // ZMQ event + hash.ToString(), // Transaction hash + address.ToString(), // Address + strconv.FormatInt(value, 10), // Value + emptyTag, // Obsolete tag + strconv.FormatInt(timestamp, 10), // Timestamp + "0", // Index of the transaction in the bundle + "0", // Last transaction index of the bundle + hash.ToString(), // Bundle hash + trunk.ToString(), // Trunk transaction hash + branch.ToString(), // Branch transaction hash + strconv.FormatInt(stored, 10), // Unix timestamp for when the transaction was received + emptyTag, // Tag + } + + return publisher.Send(messages) +} diff --git a/plugins/zeromq/publisher.go b/plugins/zeromq/publisher.go new file mode 100644 index 0000000000000000000000000000000000000000..521a9fb31361a994b6cbed9e94469e0bdab0a1ac --- /dev/null +++ b/plugins/zeromq/publisher.go @@ -0,0 +1,47 @@ +package zeromq + +import ( + "context" + "strconv" + "strings" + + zmq "github.com/go-zeromq/zmq4" +) + +// Simple zmq publisher abstraction +type Publisher struct { + socket zmq.Socket +} + +// Create a new publisher. +func NewPublisher() (*Publisher, error) { + + socket := zmq.NewPub(context.Background()) + + return &Publisher{ + socket: socket, + }, nil +} + +// Start the publisher on the given port. +func (pub *Publisher) Start(port int) error { + + return pub.socket.Listen("tcp://*:" + strconv.Itoa(port)) +} + +// Stop the publisher. +func (pub *Publisher) Shutdown() error { + + return pub.socket.Close() +} + +// Publish a new list of messages. +func (pub *Publisher) Send(messages []string) error { + if len(messages) == 0 || len(messages[0]) == 0 { + panic("zmq: invalid messages") + } + + data := strings.Join(messages, " ") + msg := zmq.NewMsgString(data) + return pub.socket.Send(msg) +}