package tangle

import (
	"container/list"
	"fmt"
	"math"
	"sort"
	"time"

	"github.com/dgraph-io/badger/v2"
	"github.com/iotaledger/hive.go/async"
	"github.com/iotaledger/hive.go/marshalutil"
	"github.com/iotaledger/hive.go/objectstorage"
	"github.com/iotaledger/hive.go/types"
	"golang.org/x/crypto/blake2b"

	"github.com/iotaledger/goshimmer/packages/binary/storageprefix"
	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/address"
	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/balance"
	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/payload"
	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/transaction"
)

// Tangle represents the value tangle that consists out of value payloads.
// It is an independent ontology, that lives inside the tangle.
//
// It bundles the object storages that hold the payload and transaction related models, the events that are triggered
// while payloads are processed and the worker pools that execute the individual processing steps.
type Tangle struct {
	// payloadStorage holds the value payloads (see GetPayload).
	payloadStorage         *objectstorage.ObjectStorage
	// payloadMetadataStorage holds the metadata of the value payloads (see GetPayloadMetadata).
	payloadMetadataStorage *objectstorage.ObjectStorage
	// approverStorage holds the references from a payload to the payloads that approve it (see GetApprovers).
	approverStorage        *objectstorage.ObjectStorage
	// missingPayloadStorage holds markers for payloads that were referenced but are not known, yet.
	missingPayloadStorage  *objectstorage.ObjectStorage
	// attachmentStorage holds the references from a transaction to the payloads that attached it (see GetAttachments).
	attachmentStorage      *objectstorage.ObjectStorage

	// outputStorage holds the transaction outputs (see GetTransactionOutput).
	outputStorage        *objectstorage.ObjectStorage
	// consumerStorage holds the references from an output to the transactions that consume it (see GetConsumers).
	consumerStorage      *objectstorage.ObjectStorage
	// missingOutputStorage holds markers for outputs that were referenced as inputs but are not known, yet.
	missingOutputStorage *objectstorage.ObjectStorage
	// branchStorage holds the branches (used by InheritBranches).
	// NOTE(review): New does not initialize this field in this file - confirm where it is supposed to be set up.
	branchStorage        *objectstorage.ObjectStorage

	// Events contains the events that are triggered by the Tangle.
	Events Events

	// storePayloadWorkerPool executes storePayloadWorker for payloads handed in via AttachPayload.
	storePayloadWorkerPool async.WorkerPool
	// solidifierWorkerPool executes solidifyTransactionWorker.
	solidifierWorkerPool   async.WorkerPool
	// bookerWorkerPool executes bookPayloadTransaction.
	bookerWorkerPool       async.WorkerPool
	// cleanupWorkerPool is only shut down in this file; no tasks are submitted to it here.
	cleanupWorkerPool      async.WorkerPool
}

// New creates a Tangle whose object storages are created through an objectstorage factory that uses the given badger
// instance and the ValueTransfers storage prefix. Every storage is configured with a cache time of one second.
//
// NOTE(review): branchStorage is not set up here and therefore stays nil - confirm that this is intended.
func New(badgerInstance *badger.DB) (result *Tangle) {
	factory := objectstorage.NewFactory(badgerInstance, storageprefix.ValueTransfers)

	result = new(Tangle)

	// payload related storage
	result.payloadStorage = factory.New(osPayload, osPayloadFactory, objectstorage.CacheTime(time.Second))
	result.payloadMetadataStorage = factory.New(osPayloadMetadata, osPayloadMetadataFactory, objectstorage.CacheTime(time.Second))
	result.missingPayloadStorage = factory.New(osMissingPayload, osMissingPayloadFactory, objectstorage.CacheTime(time.Second))
	result.approverStorage = factory.New(
		osApprover,
		osPayloadApproverFactory,
		objectstorage.CacheTime(time.Second),
		objectstorage.PartitionKey(payload.IdLength, payload.IdLength),
		objectstorage.KeysOnly(true),
	)

	// transaction related storage
	result.attachmentStorage = factory.New(osAttachment, osAttachmentFactory, objectstorage.CacheTime(time.Second))
	result.outputStorage = factory.New(osOutput, osOutputFactory, OutputKeyPartitions, objectstorage.CacheTime(time.Second))
	result.missingOutputStorage = factory.New(osMissingOutput, osMissingOutputFactory, MissingOutputKeyPartitions, objectstorage.CacheTime(time.Second))
	result.consumerStorage = factory.New(osConsumer, osConsumerFactory, ConsumerPartitionKeys, objectstorage.CacheTime(time.Second))

	// events
	result.Events = *newEvents()

	return result
}

// AttachPayload adds a new payload to the value tangle. The payload is not stored directly: a task that runs
// storePayloadWorker for it is submitted to the storePayloadWorkerPool.
func (tangle *Tangle) AttachPayload(payload *payload.Payload) {
	storeTask := func() {
		tangle.storePayloadWorker(payload)
	}

	tangle.storePayloadWorkerPool.Submit(storeTask)
}

// GetPayload loads the payload with the given id from the payload storage and returns it wrapped in a typed
// CachedPayload.
func (tangle *Tangle) GetPayload(payloadId payload.Id) *payload.CachedPayload {
	cachedObject := tangle.payloadStorage.Load(payloadId.Bytes())

	return &payload.CachedPayload{CachedObject: cachedObject}
}

// GetPayloadMetadata loads the metadata of the value payload with the given id from the payload metadata storage and
// returns it wrapped in a typed CachedPayloadMetadata.
func (tangle *Tangle) GetPayloadMetadata(payloadId payload.Id) *CachedPayloadMetadata {
	cachedObject := tangle.payloadMetadataStorage.Load(payloadId.Bytes())

	return &CachedPayloadMetadata{CachedObject: cachedObject}
}

