-
Luca Moser authored
Signed-off-by:
Luca Moser <moser.luca@gmail.com>
Luca Moser authoredSigned-off-by:
Luca Moser <moser.luca@gmail.com>
plugin.go 3.13 KiB
package zeromq
import (
"strconv"
"strings"
"time"
"github.com/iotaledger/goshimmer/packages/model/value_transaction"
"github.com/iotaledger/goshimmer/packages/parameter"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/tangle"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
// zeromq logging is disabled by default
var PLUGIN = node.NewPlugin("ZeroMQ", node.Disabled, configure, run)
var log *logger.Logger
var publisher *Publisher
var emptyTag = strings.Repeat("9", 27)
// Configure the zeromq plugin
func configure(plugin *node.Plugin) {
log = logger.NewLogger("ZeroMQ")
tangle.Events.TransactionStored.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
// create goroutine for every event
go func() {
if err := publishTx(tx); err != nil {
log.Errorf("error publishing tx: %s", err.Error())
}
}()
}))
}
// Start the zeromq plugin
func run(plugin *node.Plugin) {
zeromqPort := parameter.NodeConfig.GetInt(ZEROMQ_PORT)
log.Infof("Starting ZeroMQ Publisher (port %d) ...", zeromqPort)
daemon.BackgroundWorker("ZeroMQ Publisher", func(shutdownSignal <-chan struct{}) {
if err := startPublisher(plugin); err != nil {
log.Errorf("Stopping ZeroMQ Publisher: %s", err.Error())
} else {
log.Infof("Starting ZeroMQ Publisher (port %d) ... done", zeromqPort)
}
<-shutdownSignal
log.Info("Stopping ZeroMQ Publisher ...")
if err := publisher.Shutdown(); err != nil {
log.Errorf("Stopping ZeroMQ Publisher: %s", err.Error())
} else {
log.Info("Stopping ZeroMQ Publisher ... done")
}
}, shutdown.ShutdownPriorityZMQ)
}
// Start the zmq publisher.
func startPublisher(plugin *node.Plugin) error {
pub, err := NewPublisher()
if err != nil {
return err
}
publisher = pub
return publisher.Start(parameter.NodeConfig.GetInt(ZEROMQ_PORT))
}
// Publish a transaction that has recently been added to the ledger
func publishTx(tx *value_transaction.ValueTransaction) error {
hash := tx.MetaTransaction.GetHash()
address := tx.GetAddress()
value := tx.GetValue()
timestamp := int64(tx.GetTimestamp())
trunk := tx.MetaTransaction.GetTrunkTransactionHash()
branch := tx.MetaTransaction.GetBranchTransactionHash()
stored := time.Now().Unix()
messages := []string{
"tx", // ZMQ event
hash, // Transaction hash
address, // Address
strconv.FormatInt(value, 10), // Value
emptyTag, // Obsolete tag
strconv.FormatInt(timestamp, 10), // Timestamp
"0", // Index of the transaction in the bundle
"0", // Last transaction index of the bundle
hash, // Bundle hash
trunk, // Trunk transaction hash
branch, // Branch transaction hash
strconv.FormatInt(stored, 10), // Unix timestamp for when the transaction was received
emptyTag, // Tag
}
return publisher.Send(messages)
}