Skip to content
Snippets Groups Projects
plugin.go 3.13 KiB
package zeromq

import (
	"strconv"
	"strings"
	"time"

	"github.com/iotaledger/goshimmer/packages/model/value_transaction"
	"github.com/iotaledger/goshimmer/packages/parameter"
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/plugins/tangle"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/logger"
	"github.com/iotaledger/hive.go/node"
)

// zeromq logging is disabled by default
var PLUGIN = node.NewPlugin("ZeroMQ", node.Disabled, configure, run)
var log *logger.Logger
var publisher *Publisher
var emptyTag = strings.Repeat("9", 27)

// Configure the zeromq plugin
func configure(plugin *node.Plugin) {
	log = logger.NewLogger("ZeroMQ")
	tangle.Events.TransactionStored.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
		// create goroutine for every event
		go func() {
			if err := publishTx(tx); err != nil {
				log.Errorf("error publishing tx: %s", err.Error())
			}
		}()
	}))
}

// Start the zeromq plugin
func run(plugin *node.Plugin) {
	zeromqPort := parameter.NodeConfig.GetInt(ZEROMQ_PORT)
	log.Infof("Starting ZeroMQ Publisher (port %d) ...", zeromqPort)

	daemon.BackgroundWorker("ZeroMQ Publisher", func(shutdownSignal <-chan struct{}) {
		if err := startPublisher(plugin); err != nil {
			log.Errorf("Stopping ZeroMQ Publisher: %s", err.Error())
		} else {
			log.Infof("Starting ZeroMQ Publisher (port %d) ... done", zeromqPort)
		}

		<-shutdownSignal

		log.Info("Stopping ZeroMQ Publisher ...")
		if err := publisher.Shutdown(); err != nil {
			log.Errorf("Stopping ZeroMQ Publisher: %s", err.Error())
		} else {
			log.Info("Stopping ZeroMQ Publisher ... done")
		}
	}, shutdown.ShutdownPriorityZMQ)
}

// Start the zmq publisher.
func startPublisher(plugin *node.Plugin) error {
	pub, err := NewPublisher()
	if err != nil {
		return err
	}
	publisher = pub

	return publisher.Start(parameter.NodeConfig.GetInt(ZEROMQ_PORT))
}
// Publish a transaction that has recently been added to the ledger
func publishTx(tx *value_transaction.ValueTransaction) error {

	hash := tx.MetaTransaction.GetHash()
	address := tx.GetAddress()
	value := tx.GetValue()
	timestamp := int64(tx.GetTimestamp())
	trunk := tx.MetaTransaction.GetTrunkTransactionHash()
	branch := tx.MetaTransaction.GetBranchTransactionHash()
	stored := time.Now().Unix()

	messages := []string{
		"tx",                             // ZMQ event
		hash,                             // Transaction hash
		address,                          // Address
		strconv.FormatInt(value, 10),     // Value
		emptyTag,                         // Obsolete tag
		strconv.FormatInt(timestamp, 10), // Timestamp
		"0",                              // Index of the transaction in the bundle
		"0",                              // Last transaction index of the bundle
		hash,                             // Bundle hash
		trunk,                            // Trunk transaction hash
		branch,                           // Branch transaction hash
		strconv.FormatInt(stored, 10),    // Unix timestamp for when the transaction was received
		emptyTag,                         // Tag
	}

	return publisher.Send(messages)
}