From 24fbbae4f36b793bfb243fc3891cde9bc4236f22 Mon Sep 17 00:00:00 2001
From: Hans Moog <hm@mkjc.net>
Date: Fri, 3 Apr 2020 10:33:42 +0200
Subject: [PATCH] ValueTangle implementation (#320)

* Feat: refactored message + started to add tests

* Refactor: go mod tidy

* Refactor: continued to refactor message

* Feat: updated to last hive.go + added Signature() methid

* Feat: go mod tidy

* Feat: added mutex to the signature write in Bytes()

* Fix: fixed Signature method

* Feat: refactored message methods

* Feat: refactored output

* Fix: fixed signature mismatch in tangle factory methods

* Refactor: go mod tidy

* Feat: added solditification logic to value tangle

* Feat: fixed some code related to objectstorage factories
---
 go.sum                                        |   2 -
 .../binary/messagelayer/tangle/approver.go    |   6 +-
 .../messagelayer/tangle/missingmessage.go     |   2 +-
 .../binary/storageprefix/storageprefix.go     |   9 +-
 .../binary/valuetransfer/tangle/attachment.go |  38 +++
 .../binary/valuetransfer/tangle/consumer.go   | 100 +++++--
 .../binary/valuetransfer/tangle/tangle.go     | 274 ++++++++++++------
 .../tangle/transactionoutputmetadata.go       | 215 --------------
 .../valuetransfer/transaction/output.go       | 213 +++++++++++---
 .../valuetransfer/transaction/output_test.go  |  35 +++
 .../valuetransfer/transaction/transaction.go  |   4 +
 11 files changed, 511 insertions(+), 387 deletions(-)
 delete mode 100644 packages/binary/valuetransfer/tangle/transactionoutputmetadata.go

diff --git a/go.sum b/go.sum
index 4faabd5a..956e2e53 100644
--- a/go.sum
+++ b/go.sum
@@ -131,8 +131,6 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
-github.com/iotaledger/hive.go v0.0.0-20200329235804-a34899d73dc0 h1:15y1xve54VKQbtt27BYBTsPeGsUQ0vqg96AtS7lRLss=
-github.com/iotaledger/hive.go v0.0.0-20200329235804-a34899d73dc0/go.mod h1:4sloxRutRhCuXgAgtOu1ZxVM95Na+ovK9MRDEQGZlzw=
 github.com/iotaledger/hive.go v0.0.0-20200330121034-e4a505bcf2cd h1:GZ9zGBj+tK1jHqTD5+OoPLVVlk/sB2pkKmQt9vjR8uY=
 github.com/iotaledger/hive.go v0.0.0-20200330121034-e4a505bcf2cd/go.mod h1:LYUD1U+BxF+OY6zCZ4xp38vzjp/QWbUdCw9iwmxkGnc=
 github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
diff --git a/packages/binary/messagelayer/tangle/approver.go b/packages/binary/messagelayer/tangle/approver.go
index 9c638ed1..f705b45a 100644
--- a/packages/binary/messagelayer/tangle/approver.go
+++ b/packages/binary/messagelayer/tangle/approver.go
@@ -69,12 +69,10 @@ func ApproverFromStorageKey(key []byte, optionalTargetObject ...*Approver) (resu
 
 	// parse the properties that are stored in the key
 	marshalUtil := marshalutil.New(key)
-	result.(*Approver).referencedMessageId, err = message.ParseId(marshalUtil)
-	if err != nil {
+	if result.(*Approver).referencedMessageId, err = message.ParseId(marshalUtil); err != nil {
 		return
 	}
-	result.(*Approver).approvingMessageId, err = message.ParseId(marshalUtil)
-	if err != nil {
+	if result.(*Approver).approvingMessageId, err = message.ParseId(marshalUtil); err != nil {
 		return
 	}
 	consumedBytes = marshalUtil.ReadOffset()
diff --git a/packages/binary/messagelayer/tangle/missingmessage.go b/packages/binary/messagelayer/tangle/missingmessage.go
index e05d11f0..929b56bf 100644
--- a/packages/binary/messagelayer/tangle/missingmessage.go
+++ b/packages/binary/messagelayer/tangle/missingmessage.go
@@ -27,7 +27,7 @@ func MissingMessageFromStorageKey(key []byte, optionalTargetObject ...*MissingMe
 	// determine the target object that will hold the unmarshaled information
 	switch len(optionalTargetObject) {
 	case 0:
-		result = &Approver{}
+		result = &MissingMessage{}
 	case 1:
 		result = optionalTargetObject[0]
 	default:
diff --git a/packages/binary/storageprefix/storageprefix.go b/packages/binary/storageprefix/storageprefix.go
index efafc56e..10bb2099 100644
--- a/packages/binary/storageprefix/storageprefix.go
+++ b/packages/binary/storageprefix/storageprefix.go
@@ -14,9 +14,10 @@ var (
 	ValueTransferMissingPayload  = []byte{8}
 	ValueTransferAttachment      = []byte{9}
 	ValueTransferConsumer        = []byte{10}
+	ValueTangleOutputs           = []byte{11}
 
-	LedgerStateTransferOutput        = []byte{11}
-	LedgerStateTransferOutputBooking = []byte{12}
-	LedgerStateReality               = []byte{13}
-	LedgerStateConflictSet           = []byte{14}
+	LedgerStateTransferOutput        = []byte{12}
+	LedgerStateTransferOutputBooking = []byte{13}
+	LedgerStateReality               = []byte{14}
+	LedgerStateConflictSet           = []byte{15}
 )
diff --git a/packages/binary/valuetransfer/tangle/attachment.go b/packages/binary/valuetransfer/tangle/attachment.go
index c7dcd552..ebeb7a7d 100644
--- a/packages/binary/valuetransfer/tangle/attachment.go
+++ b/packages/binary/valuetransfer/tangle/attachment.go
@@ -130,3 +130,41 @@ var _ objectstorage.StorableObject = &Attachment{}
 
 // AttachmentLength holds the length of a marshaled Attachment in bytes.
 const AttachmentLength = transaction.IdLength + payload.IdLength
+
+// region CachedAttachment /////////////////////////////////////////////////////////////////////////////////////////////
+
+type CachedAttachment struct {
+	objectstorage.CachedObject
+}
+
+func (cachedAttachment *CachedAttachment) Unwrap() *Attachment {
+	if untypedObject := cachedAttachment.Get(); untypedObject == nil {
+		return nil
+	} else {
+		if typedObject := untypedObject.(*Attachment); typedObject == nil || typedObject.IsDeleted() {
+			return nil
+		} else {
+			return typedObject
+		}
+	}
+}
+
+func (cachedAttachment *CachedAttachment) Consume(consumer func(attachment *Attachment)) (consumed bool) {
+	return cachedAttachment.CachedObject.Consume(func(object objectstorage.StorableObject) {
+		consumer(object.(*Attachment))
+	})
+}
+
+type CachedAttachments []*CachedAttachment
+
+func (cachedAttachments CachedAttachments) Consume(consumer func(attachment *Attachment)) (consumed bool) {
+	for _, cachedAttachment := range cachedAttachments {
+		consumed = cachedAttachment.Consume(func(output *Attachment) {
+			consumer(output)
+		}) || consumed
+	}
+
+	return
+}
+
+// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/packages/binary/valuetransfer/tangle/consumer.go b/packages/binary/valuetransfer/tangle/consumer.go
index 9dadca65..bbb5bfc4 100644
--- a/packages/binary/valuetransfer/tangle/consumer.go
+++ b/packages/binary/valuetransfer/tangle/consumer.go
@@ -35,6 +35,36 @@ func NewConsumer(consumedInput transaction.OutputId, transactionId transaction.I
 // ConsumerFromBytes unmarshals a Consumer from a sequence of bytes - it either creates a new object or fills the
 // optionally provided one with the parsed information.
 func ConsumerFromBytes(bytes []byte, optionalTargetObject ...*Consumer) (result *Consumer, err error, consumedBytes int) {
+	marshalUtil := marshalutil.New(bytes)
+	result, err = ParseConsumer(marshalUtil, optionalTargetObject...)
+	consumedBytes = marshalUtil.ReadOffset()
+
+	return
+}
+
+func ParseConsumer(marshalUtil *marshalutil.MarshalUtil, optionalTargetObject ...*Consumer) (result *Consumer, err error) {
+	if parsedObject, parseErr := marshalUtil.Parse(func(data []byte) (interface{}, error, int) {
+		return ConsumerFromStorageKey(data, optionalTargetObject...)
+	}); parseErr != nil {
+		err = parseErr
+
+		return
+	} else {
+		result = parsedObject.(*Consumer)
+	}
+
+	if _, err = marshalUtil.Parse(func(data []byte) (parseResult interface{}, parseErr error, parsedBytes int) {
+		parseErr, parsedBytes = result.UnmarshalObjectStorageValue(data)
+
+		return
+	}); err != nil {
+		return
+	}
+
+	return
+}
+
+func ConsumerFromStorageKey(key []byte, optionalTargetObject ...*Consumer) (result objectstorage.StorableObject, err error, consumedBytes int) {
 	// determine the target object that will hold the unmarshaled information
 	switch len(optionalTargetObject) {
 	case 0:
@@ -42,43 +72,23 @@ func ConsumerFromBytes(bytes []byte, optionalTargetObject ...*Consumer) (result
 	case 1:
 		result = optionalTargetObject[0]
 	default:
-		panic("too many arguments in call to ConsumerFromBytes")
+		panic("too many arguments in call to ConsumerFromStorageKey")
 	}
 
-	// parse the bytes
-	marshalUtil := marshalutil.New(bytes)
-	if result.consumedInput, err = transaction.ParseOutputId(marshalUtil); err != nil {
+	// parse the properties that are stored in the key
+	marshalUtil := marshalutil.New(key)
+	if result.(*Consumer).consumedInput, err = transaction.ParseOutputId(marshalUtil); err != nil {
 		return
 	}
-	if result.transactionId, err = transaction.ParseId(marshalUtil); err != nil {
+	if result.(*Consumer).transactionId, err = transaction.ParseId(marshalUtil); err != nil {
 		return
 	}
-	result.storageKey = marshalutil.New(bytes[:ConsumerLength]).Bytes(true)
+	result.(*Consumer).storageKey = marshalutil.New(key[:marshalUtil.ReadOffset()]).Bytes(true)
 	consumedBytes = marshalUtil.ReadOffset()
 
 	return
 }
 
-// Parse is a wrapper for simplified unmarshaling of Consumers from a byte stream using the marshalUtil package.
-func ParseConsumer(marshalUtil *marshalutil.MarshalUtil) (*Consumer, error) {
-	if consumer, err := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { return ConsumerFromBytes(data) }); err != nil {
-		return nil, err
-	} else {
-		return consumer.(*Consumer), nil
-	}
-}
-
-// ConsumerFromStorage gets called when we restore an Consumer from the storage - it parses the key bytes and
-// returns the new object.
-func ConsumerFromStorage(keyBytes []byte) objectstorage.StorableObject {
-	result, err, _ := ConsumerFromBytes(keyBytes)
-	if err != nil {
-		panic(err)
-	}
-
-	return result
-}
-
 // ConsumedInput returns the OutputId of the Consumer.
 func (consumer *Consumer) ConsumedInput() transaction.OutputId {
 	return consumer.consumedInput
@@ -129,3 +139,41 @@ var _ objectstorage.StorableObject = &Consumer{}
 
 // ConsumerLength holds the length of a marshaled Consumer in bytes.
 const ConsumerLength = transaction.OutputIdLength + transaction.IdLength
+
+// region CachedConsumer /////////////////////////////////////////////////////////////////////////////////////////////////
+
+type CachedConsumer struct {
+	objectstorage.CachedObject
+}
+
+func (cachedConsumer *CachedConsumer) Unwrap() *Consumer {
+	if untypedObject := cachedConsumer.Get(); untypedObject == nil {
+		return nil
+	} else {
+		if typedObject := untypedObject.(*Consumer); typedObject == nil || typedObject.IsDeleted() {
+			return nil
+		} else {
+			return typedObject
+		}
+	}
+}
+
+func (cachedConsumer *CachedConsumer) Consume(consumer func(consumer *Consumer)) (consumed bool) {
+	return cachedConsumer.CachedObject.Consume(func(object objectstorage.StorableObject) {
+		consumer(object.(*Consumer))
+	})
+}
+
+type CachedConsumers []*CachedConsumer
+
+func (cachedConsumers CachedConsumers) Consume(consumer func(consumer *Consumer)) (consumed bool) {
+	for _, cachedConsumer := range cachedConsumers {
+		consumed = cachedConsumer.Consume(func(output *Consumer) {
+			consumer(output)
+		}) || consumed
+	}
+
+	return
+}
+
+// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/packages/binary/valuetransfer/tangle/tangle.go b/packages/binary/valuetransfer/tangle/tangle.go
index 01b00b50..61bb3ca1 100644
--- a/packages/binary/valuetransfer/tangle/tangle.go
+++ b/packages/binary/valuetransfer/tangle/tangle.go
@@ -7,8 +7,11 @@ import (
 	"github.com/dgraph-io/badger/v2"
 	"github.com/iotaledger/hive.go/async"
 	"github.com/iotaledger/hive.go/objectstorage"
+	"github.com/iotaledger/hive.go/types"
 
 	"github.com/iotaledger/goshimmer/packages/binary/storageprefix"
+	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/address"
+	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/balance"
 	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/payload"
 	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/transaction"
 )
@@ -24,9 +27,9 @@ type Tangle struct {
 	missingPayloadStorage  *objectstorage.ObjectStorage
 	attachmentStorage      *objectstorage.ObjectStorage
 
-	consumerStorage                  *objectstorage.ObjectStorage
-	transactionOutputMetadataStorage *objectstorage.ObjectStorage
-	missingOutputStorage             *objectstorage.ObjectStorage
+	outputStorage        *objectstorage.ObjectStorage
+	consumerStorage      *objectstorage.ObjectStorage
+	missingOutputStorage *objectstorage.ObjectStorage
 
 	Events Events
 
@@ -46,9 +49,9 @@ func New(badgerInstance *badger.DB, storageId []byte) (result *Tangle) {
 		approverStorage:        objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferApprover...), PayloadApproverFromStorageKey, objectstorage.CacheTime(time.Second), objectstorage.PartitionKey(payload.IdLength, payload.IdLength), objectstorage.KeysOnly(true)),
 
 		// transaction related storage
-		transactionOutputMetadataStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.Layer0Approvers...), transaction.OutputFromStorageKey, objectstorage.CacheTime(time.Second)),
-		missingOutputStorage:             objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferMissingPayload...), MissingOutputFromStorageKey, objectstorage.CacheTime(time.Second)),
-		consumerStorage:                  objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferConsumer...), transaction.OutputFromStorageKey, objectstorage.CacheTime(time.Second)),
+		outputStorage:        objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTangleOutputs...), outputFromStorageKey, objectstorage.PartitionKey(transaction.OutputKeyPartitions...), objectstorage.CacheTime(time.Second)),
+		missingOutputStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferMissingPayload...), MissingOutputFromStorageKey, objectstorage.CacheTime(time.Second)),
+		consumerStorage:      objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferConsumer...), consumerFromStorageKey, objectstorage.CacheTime(time.Second)),
 
 		attachmentStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferAttachment...), AttachmentFromStorageKey, objectstorage.CacheTime(time.Second)),
 