// GetTransactionMetadata retrieves the metadata of a value transaction from the object storage.
//
// The metadata is loaded from the payloadMetadataStorage, because this is the storage that storeTransaction uses to
// persist the TransactionMetadata (keyed by the transaction id). It was previously loaded from the
// missingOutputStorage, which never contains transaction metadata.
//
// NOTE(review): the payloadMetadataStorage is created with the payload metadata factory - a dedicated storage for
// the transaction metadata is most likely required so that entries can also be restored from the database.
func (tangle *Tangle) GetTransactionMetadata(transactionId transaction.Id) *CachedTransactionMetadata {
	return &CachedTransactionMetadata{CachedObject: tangle.payloadMetadataStorage.Load(transactionId.Bytes())}
}

// GetTransactionOutput loads the output with the given id from the output storage and returns it wrapped in a typed
// CachedOutput.
func (tangle *Tangle) GetTransactionOutput(outputId transaction.OutputId) *CachedOutput {
	cachedObject := tangle.outputStorage.Load(outputId.Bytes())

	return &CachedOutput{CachedObject: cachedObject}
}

// GetApprovers collects the approver references of the given payload. It iterates the approver storage with the
// bytes of the payload id as the iteration prefix and wraps every visited entry in a CachedPayloadApprover.
func (tangle *Tangle) GetApprovers(payloadId payload.Id) CachedApprovers {
	result := CachedApprovers{}

	collect := func(_ []byte, cachedObject objectstorage.CachedObject) bool {
		result = append(result, &CachedPayloadApprover{CachedObject: cachedObject})

		// never abort the iteration
		return true
	}
	tangle.approverStorage.ForEach(collect, payloadId.Bytes())

	return result
}

// GetConsumers collects the consumer references of the given output. It iterates the consumer storage with the bytes
// of the output id as the iteration prefix and wraps every visited entry in a CachedConsumer.
func (tangle *Tangle) GetConsumers(outputId transaction.OutputId) CachedConsumers {
	result := CachedConsumers{}

	collect := func(_ []byte, cachedObject objectstorage.CachedObject) bool {
		result = append(result, &CachedConsumer{CachedObject: cachedObject})

		// never abort the iteration
		return true
	}
	tangle.consumerStorage.ForEach(collect, outputId.Bytes())

	return result
}

// GetAttachments collects the attachment references of the given transaction. It iterates the attachment storage
// with the bytes of the transaction id as the iteration prefix and wraps every visited entry in a CachedAttachment.
func (tangle *Tangle) GetAttachments(transactionId transaction.Id) CachedAttachments {
	result := CachedAttachments{}

	collect := func(_ []byte, cachedObject objectstorage.CachedObject) bool {
		result = append(result, &CachedAttachment{CachedObject: cachedObject})

		// never abort the iteration
		return true
	}
	tangle.attachmentStorage.ForEach(collect, transactionId.Bytes())

	return result
}

// Shutdown stops the worker pools and shuts down the object storage instances.
//
// All worker pools (including the booker) are stopped before the storages are shut down, so that no task is still
// running against a storage that is going away. Every storage of the Tangle is shut down; storages that have not been
// initialized (nil) are skipped. It returns the Tangle to allow chaining.
func (tangle *Tangle) Shutdown() *Tangle {
	// stop the workers in the order in which a payload travels through them
	tangle.storePayloadWorkerPool.ShutdownGracefully()
	tangle.solidifierWorkerPool.ShutdownGracefully()
	tangle.bookerWorkerPool.ShutdownGracefully()
	tangle.cleanupWorkerPool.ShutdownGracefully()

	for _, storage := range []*objectstorage.ObjectStorage{
		// payload related storage
		tangle.payloadStorage,
		tangle.payloadMetadataStorage,
		tangle.approverStorage,
		tangle.missingPayloadStorage,

		// transaction related storage
		tangle.attachmentStorage,
		tangle.outputStorage,
		tangle.missingOutputStorage,
		tangle.consumerStorage,
		tangle.branchStorage,
	} {
		// skip storages that were never set up
		if storage == nil {
			continue
		}

		storage.Shutdown()
	}

	return tangle
}

// Prune resets the database and deletes all objects (for testing or "node resets").
//
// Every storage of the Tangle is pruned (payload related as well as transaction related ones), so that no stale
// transaction data survives a reset. Storages that have not been initialized (nil) are skipped. The first error that
// is returned by a storage aborts the operation and is handed to the caller.
func (tangle *Tangle) Prune() error {
	for _, storage := range []*objectstorage.ObjectStorage{
		// payload related storage
		tangle.payloadStorage,
		tangle.payloadMetadataStorage,
		tangle.approverStorage,
		tangle.missingPayloadStorage,

		// transaction related storage
		tangle.attachmentStorage,
		tangle.outputStorage,
		tangle.missingOutputStorage,
		tangle.consumerStorage,
		tangle.branchStorage,
	} {
		// skip storages that were never set up
		if storage == nil {
			continue
		}

		if err := storage.Prune(); err != nil {
			return err
		}
	}

	return nil
}

// storePayloadWorker is the task behind AttachPayload: it persists the payload together with its transaction
// metadata, stores the references between the entities, triggers the storage events and finally schedules the
// solidity check.
func (tangle *Tangle) storePayloadWorker(payloadToStore *payload.Payload) {
	// persist the payload and its metadata - a payload that is known already is not processed a second time
	cachedPayload, cachedPayloadMetadata, isNewPayload := tangle.storePayload(payloadToStore)
	if !isNewPayload {
		return
	}

	// persist the metadata of the contained transaction
	cachedTransactionMetadata, isNewTransaction := tangle.storeTransaction(payloadToStore.Transaction())

	// the references are written after the entities, so that every metadata model exists in the database as soon as
	// the entities can be reached by walks
	tangle.storePayloadReferences(payloadToStore)
	if isNewTransaction {
		tangle.storeTransactionReferences(payloadToStore.Transaction())
	}

	// inform listeners (a payload that was requested as missing additionally triggers MissingPayloadReceived)
	wasMissing := tangle.missingPayloadStorage.DeleteIfPresent(payloadToStore.Id().Bytes())
	if wasMissing {
		tangle.Events.MissingPayloadReceived.Trigger(cachedPayload, cachedPayloadMetadata)
	}
	tangle.Events.PayloadAttached.Trigger(cachedPayload, cachedPayloadMetadata)

	// hand the stored models to the solidifier
	solidifyTask := func() {
		tangle.solidifyTransactionWorker(cachedPayload, cachedPayloadMetadata, cachedTransactionMetadata)
	}
	tangle.solidifierWorkerPool.Submit(solidifyTask)
}

