Skip to content
Snippets Groups Projects
bundleprocessor.go 3.74 KiB
package bundleprocessor

import (
	"fmt"
	"runtime"

	"github.com/iotaledger/goshimmer/packages/errors"
	"github.com/iotaledger/goshimmer/packages/model/bundle"
	"github.com/iotaledger/goshimmer/packages/model/transactionmetadata"
	"github.com/iotaledger/goshimmer/packages/model/value_transaction"
	"github.com/iotaledger/goshimmer/packages/workerpool"
	"github.com/iotaledger/goshimmer/plugins/tangle"
	"github.com/iotaledger/iota.go/trinary"
)

var workerPool = workerpool.New(func(task workerpool.Task) {
	if err := ProcessSolidBundleHead(task.Param(0).(*value_transaction.ValueTransaction)); err != nil {
		Events.Error.Trigger(err)
	}

	task.Return(nil)
}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(2*WORKER_COUNT))

var WORKER_COUNT = runtime.NumCPU()

func ProcessSolidBundleHead(headTransaction *value_transaction.ValueTransaction) errors.IdentifiableError {
	// only process the bundle if we didn't process it, yet
	_, err := tangle.GetBundle(headTransaction.GetHash(), func(headTransactionHash trinary.Trytes) (*bundle.Bundle, errors.IdentifiableError) {
		// abort if bundle syntax is wrong
		if !headTransaction.IsHead() {
			return nil, ErrProcessBundleFailed.Derive(errors.New("invalid parameter"), "transaction needs to be head of bundle")
		}

		// initialize event variables
		newBundle := bundle.New(headTransactionHash)
		bundleTransactions := make([]*value_transaction.ValueTransaction, 0)

		// iterate through trunk transactions until we reach the tail
		currentTransaction := headTransaction
		for {
			// abort if we reached a previous head
			if currentTransaction.IsHead() && currentTransaction != headTransaction {
				newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))

				Events.InvalidBundle.Trigger(newBundle, bundleTransactions)

				return nil, ErrProcessBundleFailed.Derive(errors.New("invalid bundle found"), "missing bundle tail")
			}

			// update bundle transactions
			bundleTransactions = append(bundleTransactions, currentTransaction)

			// retrieve & update metadata
			currentTransactionMetadata, dbErr := tangle.GetTransactionMetadata(currentTransaction.GetHash(), transactionmetadata.New)
			if dbErr != nil {
				return nil, ErrProcessBundleFailed.Derive(dbErr, "failed to retrieve transaction metadata")
			}
			currentTransactionMetadata.SetBundleHeadHash(headTransactionHash)

			// update value bundle flag
			if !newBundle.IsValueBundle() && currentTransaction.GetValue() < 0 {
				newBundle.SetValueBundle(true)
			}

			// if we are done -> trigger events
			if currentTransaction.IsTail() {
				newBundle.SetTransactionHashes(mapTransactionsToTransactionHashes(bundleTransactions))

				if newBundle.IsValueBundle() {
					valueBundleProcessorWorkerPool.Submit(newBundle, bundleTransactions)

					return newBundle, nil
				}

				Events.BundleSolid.Trigger(newBundle, bundleTransactions)

				return newBundle, nil
			}

			// try to iterate to next turn
			if nextTransaction, err := tangle.GetTransaction(currentTransaction.GetTrunkTransactionHash()); err != nil {
				return nil, ErrProcessBundleFailed.Derive(err, "failed to retrieve trunk while processing bundle")
			} else if nextTransaction == nil {
				fmt.Println(ErrProcessBundleFailed.Derive(errors.New("missing transaction "+currentTransaction.GetTrunkTransactionHash()), "failed to retrieve trunk while processing bundle"))
				return nil, ErrProcessBundleFailed.Derive(err, "failed to retrieve trunk while processing bundle")
			} else {
				currentTransaction = nextTransaction
			}
		}
	})

	return err
}

func mapTransactionsToTransactionHashes(transactions []*value_transaction.ValueTransaction) (result []trinary.Trytes) {
	result = make([]trinary.Trytes, len(transactions))
	for k, v := range transactions {
		result[k] = v.GetHash()
	}

	return
}