@@ -80,18 +83,46 @@ func (tangle *Tangle) GetTransactionMetadata(transactionId transaction.Id) *Cach
 	return &CachedTransactionMetadata{CachedObject: tangle.missingOutputStorage.Load(transactionId.Bytes())}
 }
 
+func (tangle *Tangle) GetTransactionOutput(outputId transaction.OutputId) *transaction.CachedOutput {
+	return &transaction.CachedOutput{CachedObject: tangle.outputStorage.Load(outputId.Bytes())}
+}
+
 // GetApprovers retrieves the approvers of a payload from the object storage.
-func (tangle *Tangle) GetApprovers(transactionId payload.Id) CachedApprovers {
+func (tangle *Tangle) GetApprovers(payloadId payload.Id) CachedApprovers {
 	approvers := make(CachedApprovers, 0)
 	tangle.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool {
 		approvers = append(approvers, &CachedPayloadApprover{CachedObject: cachedObject})
 
 		return true
-	}, transactionId[:])
+	}, payloadId.Bytes())
 
 	return approvers
 }
 
+// GetApprovers retrieves the approvers of a payload from the object storage.
+func (tangle *Tangle) GetConsumers(outputId transaction.OutputId) CachedConsumers {
+	consumers := make(CachedConsumers, 0)
+	tangle.consumerStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool {
+		consumers = append(consumers, &CachedConsumer{CachedObject: cachedObject})
+
+		return true
+	}, outputId.Bytes())
+
+	return consumers
+}
+
+// GetApprovers retrieves the approvers of a payload from the object storage.
+func (tangle *Tangle) GetAttachments(transactionId transaction.Id) CachedAttachments {
+	attachments := make(CachedAttachments, 0)
+	tangle.attachmentStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool {
+		attachments = append(attachments, &CachedAttachment{CachedObject: cachedObject})
+
+		return true
+	}, transactionId.Bytes())
+
+	return attachments
+}
+
 // Shutdown stops the worker pools and shuts down the object storage instances.
 func (tangle *Tangle) Shutdown() *Tangle {
 	tangle.storePayloadWorkerPool.ShutdownGracefully()
@@ -124,61 +155,85 @@ func (tangle *Tangle) Prune() error {
 
 // storePayloadWorker is the worker function that stores the payload and calls the corresponding storage events.
 func (tangle *Tangle) storePayloadWorker(payloadToStore *payload.Payload) {
-	// store payload
-	var cachedPayload *payload.CachedPayload
+	// store the payload and transaction models
+	cachedPayload, cachedPayloadMetadata, payloadStored := tangle.storePayload(payloadToStore)
+	if !payloadStored {
+		// abort if we have seen the payload already
+		return
+	}
+	cachedTransactionMetadata, transactionStored := tangle.storeTransaction(payloadToStore.Transaction())
+
+	// store the references between the different entities (we do this after the actual entities were stored, so that
+	// all the metadata models exist in the database as soon as the entities are reachable by walks).
+	tangle.storePayloadReferences(payloadToStore)
+	if transactionStored {
+		tangle.storeTransactionReferences(payloadToStore.Transaction())
+	}
+
+	// trigger events
+	if tangle.missingPayloadStorage.DeleteIfPresent(payloadToStore.Id().Bytes()) {
+		tangle.Events.MissingPayloadReceived.Trigger(cachedPayload, cachedPayloadMetadata)
+	}
+	tangle.Events.PayloadAttached.Trigger(cachedPayload, cachedPayloadMetadata)
+
+	// check solidity
+	tangle.solidifierWorkerPool.Submit(func() {
+		tangle.solidifyTransactionWorker(cachedPayload, cachedPayloadMetadata, cachedTransactionMetadata)
+	})
+}
+
+func (tangle *Tangle) storePayload(payloadToStore *payload.Payload) (cachedPayload *payload.CachedPayload, cachedMetadata *CachedPayloadMetadata, payloadStored bool) {
 	if _tmp, transactionIsNew := tangle.payloadStorage.StoreIfAbsent(payloadToStore); !transactionIsNew {
 		return
 	} else {
 		cachedPayload = &payload.CachedPayload{CachedObject: _tmp}
-	}
+		cachedMetadata = &CachedPayloadMetadata{CachedObject: tangle.payloadMetadataStorage.Store(NewPayloadMetadata(payloadToStore.Id()))}
+		payloadStored = true
 
-	// store payload metadata
-	payloadId := payloadToStore.Id()
-	cachedMetadata := &CachedPayloadMetadata{CachedObject: tangle.payloadMetadataStorage.Store(NewPayloadMetadata(payloadId))}
+		return
+	}
+}
 
-	// retrieve or store TransactionMetadata
-	newTransaction := false
-	transactionId := cachedPayload.Unwrap().Transaction().Id()
-	cachedTransactionMetadata := &CachedTransactionMetadata{CachedObject: tangle.payloadMetadataStorage.ComputeIfAbsent(transactionId.Bytes(), func(key []byte) objectstorage.StorableObject {
-		newTransaction = true
+func (tangle *Tangle) storeTransaction(tx *transaction.Transaction) (cachedTransactionMetadata *CachedTransactionMetadata, transactionStored bool) {
+	cachedTransactionMetadata = &CachedTransactionMetadata{CachedObject: tangle.payloadMetadataStorage.ComputeIfAbsent(tx.Id().Bytes(), func(key []byte) objectstorage.StorableObject {
+		transactionStored = true
 
-		result := NewTransactionMetadata(transactionId)
+		result := NewTransactionMetadata(tx.Id())
 		result.Persist()
 		result.SetModified()
 
 		return result
 	})}
 
+	if transactionStored {
+		tx.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) {
+			tangle.outputStorage.Store(transaction.NewOutput(address, tx.Id(), balances))
+		})
+	}
+
+	return
+}
+
+func (tangle *Tangle) storePayloadReferences(payload *payload.Payload) {
 	// store trunk approver
-	trunkId := payloadToStore.TrunkId()
-	tangle.approverStorage.Store(NewPayloadApprover(trunkId, payloadId)).Release()
+	trunkId := payload.TrunkId()
+	tangle.approverStorage.Store(NewPayloadApprover(trunkId, payload.Id())).Release()
 
 	// store branch approver
-	if branchId := payloadToStore.BranchId(); branchId != trunkId {
+	if branchId := payload.BranchId(); branchId != trunkId {
 		tangle.approverStorage.Store(NewPayloadApprover(branchId, trunkId)).Release()
 	}
 
-	// store the consumers, the first time we see a Transaction
-	if newTransaction {
-		payloadToStore.Transaction().Inputs().ForEach(func(outputId transaction.OutputId) bool {
-			tangle.consumerStorage.Store(NewConsumer(outputId, transactionId))
-
-			return true
-		})
-	}
-
-	// store attachment
-	tangle.attachmentStorage.StoreIfAbsent(NewAttachment(payloadToStore.Transaction().Id(), payloadToStore.Id()))
+	// store a reference from the transaction to the payload that attached it
+	tangle.attachmentStorage.Store(NewAttachment(payload.Transaction().Id(), payload.Id()))
+}
 
-	// trigger events
-	if tangle.missingPayloadStorage.DeleteIfPresent(payloadId.Bytes()) {
-		tangle.Events.MissingPayloadReceived.Trigger(cachedPayload, cachedMetadata)
-	}
-	tangle.Events.PayloadAttached.Trigger(cachedPayload, cachedMetadata)
+func (tangle *Tangle) storeTransactionReferences(tx *transaction.Transaction) {
+	// store references to the consumed outputs
+	tx.Inputs().ForEach(func(outputId transaction.OutputId) bool {
+		tangle.consumerStorage.Store(NewConsumer(outputId, tx.Id()))
 
-	// check solidity
-	tangle.solidifierWorkerPool.Submit(func() {
-		tangle.solidifyTransactionWorker(cachedPayload, cachedMetadata, cachedTransactionMetadata)
+		return true
 	})
 }
 
@@ -200,57 +255,94 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedPayload *payload.CachedPay
 
 	// process payloads that are supposed to be checked for solidity recursively
 	for solidificationStack.Len() > 0 {
-		currentCachedPayload, currentCachedMetadata, currentCachedTransactionMetadata := popElementsFromStack(solidificationStack)
-
-		currentPayload := currentCachedPayload.Unwrap()
-		currentPayloadMetadata := currentCachedMetadata.Unwrap()
-		currentTransaction := currentPayload.Transaction()
-		currentTransactionMetadata := currentCachedTransactionMetadata.Unwrap()
-		if currentPayload == nil || currentPayloadMetadata == nil || currentTransactionMetadata == nil {
-			currentCachedPayload.Release()
-			currentCachedMetadata.Release()
-			currentCachedTransactionMetadata.Release()
-
-			continue
-		}
+		// execute logic inside a func, so we can use defer to release the objects
+		func() {
+			// retrieve cached objects
+			currentCachedPayload, currentCachedMetadata, currentCachedTransactionMetadata := popElementsFromStack(solidificationStack)
+			defer currentCachedPayload.Release()
+			defer currentCachedMetadata.Release()
+			defer currentCachedTransactionMetadata.Release()
+
+			// unwrap cached objects
+			currentPayload := currentCachedPayload.Unwrap()
+			currentPayloadMetadata := currentCachedMetadata.Unwrap()
+			currentTransactionMetadata := currentCachedTransactionMetadata.Unwrap()
+			currentTransaction := currentPayload.Transaction()
+
+			// abort if any of the retrieved models is nil
+			if currentPayload == nil || currentPayloadMetadata == nil || currentTransactionMetadata == nil {
+				return
+			}
 
-		// if current transaction and payload are solid ...
-		if tangle.isPayloadSolid(currentPayload, currentPayloadMetadata) && tangle.isTransactionSolid(currentTransaction, currentTransactionMetadata) {
+			// abort if the entities are not solid
+			if !tangle.isPayloadSolid(currentPayload, currentPayloadMetadata) || !tangle.isTransactionSolid(currentTransaction, currentTransactionMetadata) {
+				return
+			}
+
+			// abort if the payload was marked as solid already (if a payload is solid already then the tx is also solid)
 			payloadBecameSolid := currentPayloadMetadata.SetSolid(true)
+			if !payloadBecameSolid {
+				return
+			}
+
+			// set the transaction related entities to be solid
 			transactionBecameSolid := currentTransactionMetadata.SetSolid(true)
+			if transactionBecameSolid {
+				currentTransaction.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) {
+					tangle.GetTransactionOutput(transaction.NewOutputId(address, currentTransaction.Id())).Consume(func(output *transaction.Output) {
+						output.SetSolid(true)
+					})
+				})
+			}
 
-			// if payload was marked as solid the first time ...
-			if payloadBecameSolid {
-				tangle.Events.PayloadSolid.Trigger(currentCachedPayload, currentCachedMetadata)
+			// ... trigger solid event ...
+			tangle.Events.PayloadSolid.Trigger(currentCachedPayload, currentCachedMetadata)
 
-				tangle.GetApprovers(currentPayload.Id()).Consume(func(approver *PayloadApprover) {
-					approvingPayloadId := approver.GetApprovingPayloadId()
-					approvingCachedPayload := tangle.GetPayload(approvingPayloadId)
+			// ... and schedule check of approvers
+			tangle.GetApprovers(currentPayload.Id()).Consume(func(approver *PayloadApprover) {
+				approvingPayloadId := approver.GetApprovingPayloadId()
+				approvingCachedPayload := tangle.GetPayload(approvingPayloadId)
 
-					approvingCachedPayload.Consume(func(payload *payload.Payload) {
-						solidificationStack.PushBack([3]interface{}{
-							approvingCachedPayload,
-							tangle.GetPayloadMetadata(approvingPayloadId),
-							tangle.GetTransactionMetadata(payload.Transaction().Id()),
-						})
+				approvingCachedPayload.Consume(func(payload *payload.Payload) {
+					solidificationStack.PushBack([3]interface{}{
+						approvingCachedPayload,
+						tangle.GetPayloadMetadata(approvingPayloadId),
+						tangle.GetTransactionMetadata(payload.Transaction().Id()),
 					})
 				})
+			})
+
+			if !transactionBecameSolid {
+				return
 			}
 
-			if transactionBecameSolid {
-				tangle.Events.TransactionSolid.Trigger(currentTransaction, currentTransactionMetadata)
+			tangle.Events.TransactionSolid.Trigger(currentTransaction, currentTransactionMetadata)
 
-				currentTransaction.Inputs().ForEach(func(outputId transaction.OutputId) bool {
-					return true
+			seenTransactions := make(map[transaction.Id]types.Empty)
+			currentTransaction.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) {
+				tangle.GetTransactionOutput(transaction.NewOutputId(address, currentTransaction.Id())).Consume(func(output *transaction.Output) {
+					// trigger events
 				})
-				//tangle.GetConsumers(outputId)
-			}
-		}
 
-		// release cached objects
-		currentCachedPayload.Release()
-		currentCachedMetadata.Release()
-		currentCachedTransactionMetadata.Release()
+				tangle.GetConsumers(transaction.NewOutputId(address, currentTransaction.Id())).Consume(func(consumer *Consumer) {
+					// keep track of the processed transactions (the same transaction can consume multiple outputs)
+					if _, transactionSeen := seenTransactions[consumer.TransactionId()]; transactionSeen {
+						seenTransactions[consumer.TransactionId()] = types.Void
+
+						transactionMetadata := tangle.GetTransactionMetadata(consumer.TransactionId())
+
+						// retrieve all the payloads that attached the transaction
+						tangle.GetAttachments(consumer.TransactionId()).Consume(func(attachment *Attachment) {
+							solidificationStack.PushBack([3]interface{}{
+								tangle.GetPayload(attachment.PayloadId()),
+								tangle.GetPayloadMetadata(attachment.PayloadId()),
+								transactionMetadata,
+							})
+						})
+					}
+				})
+			})
+		}()
 	}
 }
 
@@ -271,17 +363,13 @@ func (tangle *Tangle) isTransactionSolid(transaction *transaction.Transaction, m
 	return transaction.Inputs().ForEach(tangle.isOutputMarkedAsSolid)
 }
 
-func (tangle *Tangle) GetTransferOutputMetadata(transactionOutputId transaction.OutputId) *CachedTransactionOutputMetadata {
-	return &CachedTransactionOutputMetadata{CachedObject: tangle.transactionOutputMetadataStorage.Load(transactionOutputId.Bytes())}
-}
-
-func (tangle *Tangle) isOutputMarkedAsSolid(transferOutputId transaction.OutputId) (result bool) {
-	objectConsumed := tangle.GetTransferOutputMetadata(transferOutputId).Consume(func(transferOutputMetadata *TransactionOutputMetadata) {
-		result = transferOutputMetadata.Solid()
+func (tangle *Tangle) isOutputMarkedAsSolid(transactionOutputId transaction.OutputId) (result bool) {
+	outputExists := tangle.GetTransactionOutput(transactionOutputId).Consume(func(output *transaction.Output) {
+		result = output.Solid()
 	})
 
-	if !objectConsumed {
-		if cachedMissingOutput, missingOutputStored := tangle.missingOutputStorage.StoreIfAbsent(NewMissingOutput(transferOutputId)); missingOutputStored {
+	if !outputExists {
+		if cachedMissingOutput, missingOutputStored := tangle.missingOutputStorage.StoreIfAbsent(NewMissingOutput(transactionOutputId)); missingOutputStored {
 			cachedMissingOutput.Consume(func(object objectstorage.StorableObject) {
 				tangle.Events.OutputMissing.Trigger(object.(*MissingOutput).Id())
 			})
@@ -337,3 +425,11 @@ func (tangle *Tangle) isPayloadMarkedAsSolid(payloadId payload.Id) bool {
 
 	return true
 }
+
+func outputFromStorageKey(key []byte) (objectstorage.StorableObject, error, int) {
+	return transaction.OutputFromStorageKey(key)
+}
+
+func consumerFromStorageKey(key []byte) (objectstorage.StorableObject, error, int) {
+	return ConsumerFromStorageKey(key)
+}
diff --git a/packages/binary/valuetransfer/tangle/transactionoutputmetadata.go b/packages/binary/valuetransfer/tangle/transactionoutputmetadata.go
deleted file mode 100644
index a4c7e976..00000000
--- a/packages/binary/valuetransfer/tangle/transactionoutputmetadata.go
+++ /dev/null
@@ -1,215 +0,0 @@
-package tangle
-
-import (
-	"sync"
-	"time"
-
-	"github.com/iotaledger/hive.go/marshalutil"
-	"github.com/iotaledger/hive.go/objectstorage"
-	"github.com/iotaledger/hive.go/stringify"
-
-	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/transaction"
-)
-
-// TransactionOutputMetadata contains the information of a transaction output, that are based on our local perception of things (i.e. if it
-// is solid, or when we it became solid).
-type TransactionOutputMetadata struct {
-	objectstorage.StorableObjectFlags
-
-	id                 transaction.OutputId
-	solid              bool
-	solidificationTime time.Time
-
-	solidMutex              sync.RWMutex
-	solidificationTimeMutex sync.RWMutex
-}
-
-// NewOutputMetadata is the constructor for the TransactionOutputMetadata type.
-func NewTransactionOutputMetadata(outputId transaction.OutputId) *TransactionOutputMetadata {
-	return &TransactionOutputMetadata{
-		id: outputId,
-	}
-}
-
-// TransactionOutputMetadataFromBytes unmarshals a TransactionOutputMetadata object from a sequence of bytes.
-// It either creates a new object or fills the optionally provided object with the parsed information.
-func TransactionOutputMetadataFromBytes(bytes []byte, optionalTargetObject ...*TransactionOutputMetadata) (result *TransactionOutputMetadata, err error, consumedBytes int) {
-	// determine the target object that will hold the unmarshaled information
-	switch len(optionalTargetObject) {
-	case 0:
-		result = &TransactionOutputMetadata{}
-	case 1:
-		result = optionalTargetObject[0]
-	default:
-		panic("too many arguments in call to TransactionOutputMetadataFromBytes")
-	}
-
-	// parse the bytes
-	marshalUtil := marshalutil.New(bytes)
-	if result.id, err = transaction.ParseOutputId(marshalUtil); err != nil {
-		return
-	}
-	if result.solidificationTime, err = marshalUtil.ReadTime(); err != nil {
-		return
-	}
-	if result.solid, err = marshalUtil.ReadBool(); err != nil {
-		return
-	}
-	consumedBytes = marshalUtil.ReadOffset()
-
-	return
-}
-
-// TransactionOutputMetadataFromStorage is the factory method for TransactionOutputMetadata objects stored in the objectstorage. The bytes and the content
-// will be filled by the objectstorage, by subsequently calling ObjectStorageValue.
-func TransactionOutputMetadataFromStorage(storageKey []byte) objectstorage.StorableObject {
-	result := &TransactionOutputMetadata{}
-
-	var err error
-	if result.id, err = transaction.ParseOutputId(marshalutil.New(storageKey)); err != nil {
-		panic(err)
-	}
-
-	return result
-}
-
-// Parse is a wrapper for simplified unmarshaling of TransactionOutputMetadata objects from a byte stream using the marshalUtil package.
-func ParseTransactionOutputMetadata(marshalUtil *marshalutil.MarshalUtil) (*TransactionOutputMetadata, error) {
-	if outputMetadata, err := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { return TransactionOutputMetadataFromBytes(data) }); err != nil {
-		return nil, err
-	} else {
-		return outputMetadata.(*TransactionOutputMetadata), nil
-	}
-}
-
-// OutputId returns the id of the Output that this TransactionOutputMetadata is associated to.
-func (transactionOutputMetadata *TransactionOutputMetadata) Id() transaction.OutputId {
-	return transactionOutputMetadata.id
-}
-
-// Solid returns true if the Output has been marked as solid.
-func (transactionOutputMetadata *TransactionOutputMetadata) Solid() (result bool) {
-	transactionOutputMetadata.solidMutex.RLock()
-	result = transactionOutputMetadata.solid
-	transactionOutputMetadata.solidMutex.RUnlock()
-
-	return
-}
-
-// SetSolid marks a Output as either solid or not solid.
-// It returns true if the solid flag was changes and automatically updates the solidificationTime as well.
-func (transactionOutputMetadata *TransactionOutputMetadata) SetSolid(solid bool) (modified bool) {
-	transactionOutputMetadata.solidMutex.RLock()
-	if transactionOutputMetadata.solid != solid {
-		transactionOutputMetadata.solidMutex.RUnlock()
-
-		transactionOutputMetadata.solidMutex.Lock()
-		if transactionOutputMetadata.solid != solid {
-			transactionOutputMetadata.solid = solid
-			if solid {
-				transactionOutputMetadata.solidificationTimeMutex.Lock()
-				transactionOutputMetadata.solidificationTime = time.Now()
-				transactionOutputMetadata.solidificationTimeMutex.Unlock()
-			}
-
-			transactionOutputMetadata.SetModified()
-
-			modified = true
-		}
-		transactionOutputMetadata.solidMutex.Unlock()
-
-	} else {
-		transactionOutputMetadata.solidMutex.RUnlock()
-	}
-
-	return
-}
-
-// SoldificationTime returns the time when the Output was marked to be solid.
-func (transactionOutputMetadata *TransactionOutputMetadata) SoldificationTime() time.Time {
-	transactionOutputMetadata.solidificationTimeMutex.RLock()
-	defer transactionOutputMetadata.solidificationTimeMutex.RUnlock()
-
-	return transactionOutputMetadata.solidificationTime
-}
-
-// Bytes marshals the TransactionOutputMetadata object into a sequence of bytes.
-func (transactionOutputMetadata *TransactionOutputMetadata) Bytes() []byte {
-	marshalUtil := marshalutil.New()
-
-	marshalUtil.WriteBytes(transactionOutputMetadata.id.Bytes())
-	marshalUtil.WriteTime(transactionOutputMetadata.solidificationTime)
-	marshalUtil.WriteBool(transactionOutputMetadata.solid)
-
-	return marshalUtil.Bytes()
-}
-
-// String creates a human readable version of the metadata (for debug purposes).
-func (transactionOutputMetadata *TransactionOutputMetadata) String() string {
-	return stringify.Struct("transaction.TransactionOutputMetadata",
-		stringify.StructField("payloadId", transactionOutputMetadata.Id()),
-		stringify.StructField("solid", transactionOutputMetadata.Solid()),
-		stringify.StructField("solidificationTime", transactionOutputMetadata.SoldificationTime()),
-	)
-}
-
-// ObjectStorageKey returns the key that is used to identify the TransactionOutputMetadata in the objectstorage.
-func (transactionOutputMetadata *TransactionOutputMetadata) ObjectStorageKey() []byte {
-	return transactionOutputMetadata.id.Bytes()
-}
-
-// ObjectStorageValue returns the bytes, that are stored in the value part of the k/v store.
-func (transactionOutputMetadata *TransactionOutputMetadata) ObjectStorageValue() []byte {
-	return transactionOutputMetadata.Bytes()
-}
-
-// UnmarshalObjectStorageValue restores the values of a TransactionOutputMetadata object from a sequence of bytes and matches the
-// encoding.BinaryUnmarshaler interface.
-func (transactionOutputMetadata *TransactionOutputMetadata) UnmarshalObjectStorageValue(data []byte) (err error, consumedBytes int) {
-	_, err, consumedBytes = TransactionOutputMetadataFromBytes(data, transactionOutputMetadata)
-
-	return
-}
-
-// Update is disabled and panics if it ever gets called - updates are supposed to happen through the setters.
-func (transactionOutputMetadata *TransactionOutputMetadata) Update(other objectstorage.StorableObject) {
-	panic("update forbidden")
-}
-
-// Interface contract: make compiler warn if the interface is not implemented correctly.
-var _ objectstorage.StorableObject = &TransactionOutputMetadata{}
-
-// CachedTransactionOutputMetadata is a wrapper for the object storage, that takes care of type casting the TransactionOutputMetadata objects.
-// Since go does not have generics (yet), the object storage works based on the generic "interface{}" type, which means
-// that we have to regularly type cast the returned objects, to match the expected type. To reduce the burden of
-// manually managing these type, we create a wrapper that does this for us. This way, we can consistently handle the
-// specialized types of TransactionOutputMetadata, without having to manually type cast over and over again.
-type CachedTransactionOutputMetadata struct {
-	objectstorage.CachedObject
-}
-
-// Retain overrides the underlying method to return a new CachedTransactionOutputMetadata instead of a generic CachedObject.
-func (cachedOutputMetadata *CachedTransactionOutputMetadata) Retain() *CachedTransactionOutputMetadata {
-	return &CachedTransactionOutputMetadata{cachedOutputMetadata.CachedObject.Retain()}
-}
-
-// Consume  overrides the underlying method to use a CachedTransactionOutputMetadata object instead of a generic CachedObject in the
-// consumer).
-func (cachedOutputMetadata *CachedTransactionOutputMetadata) Consume(consumer func(outputMetadata *TransactionOutputMetadata)) bool {
-	return cachedOutputMetadata.CachedObject.Consume(func(object objectstorage.StorableObject) {
-		consumer(object.(*TransactionOutputMetadata))
-	})
-}
-
-// Unwrap provides a way to retrieve a type casted version of the underlying object.
-func (cachedOutputMetadata *CachedTransactionOutputMetadata) Unwrap() *TransactionOutputMetadata {
-	if untypedTransaction := cachedOutputMetadata.Get(); untypedTransaction == nil {
-		return nil
-	} else {
-		if typeCastedTransaction := untypedTransaction.(*TransactionOutputMetadata); typeCastedTransaction == nil || typeCastedTransaction.IsDeleted() {
-			return nil
-		} else {
-			return typeCastedTransaction
-		}
-	}
-}
diff --git a/packages/binary/valuetransfer/transaction/output.go b/packages/binary/valuetransfer/transaction/output.go
index a5d0aab1..2fa3a6e9 100644
--- a/packages/binary/valuetransfer/transaction/output.go
+++ b/packages/binary/valuetransfer/transaction/output.go
@@ -1,6 +1,7 @@
 package transaction
 
 import (
+	"sync"
 	"time"
 
 	"github.com/iotaledger/hive.go/marshalutil"
@@ -10,13 +11,18 @@ import (
 	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/balance"
 )
 
+var OutputKeyPartitions = []int{address.Length, IdLength}
+
 // Output represents the output of a Transaction and contains the balances and the identifiers for this output.
 type Output struct {
-	address       address.Address
-	transactionId Id
-	solid         bool
-	solidSince    time.Time
-	balances      []*balance.Balance
+	address            address.Address
+	transactionId      Id
+	solid              bool
+	solidificationTime time.Time
+	balances           []*balance.Balance
+
+	solidMutex              sync.RWMutex
+	solidificationTimeMutex sync.RWMutex
 
 	objectstorage.StorableObjectFlags
 	storageKey []byte
@@ -25,11 +31,11 @@ type Output struct {
 // NewOutput creates an Output that contains the balances and identifiers of a Transaction.
 func NewOutput(address address.Address, transactionId Id, balances []*balance.Balance) *Output {
 	return &Output{
-		address:       address,
-		transactionId: transactionId,
-		solid:         false,
-		solidSince:    time.Time{},
-		balances:      balances,
+		address:            address,
+		transactionId:      transactionId,
+		solid:              false,
+		solidificationTime: time.Time{},
+		balances:           balances,
 
 		storageKey: marshalutil.New().WriteBytes(address.Bytes()).WriteBytes(transactionId.Bytes()).Bytes(),
 	}
@@ -38,6 +44,39 @@ func NewOutput(address address.Address, transactionId Id, balances []*balance.Ba
 // OutputFromBytes unmarshals an Output object from a sequence of bytes.
 // It either creates a new object or fills the optionally provided object with the parsed information.
 func OutputFromBytes(bytes []byte, optionalTargetObject ...*Output) (result *Output, err error, consumedBytes int) {
+	marshalUtil := marshalutil.New(bytes)
+	result, err = ParseOutput(marshalUtil, optionalTargetObject...)
+	consumedBytes = marshalUtil.ReadOffset()
+
+	return
+}
+
+func ParseOutput(marshalUtil *marshalutil.MarshalUtil, optionalTargetObject ...*Output) (result *Output, err error) {
+	if parsedObject, parseErr := marshalUtil.Parse(func(data []byte) (interface{}, error, int) {
+		return OutputFromStorageKey(data, optionalTargetObject...)
+	}); parseErr != nil {
+		err = parseErr
+
+		return
+	} else {
+		result = parsedObject.(*Output)
+	}
+
+	if _, err = marshalUtil.Parse(func(data []byte) (parseResult interface{}, parseErr error, parsedBytes int) {
+		parseErr, parsedBytes = result.UnmarshalObjectStorageValue(data)
+
+		return
+	}); err != nil {
+		return
+	}
+
+	return
+}
+
+// OutputFromStorageKey get's called when we restore a Output from the storage.
+// In contrast to other database models, it unmarshals some information from the key so we simply store the key before
+// it gets handed over to UnmarshalObjectStorageValue (by the ObjectStorage).
+func OutputFromStorageKey(keyBytes []byte, optionalTargetObject ...*Output) (result objectstorage.StorableObject, err error, consumedBytes int) {
 	// determine the target object that will hold the unmarshaled information
 	switch len(optionalTargetObject) {
 	case 0:
@@ -48,47 +87,22 @@ func OutputFromBytes(bytes []byte, optionalTargetObject ...*Output) (result *Out
 		panic("too many arguments in call to OutputFromBytes")
 	}
 
-	// parse the bytes
-	marshalUtil := marshalutil.New(bytes)
-	if result.address, err = address.Parse(marshalUtil); err != nil {
+	// parse information
+	marshalUtil := marshalutil.New(keyBytes)
+	result.(*Output).address, err = address.Parse(marshalUtil)
+	if err != nil {
 		return
 	}
-	if result.transactionId, err = ParseId(marshalUtil); err != nil {
+	result.(*Output).transactionId, err = ParseId(marshalUtil)
+	if err != nil {
 		return
 	}
-	if result.solid, err = marshalUtil.ReadBool(); err != nil {
-		return
-	}
-	if result.solidSince, err = marshalUtil.ReadTime(); err != nil {
-		return
-	}
-	var balanceCount uint32
-	if balanceCount, err = marshalUtil.ReadUint32(); err != nil {
-		return
-	} else {
-		result.balances = make([]*balance.Balance, balanceCount)
-		for i := uint32(0); i < balanceCount; i++ {
-			result.balances[i], err = balance.Parse(marshalUtil)
-			if err != nil {
-				return
-			}
-		}
-	}
-	result.storageKey = marshalutil.New().WriteBytes(result.address.Bytes()).WriteBytes(result.transactionId.Bytes()).Bytes()
+	result.(*Output).storageKey = marshalutil.New(keyBytes[:OutputIdLength]).Bytes(true)
 	consumedBytes = marshalUtil.ReadOffset()
 
 	return
 }
 
-// OutputFromStorageKey get's called when we restore a Output from the storage.
-// In contrast to other database models, it unmarshals some information from the key so we simply store the key before
-// it gets handed over to UnmarshalObjectStorageValue (by the ObjectStorage).
-func OutputFromStorageKey(keyBytes []byte) (result objectstorage.StorableObject, err error, consumedBytes int) {
-	return &Output{
-		storageKey: keyBytes[:OutputIdLength],
-	}, nil, OutputIdLength
-}
-
 // Address returns the address that this output belongs to.
 func (output *Output) Address() address.Address {
 	return output.address
@@ -99,11 +113,61 @@ func (output *Output) TransactionId() Id {
 	return output.transactionId
 }
 
+// Solid returns true if the output has been marked as solid.
+func (output *Output) Solid() bool {
+	output.solidMutex.RLock()
+	defer output.solidMutex.RUnlock()
+
+	return output.solid
+}
+
+func (output *Output) SetSolid(solid bool) (modified bool) {
+	output.solidMutex.RLock()
+	if output.solid != solid {
+		output.solidMutex.RUnlock()
+
+		output.solidMutex.Lock()
+		if output.solid != solid {
+			output.solid = solid
+			if solid {
+				output.solidificationTimeMutex.Lock()
+				output.solidificationTime = time.Now()
+				output.solidificationTimeMutex.Unlock()
+			}
+
+			output.SetModified()
+
+			modified = true
+		}
+		output.solidMutex.Unlock()
+
+	} else {
+		output.solidMutex.RUnlock()
+	}
+
+	return
+}
+
+func (output *Output) SolidificationTime() time.Time {
+	output.solidificationTimeMutex.RLock()
+	defer output.solidificationTimeMutex.RUnlock()
+
+	return output.solidificationTime
+}
+
 // Balances returns the colored balances (color + balance) that this output contains.
 func (output *Output) Balances() []*balance.Balance {
 	return output.balances
 }
 
+// Bytes marshals the object into a sequence of bytes.
+func (output *Output) Bytes() []byte {
+	return marshalutil.New().
+		WriteBytes(output.ObjectStorageKey()).
+		WriteBytes(output.ObjectStorageValue()).
+		Bytes()
+}
+
 // ObjectStorageKey returns the key that is used to store the object in the database.
 // It is required to match StorableObject interface.
 func (output *Output) ObjectStorageKey() []byte {
@@ -115,26 +179,45 @@ func (output *Output) ObjectStorageKey() []byte {
 
 // ObjectStorageValue marshals the balances into a sequence of bytes - the address and transaction id are stored inside the key
 // and are ignored here.
-func (output *Output) ObjectStorageValue() (data []byte) {
+func (output *Output) ObjectStorageValue() []byte {
 	// determine amount of balances in the output
 	balanceCount := len(output.balances)
 
 	// initialize helper
 	marshalUtil := marshalutil.New(marshalutil.BOOL_SIZE + marshalutil.TIME_SIZE + marshalutil.UINT32_SIZE + balanceCount*balance.Length)
 	marshalUtil.WriteBool(output.solid)
-	marshalUtil.WriteTime(output.solidSince)
+	marshalUtil.WriteTime(output.solidificationTime)
 	marshalUtil.WriteUint32(uint32(balanceCount))
 	for _, balanceToMarshal := range output.balances {
 		marshalUtil.WriteBytes(balanceToMarshal.Bytes())
 	}
 
-	return
+	return marshalUtil.Bytes()
 }
 
 // UnmarshalObjectStorageValue restores a Output from a serialized version in the ObjectStorage with parts of the object
 // being stored in its key rather than the content of the database to reduce storage requirements.
 func (output *Output) UnmarshalObjectStorageValue(data []byte) (err error, consumedBytes int) {
-	_, err, consumedBytes = OutputFromBytes(marshalutil.New(output.storageKey).WriteBytes(data).Bytes(), output)
+	marshalUtil := marshalutil.New(data)
+	if output.solid, err = marshalUtil.ReadBool(); err != nil {
+		return
+	}
+	if output.solidificationTime, err = marshalUtil.ReadTime(); err != nil {
+		return
+	}
+	var balanceCount uint32
+	if balanceCount, err = marshalUtil.ReadUint32(); err != nil {
+		return
+	} else {
+		output.balances = make([]*balance.Balance, balanceCount)
+		for i := uint32(0); i < balanceCount; i++ {
+			output.balances[i], err = balance.Parse(marshalUtil)
+			if err != nil {
+				return
+			}
+		}
+	}
+	consumedBytes = marshalUtil.ReadOffset()
 
 	return
 }
@@ -146,3 +229,41 @@ func (output *Output) Update(other objectstorage.StorableObject) {
 
 // define contract (ensure that the struct fulfills the given interface)
 var _ objectstorage.StorableObject = &Output{}
+
+// region CachedOutput /////////////////////////////////////////////////////////////////////////////////////////////////
+
+type CachedOutput struct {
+	objectstorage.CachedObject
+}
+
+func (cachedOutput *CachedOutput) Unwrap() *Output {
+	if untypedObject := cachedOutput.Get(); untypedObject == nil {
+		return nil
+	} else {
+		if typedObject := untypedObject.(*Output); typedObject == nil || typedObject.IsDeleted() {
+			return nil
+		} else {
+			return typedObject
+		}
+	}
+}
+
+func (cachedOutput *CachedOutput) Consume(consumer func(output *Output)) (consumed bool) {
+	return cachedOutput.CachedObject.Consume(func(object objectstorage.StorableObject) {
+		consumer(object.(*Output))
+	})
+}
+
+type CachedOutputs []*CachedOutput
+
+func (cachedOutputs CachedOutputs) Consume(consumer func(output *Output)) (consumed bool) {
+	for _, cachedOutput := range cachedOutputs {
+		consumed = cachedOutput.Consume(func(output *Output) {
+			consumer(output)
+		}) || consumed
+	}
+
+	return
+}
+
+// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/packages/binary/valuetransfer/transaction/output_test.go b/packages/binary/valuetransfer/transaction/output_test.go
index 305409e8..d926bc72 100644
--- a/packages/binary/valuetransfer/transaction/output_test.go
+++ b/packages/binary/valuetransfer/transaction/output_test.go
@@ -2,8 +2,43 @@ package transaction
 
 import (
 	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/address"
+	"github.com/iotaledger/goshimmer/packages/binary/valuetransfer/balance"
 )
 
 func TestNew(t *testing.T) {
+	randomAddress := address.Random()
+	randomTransactionId := RandomId()
+
+	output := NewOutput(randomAddress, randomTransactionId, []*balance.Balance{
+		balance.New(balance.COLOR_IOTA, 1337),
+	})
+
+	assert.Equal(t, randomAddress, output.Address())
+	assert.Equal(t, randomTransactionId, output.TransactionId())
+	assert.Equal(t, false, output.Solid())
+	assert.Equal(t, time.Time{}, output.SolidificationTime())
+	assert.Equal(t, []*balance.Balance{
+		balance.New(balance.COLOR_IOTA, 1337),
+	}, output.Balances())
+
+	assert.Equal(t, true, output.SetSolid(true))
+	assert.Equal(t, false, output.SetSolid(true))
+	assert.Equal(t, true, output.Solid())
+	assert.NotEqual(t, time.Time{}, output.SolidificationTime())
+
+	clonedOutput, err, _ := OutputFromBytes(output.Bytes())
+	if err != nil {
+		panic(err)
+	}
 
+	assert.Equal(t, output.Address(), clonedOutput.Address())
+	assert.Equal(t, output.TransactionId(), clonedOutput.TransactionId())
+	assert.Equal(t, output.Solid(), clonedOutput.Solid())
+	assert.Equal(t, output.SolidificationTime().Round(time.Second), clonedOutput.SolidificationTime().Round(time.Second))
+	assert.Equal(t, output.Balances(), clonedOutput.Balances())
 }
diff --git a/packages/binary/valuetransfer/transaction/transaction.go b/packages/binary/valuetransfer/transaction/transaction.go
index 0d685c60..04e3ab01 100644
--- a/packages/binary/valuetransfer/transaction/transaction.go
+++ b/packages/binary/valuetransfer/transaction/transaction.go
@@ -133,6 +133,10 @@ func (transaction *Transaction) Inputs() *Inputs {
 	return transaction.inputs
 }
 
+func (transaction *Transaction) Outputs() *Outputs {
+	return transaction.outputs
+}
+
 func (transaction *Transaction) SignaturesValid() bool {
 	signaturesValid := true
 	transaction.inputs.ForEachAddress(func(address address.Address) bool {
-- 
GitLab