// storePayload stores the payload in the payload storage if it is not known, yet. For a new payload it also stores a
// fresh PayloadMetadata and returns both cached objects together with payloadStored = true. For a known payload it
// returns nil objects and payloadStored = false.
func (tangle *Tangle) storePayload(payloadToStore *payload.Payload) (cachedPayload *payload.CachedPayload, cachedMetadata *CachedPayloadMetadata, payloadStored bool) {
	storedObject, isNew := tangle.payloadStorage.StoreIfAbsent(payloadToStore)
	if !isNew {
		return nil, nil, false
	}

	cachedPayload = &payload.CachedPayload{CachedObject: storedObject}

	metadata := NewPayloadMetadata(payloadToStore.Id())
	cachedMetadata = &CachedPayloadMetadata{CachedObject: tangle.payloadMetadataStorage.Store(metadata)}

	return cachedPayload, cachedMetadata, true
}

// storeTransaction returns the cached metadata of the given transaction and creates it if it does not exist, yet.
// transactionStored reports whether the metadata was created by this call.
//
// NOTE(review): the metadata is written to the payloadMetadataStorage (keyed by the transaction id) - confirm whether
// a dedicated storage for transaction metadata is intended.
func (tangle *Tangle) storeTransaction(tx *transaction.Transaction) (cachedTransactionMetadata *CachedTransactionMetadata, transactionStored bool) {
	// only invoked by ComputeIfAbsent if no entry exists for the transaction id
	createMetadata := func(key []byte) objectstorage.StorableObject {
		transactionStored = true

		newMetadata := NewTransactionMetadata(tx.Id())
		newMetadata.Persist()
		newMetadata.SetModified()

		return newMetadata
	}

	cachedObject := tangle.payloadMetadataStorage.ComputeIfAbsent(tx.Id().Bytes(), createMetadata)
	cachedTransactionMetadata = &CachedTransactionMetadata{CachedObject: cachedObject}

	return cachedTransactionMetadata, transactionStored
}

// storePayloadReferences stores the references that originate from the given payload:
//   - an approver entry for its trunk (trunk -> payload),
//   - an approver entry for its branch (branch -> payload), unless branch and trunk are identical,
//   - an attachment entry from its transaction to the payload.
//
// The cached objects returned by the storages are released immediately, as they are not needed any further.
func (tangle *Tangle) storePayloadReferences(payload *payload.Payload) {
	// store trunk approver
	trunkId := payload.TrunkId()
	tangle.approverStorage.Store(NewPayloadApprover(trunkId, payload.Id())).Release()

	// store branch approver (the approving payload is the given payload - not its trunk)
	if branchId := payload.BranchId(); branchId != trunkId {
		tangle.approverStorage.Store(NewPayloadApprover(branchId, payload.Id())).Release()
	}

	// store a reference from the transaction to the payload that attached it
	tangle.attachmentStorage.Store(NewAttachment(payload.Transaction().Id(), payload.Id())).Release()
}

// storeTransactionReferences stores a consumer entry (output -> transaction) for every input of the given
// transaction. The cached objects returned by the storage are released immediately, as they are not needed any
// further.
func (tangle *Tangle) storeTransactionReferences(tx *transaction.Transaction) {
	// store references to the consumed outputs
	tx.Inputs().ForEach(func(outputId transaction.OutputId) bool {
		tangle.consumerStorage.Store(NewConsumer(outputId, tx.Id())).Release()

		return true
	})
}

// popElementsFromSolidificationStack removes the front entry of the solidification stack and returns its three
// elements (cached payload, cached payload metadata and cached transaction metadata) with their concrete types.
// The stack is expected to be non-empty and to only contain [3]interface{} entries.
func (tangle *Tangle) popElementsFromSolidificationStack(stack *list.List) (*payload.CachedPayload, *CachedPayloadMetadata, *CachedTransactionMetadata) {
	front := stack.Front()
	entry := front.Value.([3]interface{})
	stack.Remove(front)

	cachedPayload := entry[0].(*payload.CachedPayload)
	cachedMetadata := entry[1].(*CachedPayloadMetadata)
	cachedTransactionMetadata := entry[2].(*CachedTransactionMetadata)

	return cachedPayload, cachedMetadata, cachedTransactionMetadata
}

// solidifyTransactionWorker is the worker function that solidifies the payloads (recursively from past to present).
//
// It starts with the given models and works through a stack: whenever a payload (and its transaction) turns solid,
// the payloads that approve it and the payloads that attach transactions consuming its outputs are queued for a
// check as well. The worker takes ownership of all cached objects that it processes and releases them after they
// were handled. A payload whose transaction became solid is handed to the booker.
func (tangle *Tangle) solidifyTransactionWorker(cachedPayload *payload.CachedPayload, cachedMetadata *CachedPayloadMetadata, cachedTransactionMetadata *CachedTransactionMetadata) {
	// initialize the stack
	solidificationStack := list.New()
	solidificationStack.PushBack([3]interface{}{cachedPayload, cachedMetadata, cachedTransactionMetadata})

	// process payloads that are supposed to be checked for solidity recursively
	for solidificationStack.Len() > 0 {
		// execute logic inside a func, so we can use defer to release the objects
		func() {
			// retrieve cached objects
			currentCachedPayload, currentCachedMetadata, currentCachedTransactionMetadata := tangle.popElementsFromSolidificationStack(solidificationStack)
			defer currentCachedPayload.Release()
			defer currentCachedMetadata.Release()
			defer currentCachedTransactionMetadata.Release()

			// unwrap cached objects
			currentPayload := currentCachedPayload.Unwrap()
			currentPayloadMetadata := currentCachedMetadata.Unwrap()
			currentTransactionMetadata := currentCachedTransactionMetadata.Unwrap()

			// abort if any of the retrieved models is nil or payload is not solid
			if currentPayload == nil || currentPayloadMetadata == nil || currentTransactionMetadata == nil || !tangle.isPayloadSolid(currentPayload, currentPayloadMetadata) {
				return
			}

			// abort if the transaction is not solid or invalid
			if transactionSolid, err := tangle.isTransactionSolid(currentPayload.Transaction(), currentTransactionMetadata); !transactionSolid || err != nil {
				if err != nil {
					// TODO: TRIGGER INVALID TX + REMOVE TXS THAT APPROVE IT
					fmt.Println(err)
				}

				return
			}

			// abort if the payload was marked as solid already (if a payload is solid already then the tx is also solid)
			if !currentPayloadMetadata.SetSolid(true) {
				return
			}

			// ... trigger solid event ...
			tangle.Events.PayloadSolid.Trigger(currentCachedPayload, currentCachedMetadata)

			// ... and schedule check of approvers
			tangle.ForeachApprovers(currentPayload.Id(), func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, cachedTransactionMetadata *CachedTransactionMetadata) {
				solidificationStack.PushBack([3]interface{}{payload, payloadMetadata, cachedTransactionMetadata})
			})

			// abort if the transaction was marked as solid already (it has been booked by another attachment)
			if !currentTransactionMetadata.SetSolid(true) {
				return
			}

			tangle.Events.TransactionSolid.Trigger(currentPayload.Transaction(), currentTransactionMetadata)

			// schedule check of the payloads that attach transactions which consume the outputs
			tangle.ForEachConsumers(currentPayload.Transaction(), func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, cachedTransactionMetadata *CachedTransactionMetadata) {
				solidificationStack.PushBack([3]interface{}{payload, payloadMetadata, cachedTransactionMetadata})
			})

			// book the outputs of the payload that was just solidified. We retain the payload of the current stack
			// entry (not the payload that the worker was started with), because the deferred Release above fires
			// before the booker runs and the booker releases its own reference.
			payloadToBook := currentCachedPayload.Retain()
			tangle.bookerWorkerPool.Submit(func() {
				tangle.bookPayloadTransaction(payloadToBook)
			})
		}()
	}
}

// bookPayloadTransaction books the transaction of the given payload: it registers the transaction as a consumer of
// all of its inputs (detecting double spends on the way) and creates the outputs of the transaction in the
// MasterBranch. The given cached payload is released when the function returns.
//
// The booking is aborted without creating outputs if one of the consumed outputs can not be found.
//
// NOTE(review): the branch / fork handling is not finished, yet - the collected information (consumed branches,
// conflicting consumers and the fork flag) is only printed for debugging purposes.
func (tangle *Tangle) bookPayloadTransaction(cachedPayload *payload.CachedPayload) {
	payloadToBook := cachedPayload.Unwrap()
	defer cachedPayload.Release()

	if payloadToBook == nil {
		return
	}
	transactionToBook := payloadToBook.Transaction()

	consumedBranches := make(map[BranchId]types.Empty)
	conflictingConsumersToFork := make(map[transaction.Id]types.Empty)
	createFork := false

	inputsSuccessfullyProcessed := transactionToBook.Inputs().ForEach(func(outputId transaction.OutputId) bool {
		cachedOutput := tangle.GetTransactionOutput(outputId)
		defer cachedOutput.Release()

		// abort if the output could not be found
		output := cachedOutput.Unwrap()
		if output == nil {
			return false
		}

		consumedBranches[output.BranchId()] = types.Void

		// continue if we are the first consumer and there is no double spend
		consumerCount, firstConsumerId := output.RegisterConsumer(transactionToBook.Id())
		if consumerCount == 0 {
			return true
		}

		// fork into a new branch
		createFork = true

		// also fork the previous consumer
		if consumerCount == 1 {
			conflictingConsumersToFork[firstConsumerId] = types.Void
		}

		return true
	})

	if !inputsSuccessfullyProcessed {
		return
	}

	transactionToBook.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) bool {
		newOutput := NewOutput(address, transactionToBook.Id(), MasterBranchId, balances)
		newOutput.SetSolid(true)

		// release the cached object right away - we only want to persist the output
		tangle.outputStorage.Store(newOutput).Release()

		return true
	})

	// temporary debug output (see NOTE above)
	fmt.Println(consumedBranches)
	fmt.Println(MasterBranchId)
	fmt.Println(createFork)
}

// InheritBranches determines the Branch that results from combining the given branches and returns it as a cached
// object:
//   - without any branches it returns the MasterBranch,
//   - with a single branch it returns that branch,
//   - otherwise it reduces the branches with findDeepestCommonAncestorBranches and, if more than one branch remains,
//     returns the aggregated Branch whose id and parents are derived by determineAggregatedBranchDetails (it is created
//     in the branchStorage if it does not exist, yet).
//
// An error of one of the helper functions is returned to the caller.
//
// NOTE(review): the registration of the aggregated Branch with its parents is currently disabled (see TODOs).
func (tangle *Tangle) InheritBranches(branches ...BranchId) (cachedAggregatedBranch *CachedBranch, err error) {
	// return the MasterBranch if we have no branches in the parameters
	if len(branches) == 0 {
		cachedAggregatedBranch = tangle.GetBranch(MasterBranchId)

		return
	}

	// a single branch is inherited as it is
	if len(branches) == 1 {
		cachedAggregatedBranch = tangle.GetBranch(branches[0])

		return
	}

	// filter out duplicates and shared ancestor Branches (abort if we faced an error)
	deepestCommonAncestors, err := tangle.findDeepestCommonAncestorBranches(branches...)
	if err != nil {
		return
	}

	// if there is only one branch that we found, then we are done
	if len(deepestCommonAncestors) == 1 {
		// the map contains exactly one entry - the loop is only used to retrieve it
		for _, firstBranchInList := range deepestCommonAncestors {
			cachedAggregatedBranch = firstBranchInList
		}

		return
	}

	// if there is more than one parents: aggregate
	aggregatedBranchId, aggregatedBranchParents, err := tangle.determineAggregatedBranchDetails(deepestCommonAncestors)
	if err != nil {
		return
	}

	// load the aggregated Branch or create it if it does not exist, yet (the flag reports which case applied)
	newAggregatedBranchCreated := false
	cachedAggregatedBranch = &CachedBranch{CachedObject: tangle.branchStorage.ComputeIfAbsent(aggregatedBranchId.Bytes(), func(key []byte) (object objectstorage.StorableObject) {
		aggregatedReality := NewBranch(aggregatedBranchId, aggregatedBranchParents)

		// TODO: FIX
		/*
			for _, parentRealityId := range aggregatedBranchParents {
				tangle.GetBranch(parentRealityId).Consume(func(branch *Branch) {
					branch.RegisterSubReality(aggregatedRealityId)
				})
			}
		*/

		aggregatedReality.SetModified()

		newAggregatedBranchCreated = true

		return aggregatedReality
	})}

	// the aggregated Branch existed already (the handling of this case is disabled - only a debug marker is printed)
	if !newAggregatedBranchCreated {
		fmt.Println("1")
		// TODO: FIX
		/*
			aggregatedBranch := cachedAggregatedBranch.Unwrap()

			for _, realityId := range aggregatedBranchParents {
				if aggregatedBranch.AddParentReality(realityId) {
					tangle.GetBranch(realityId).Consume(func(branch *Branch) {
						branch.RegisterSubReality(aggregatedRealityId)
					})
				}
			}
		*/
	}

	return
}

// determineAggregatedBranchDetails derives the details of the Branch that aggregates the given branches: the ids of
// its parents (the given branches) and its own id, which is generated from the closest conflict (non-aggregated)
// branches:
//   - a branch that is not aggregated is a conflict branch itself and is used directly,
//   - for an aggregated branch the closest conflict ancestors are collected instead.
//
// The function takes ownership of the given cached branches and releases all of them (also in the error case). If an
// error occurs, the returned id and parents carry no meaning.
func (tangle *Tangle) determineAggregatedBranchDetails(deepestCommonAncestors CachedBranches) (aggregatedBranchId BranchId, aggregatedBranchParents []BranchId, err error) {
	aggregatedBranchParents = make([]BranchId, len(deepestCommonAncestors))

	i := 0
	aggregatedBranchConflictParents := make(CachedBranches)
	for branchId, cachedBranch := range deepestCommonAncestors {
		// release all following entries if we have encountered an error
		if err != nil {
			cachedBranch.Release()

			continue
		}

		// store BranchId as parent
		aggregatedBranchParents[i] = branchId
		i++

		// abort if we could not unwrap the Branch (should never happen)
		branch := cachedBranch.Unwrap()
		if branch == nil {
			cachedBranch.Release()

			err = fmt.Errorf("failed to unwrap branch '%s'", branchId)

			continue
		}

		// a non-aggregated Branch is a conflict Branch and therefore its own closest conflict ancestor (the cached
		// object is kept and released once the id was generated)
		if !branch.IsAggregated() {
			aggregatedBranchConflictParents[branchId] = cachedBranch

			continue
		}

		// an aggregated Branch is represented by the conflict branches that it was aggregated from
		err = tangle.collectClosestConflictAncestors(branch, aggregatedBranchConflictParents)

		cachedBranch.Release()
	}

	if err != nil {
		aggregatedBranchConflictParents.Release()

		// do not hand out partially populated results
		aggregatedBranchParents = nil

		return
	}

	// generating the id also releases the collected conflict parents
	aggregatedBranchId = tangle.generateAggregatedBranchId(aggregatedBranchConflictParents)

	return
}

// generateAggregatedBranchId generates the id of an aggregated Branch from the ids of the given branches: the ids are
// sorted byte-wise (so the result does not depend on the iteration order of the map), concatenated and hashed with
// blake2b. The given cached branches are released by this function.
func (tangle *Tangle) generateAggregatedBranchId(aggregatedBranches CachedBranches) BranchId {
	counter := 0
	branchIds := make([]BranchId, len(aggregatedBranches))
	for branchId, cachedBranch := range aggregatedBranches {
		branchIds[counter] = branchId

		counter++

		cachedBranch.Release()
	}

	// sort the ids lexicographically
	sort.Slice(branchIds, func(i, j int) bool {
		// the loop is bounded by the length of the compared id (it was bounded by branchIds[k] before, which used
		// the byte index to address the list of ids)
		for k := 0; k < len(branchIds[i]); k++ {
			if branchIds[i][k] < branchIds[j][k] {
				return true
			} else if branchIds[i][k] > branchIds[j][k] {
				return false
			}
		}

		// the ids are equal
		return false
	})

	marshalUtil := marshalutil.New(BranchIdLength * len(branchIds))
	for _, branchId := range branchIds {
		marshalUtil.WriteBytes(branchId.Bytes())
	}

	return blake2b.Sum256(marshalUtil.Bytes())
}

// collectClosestConflictAncestors walks the ancestors of the given Branch and adds the closest ancestors that are not
// aggregated (the conflict branches) to closestConflictAncestors. The walk does not continue beyond a conflict
// Branch.
//
// The cached objects that are added to closestConflictAncestors are owned by the caller, who has to release them.
// Branches that are contained in closestConflictAncestors already are skipped without being loaded again, so that an
// existing entry (and its cached object) is never overwritten if the map is shared between multiple calls.
//
// It returns an error if one of the ancestors can not be loaded.
func (tangle *Tangle) collectClosestConflictAncestors(branch *Branch, closestConflictAncestors CachedBranches) (err error) {
	// initialize stack
	stack := list.New()
	for _, parentRealityId := range branch.ParentBranches() {
		stack.PushBack(parentRealityId)
	}

	// work through stack
	processedBranches := make(map[BranchId]types.Empty)
	for stack.Len() != 0 {
		// pop parent branch id from stack
		parentBranchId := stack.Remove(stack.Front()).(BranchId)

		// skip the parent if it has been processed already
		if _, branchProcessed := processedBranches[parentBranchId]; branchProcessed {
			continue
		}
		processedBranches[parentBranchId] = types.Void

		// skip the parent if it was collected by a previous call already (it is a conflict Branch, so there is
		// nothing left to walk and we must not replace the cached object that is stored for it)
		if _, alreadyCollected := closestConflictAncestors[parentBranchId]; alreadyCollected {
			continue
		}

		// load parent branch from database
		cachedParentBranch := tangle.GetBranch(parentBranchId)

		// abort if the parent branch could not be found (should never happen)
		parentBranch := cachedParentBranch.Unwrap()
		if parentBranch == nil {
			cachedParentBranch.Release()

			return fmt.Errorf("failed to load branch '%s'", parentBranchId)
		}

		// if the parent Branch is not aggregated, then we have found the closest conflict ancestor
		if !parentBranch.IsAggregated() {
			closestConflictAncestors[parentBranchId] = cachedParentBranch

			continue
		}

		// queue parents for additional check (recursion)
		for _, parentRealityId := range parentBranch.ParentBranches() {
			stack.PushBack(parentRealityId)
		}

		// release the branch (we don't need it anymore)
		cachedParentBranch.Release()
	}

	return nil
}

// findDeepestCommonAncestorBranches takes a number of BranchIds and determines the most specialized Branches (furthest
// away from the MasterBranch) in that list, that contains all of the named BranchIds.
//
// Example: If we hand in "A, B" and B has A as its parent, then the result will contain the Branch B, because B is a
//          child of A.
//
// Duplicate ids are only processed once. A Branch that replaces one of the collected branches is also checked
// against the remaining ones, so that all of its ancestors are removed from the result.
//
// The cached branches in the result are owned by the caller, who has to release them. Cached branches that do not
// end up in the result are released by this function. If an error occurs, all collected branches are released and a
// nil result is returned.
func (tangle *Tangle) findDeepestCommonAncestorBranches(branches ...BranchId) (result CachedBranches, err error) {
	result = make(CachedBranches)

	processedBranches := make(map[BranchId]types.Empty)
	for _, branchId := range branches {
		err = func() error {
			// continue, if we have processed this branch already
			if _, exists := processedBranches[branchId]; exists {
				return nil
			}
			processedBranches[branchId] = types.Void

			// load branch from objectstorage
			cachedBranch := tangle.GetBranch(branchId)

			// release the branch on every path that does not hand it over to the result
			branchStoredInResult := false
			defer func() {
				if !branchStoredInResult {
					cachedBranch.Release()
				}
			}()

			// abort if we could not load the CachedBranch
			branch := cachedBranch.Unwrap()
			if branch == nil {
				return fmt.Errorf("could not load branch '%s'", branchId)
			}

			// check branches position relative to already aggregated branches
			for aggregatedBranchId, cachedAggregatedBranch := range result {
				// abort if we can not load the branch
				aggregatedBranch := cachedAggregatedBranch.Unwrap()
				if aggregatedBranch == nil {
					return fmt.Errorf("could not load branch '%s'", aggregatedBranchId)
				}

				// if the current branch is an ancestor of an already aggregated branch, then we have found the more
				// "specialized" branch already and keep it
				if isAncestor, ancestorErr := tangle.branchIsAncestorOfBranch(branch, aggregatedBranch); isAncestor || ancestorErr != nil {
					return ancestorErr
				}

				// check if the aggregated Branch is an ancestor of the current Branch and abort if we face an error
				isAncestor, ancestorErr := tangle.branchIsAncestorOfBranch(aggregatedBranch, branch)
				if ancestorErr != nil {
					return ancestorErr
				}

				// if the aggregated branch is an ancestor of the current branch, then we have found a more specialized
				// Branch and drop the old one (deleting entries while ranging over a map is safe in Go). We keep
				// iterating, because the current branch might be a descendant of further aggregated branches.
				if isAncestor {
					delete(result, aggregatedBranchId)
					cachedAggregatedBranch.Release()
				}
			}

			// store the branch as an aggregate candidate: it either replaced its ancestors or was not found to be in
			// any relation with the already aggregated ones.
			result[branchId] = cachedBranch
			branchStoredInResult = true

			return nil
		}()

		// abort if an error occurred while processing the current branch
		if err != nil {
			result.Release()
			result = nil

			return
		}
	}

	return
}

// branchIsAncestorOfBranch reports whether ancestor is an ancestor of descendant. A Branch is treated as an ancestor
// of itself. The ancestors of descendant are loaded with getAncestorBranches and consumed while they are compared;
// an error of that lookup is returned to the caller.
func (tangle *Tangle) branchIsAncestorOfBranch(ancestor *Branch, descendant *Branch) (isAncestor bool, err error) {
	// identical branches count as related
	if ancestor.Id() == descendant.Id() {
		return true, nil
	}

	cachedAncestors, err := tangle.getAncestorBranches(descendant)
	if err != nil {
		return false, err
	}

	// look for the potential ancestor among all ancestors of the descendant
	cachedAncestors.Consume(func(candidate *Branch) {
		if candidate.Id() == ancestor.Id() {
			isAncestor = true
		}
	})

	return isAncestor, nil
}

// getAncestorBranches loads all ancestors of the given Branch (parents, their parents and so on) and returns them as
// cached objects keyed by their id. Every ancestor is loaded only once. If one of the ancestors can not be unwrapped,
// the branches collected so far are released and an error is returned together with a nil result.
func (tangle *Tangle) getAncestorBranches(branch *Branch) (ancestorBranches CachedBranches, err error) {
	collected := make(CachedBranches)

	// seed the queue with the direct parents
	queue := list.New()
	for _, parentId := range branch.ParentBranches() {
		queue.PushBack(parentId)
	}

	for queue.Len() != 0 {
		// take the next branch id from the front of the queue
		currentId := queue.Remove(queue.Front()).(BranchId)

		// every ancestor is only collected once
		if _, known := collected[currentId]; known {
			continue
		}

		cachedCurrent := tangle.GetBranch(currentId)

		// a branch that can not be unwrapped aborts the walk (should never happen)
		current := cachedCurrent.Unwrap()
		if current == nil {
			cachedCurrent.Release()
			collected.Release()

			return nil, fmt.Errorf("failed to unwrap branch '%s'", currentId)
		}

		collected[currentId] = cachedCurrent

		// continue the walk with the parents of the ancestor
		for _, parentId := range current.ParentBranches() {
			queue.PushBack(parentId)
		}
	}

	return collected, nil
}

// GetBranch is supposed to retrieve the Branch with the given id.
//
// NOTE(review): this is a stub that always returns nil and ignores the given id - the callers in this file invoke
// methods on the returned value, so confirm how a nil *CachedBranch behaves before relying on them.
func (tangle *Tangle) GetBranch(branchId BranchId) *CachedBranch {
	// TODO: IMPLEMENT
	return nil
}

// ForeachApprovers calls consume for every payload that approves the payload with the given id. The callback
// receives the cached approving payload, its cached metadata and the cached metadata of its transaction.
//
// The callback takes ownership of all three cached objects and has to release them. The approving payload is
// therefore retained before it is handed over, because the reference that is used to unwrap it is released again as
// soon as the callback returns.
func (tangle *Tangle) ForeachApprovers(payloadId payload.Id, consume func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, cachedTransactionMetadata *CachedTransactionMetadata)) {
	tangle.GetApprovers(payloadId).Consume(func(approver *PayloadApprover) {
		approvingPayloadId := approver.GetApprovingPayloadId()
		approvingCachedPayload := tangle.GetPayload(approvingPayloadId)

		// Consume releases approvingCachedPayload after the callback has returned
		approvingCachedPayload.Consume(func(payload *payload.Payload) {
			consume(approvingCachedPayload.Retain(), tangle.GetPayloadMetadata(approvingPayloadId), tangle.GetTransactionMetadata(payload.Transaction().Id()))
		})
	})
}

// ForEachConsumers calls consume for every payload that attaches a transaction which consumes one of the outputs of
// the given transaction. The callback receives the cached payload, its cached metadata and the cached metadata of
// the consuming transaction.
//
// Every consuming transaction is only processed once, even if it consumes multiple outputs. The callback takes
// ownership of all three cached objects and has to release them, which is why the transaction metadata is loaded
// separately for every attachment.
func (tangle *Tangle) ForEachConsumers(currentTransaction *transaction.Transaction, consume func(payload *payload.CachedPayload, payloadMetadata *CachedPayloadMetadata, cachedTransactionMetadata *CachedTransactionMetadata)) {
	seenTransactions := make(map[transaction.Id]types.Empty)
	currentTransaction.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) bool {
		tangle.GetConsumers(transaction.NewOutputId(address, currentTransaction.Id())).Consume(func(consumer *Consumer) {
			consumingTransactionId := consumer.TransactionId()

			// keep track of the processed transactions (the same transaction can consume multiple outputs)
			if _, transactionSeen := seenTransactions[consumingTransactionId]; transactionSeen {
				return
			}
			seenTransactions[consumingTransactionId] = types.Void

			// retrieve all the payloads that attached the transaction
			tangle.GetAttachments(consumingTransactionId).Consume(func(attachment *Attachment) {
				attachingPayloadId := attachment.PayloadId()

				consume(tangle.GetPayload(attachingPayloadId), tangle.GetPayloadMetadata(attachingPayloadId), tangle.GetTransactionMetadata(consumingTransactionId))
			})
		})

		return true
	})
}

// isPayloadSolid returns true if the given payload is solid. A payload is considered to be solid, if it is either
// already marked as solid or if its referenced payloads are marked as solid.
//
// It returns false if the payload or its metadata are nil or were deleted.
func (tangle *Tangle) isPayloadSolid(payload *payload.Payload, metadata *PayloadMetadata) bool {
	if payload == nil || payload.IsDeleted() {
		return false
	}

	if metadata == nil || metadata.IsDeleted() {
		return false
	}

	if metadata.IsSolid() {
		return true
	}

	// isPayloadMarkedAsSolid reports unknown payloads as missing, so we check both parents before combining the
	// results - a short-circuited && would skip the branch (and its missing payload request) if the trunk is not
	// solid
	trunkSolid := tangle.isPayloadMarkedAsSolid(payload.TrunkId())
	branchSolid := tangle.isPayloadMarkedAsSolid(payload.BranchId())

	return trunkSolid && branchSolid
}

// isPayloadMarkedAsSolid reports whether the payload with the given id carries the solid flag in its metadata. The
// genesis payload is always solid. If no metadata exists for the payload, it is stored as missing and - if it was not
// stored as missing before - the PayloadMissing event is triggered.
func (tangle *Tangle) isPayloadMarkedAsSolid(payloadId payload.Id) bool {
	// the genesis has no metadata and is solid by definition
	if payloadId == payload.GenesisId {
		return true
	}

	cachedMetadata := tangle.GetPayloadMetadata(payloadId)

	// known payload: the answer is the solid flag of its metadata
	if metadata := cachedMetadata.Unwrap(); metadata != nil {
		solid := metadata.IsSolid()
		cachedMetadata.Release()

		return solid
	}
	cachedMetadata.Release()

	// unknown payload: report it as missing unless this has happened before
	cachedMissingPayload, newlyMissing := tangle.missingPayloadStorage.StoreIfAbsent(NewMissingPayload(payloadId))
	if newlyMissing {
		cachedMissingPayload.Consume(func(object objectstorage.StorableObject) {
			tangle.Events.PayloadMissing.Trigger(object.(*MissingPayload).GetId())
		})
	}

	return false
}

// isTransactionSolid reports whether the given transaction is solid. A transaction that is marked as solid in its
// metadata is solid; otherwise all of its inputs have to be solid and its outputs have to match the consumed
// balances.
//
// It returns (false, nil) if a model is nil or deleted or if the inputs are not solid, yet. It returns an error if
// the inputs can not be summed up or if the outputs do not match the inputs.
func (tangle *Tangle) isTransactionSolid(tx *transaction.Transaction, metadata *TransactionMetadata) (bool, error) {
	switch {
	// nothing to check if any of the models are nil or has been deleted
	case tx == nil || tx.IsDeleted(), metadata == nil || metadata.IsDeleted():
		return false, nil

	// the solidity status has been determined before
	case metadata.Solid():
		return true, nil
	}

	// load the outputs that are referenced by the inputs of the transaction
	consumedOutputs := tangle.getCachedOutputsFromTransactionInputs(tx)
	defer consumedOutputs.Release()

	// determine the solidity of the inputs and the balances that they provide
	inputsSolid, consumedBalances, err := tangle.checkTransactionInputs(consumedOutputs)
	if err != nil || !inputsSolid {
		return false, err
	}

	// the outputs have to spend exactly what the inputs provide
	if tangle.checkTransactionOutputs(consumedBalances, tx.Outputs()) {
		return true, nil
	}

	return false, fmt.Errorf("the outputs do not match the inputs in transaction with id '%s'", tx.Id())
}

// getCachedOutputsFromTransactionInputs loads the outputs that are referenced by the inputs of the given transaction
// and returns them as cached objects keyed by their output id. The caller has to release the result.
func (tangle *Tangle) getCachedOutputsFromTransactionInputs(tx *transaction.Transaction) (result CachedOutputs) {
	cachedOutputs := make(CachedOutputs)

	loadInput := func(inputId transaction.OutputId) bool {
		cachedOutputs[inputId] = tangle.GetTransactionOutput(inputId)

		// visit all inputs
		return true
	}
	tx.Inputs().ForEach(loadInput)

	return cachedOutputs
}

// checkTransactionInputs inspects the given cached inputs. inputsSolid is true if every input exists and is solid,
// consumedBalances contains the sum of the balances of the existing inputs per color. An input that does not exist is
// stored as missing and - if it was not stored as missing before - the OutputMissing event is triggered.
//
// An error is returned if the balances of a color exceed the range of an int64.
func (tangle *Tangle) checkTransactionInputs(cachedInputs CachedOutputs) (inputsSolid bool, consumedBalances map[balance.Color]int64, err error) {
	inputsSolid = true
	consumedBalances = make(map[balance.Color]int64)

	for inputId, cachedInput := range cachedInputs {
		// an unknown input makes the transaction unsolid and is reported as missing (only once)
		if !cachedInput.Exists() {
			inputsSolid = false

			cachedMissingOutput, newlyMissing := tangle.missingOutputStorage.StoreIfAbsent(NewMissingOutput(inputId))
			if newlyMissing {
				cachedMissingOutput.Consume(func(object objectstorage.StorableObject) {
					tangle.Events.OutputMissing.Trigger(object.(*MissingOutput).Id())
				})
			}

			continue
		}

		// can not be nil, because the existence was checked above
		input := cachedInput.Unwrap()

		// all inputs have to be solid
		inputsSolid = inputsSolid && input.Solid()

		// add the balances of the input to the sums
		for _, inputBalance := range input.Balances() {
			color, value := inputBalance.Color(), inputBalance.Value()

			// a color that was not seen before starts with a sum of 0
			accumulated := consumedBalances[color]

			// the sum has to fit into an int64
			if value > math.MaxInt64-accumulated {
				err = fmt.Errorf("buffer overflow in balances of inputs")

				return inputsSolid, consumedBalances, err
			}

			consumedBalances[color] = accumulated + value
		}
	}

	return inputsSolid, consumedBalances, err
}

// checkTransactionOutputs reports whether the given outputs spend exactly the given input balances (the sum of all
// balance changes is 0). Coins can be "recolored": outputs that use the color COLOR_NEW are not matched against a
// specific input color, but have to add up to the coins that are left unspent by the other outputs.
//
// If this function returns false, then the outputs that are defined in the transaction are invalid and the
// transaction should be removed from the ledger state. The given inputBalances map is modified while checking.
func (tangle *Tangle) checkTransactionOutputs(inputBalances map[balance.Color]int64, outputs *transaction.Outputs) bool {
	// sum of the coins that are assigned a new color by the outputs
	var recoloredCoins int64

	// validates the balances of a single output and subtracts them from the supply
	spendBalances := func(_ address.Address, balances []*balance.Balance) bool {
		for _, outputBalance := range balances {
			value := outputBalance.Value()

			// outputs have to carry a positive amount of coins
			if value <= 0 {
				return false
			}

			color := outputBalance.Color()

			// newly colored coins are only summed up (they are compared with the leftover supply at the end)
			if color == balance.COLOR_NEW {
				// the sum has to fit into an int64
				if recoloredCoins > math.MaxInt64-value {
					return false
				}
				recoloredCoins += value

				continue
			}

			// the color has to be part of the supply and the supply has to cover the spent amount
			available, colorKnown := inputBalances[color]
			if !colorKnown || available < value {
				return false
			}

			// reduce the supply and drop colors that are used up
			if remaining := available - value; remaining == 0 {
				delete(inputBalances, color)
			} else {
				inputBalances[color] = remaining
			}
		}

		return true
	}

	// a single invalid output invalidates the transaction
	if !outputs.ForEach(spendBalances) {
		return false
	}

	// sum up the coins that were not spent by an output of the same color
	var leftoverCoins int64
	for _, leftover := range inputBalances {
		// the sum has to fit into an int64
		if leftoverCoins > math.MaxInt64-leftover {
			return false
		}

		leftoverCoins += leftover
	}

	// everything that is left over has to be consumed by the newly colored coins
	return leftoverCoins == recoloredCoins
}