diff --git a/go.mod b/go.mod index f0c5e1ebb725e9622202e8376b15c69a77f0da3d..039b84fd659eb41bad1e1954a95a33835344496c 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/googollee/go-engine.io v1.4.3-0.20190924125625-798118fc0dd2 github.com/googollee/go-socket.io v1.4.3-0.20191204093753-683f8725b6d0 github.com/gorilla/websocket v1.4.1 - github.com/iotaledger/hive.go v0.0.0-20200330121034-e4a505bcf2cd + github.com/iotaledger/hive.go v0.0.0-20200403132600-4c10556e08a0 github.com/iotaledger/iota.go v1.0.0-beta.14 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/gommon v0.3.0 // indirect diff --git a/go.sum b/go.sum index 4faabd5a7c4372c60275144650b70deefcbbfe06..cbba30d821850b28f8d877e5ef254dff18422532 100644 --- a/go.sum +++ b/go.sum @@ -131,10 +131,12 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/iotaledger/hive.go v0.0.0-20200329235804-a34899d73dc0 h1:15y1xve54VKQbtt27BYBTsPeGsUQ0vqg96AtS7lRLss= -github.com/iotaledger/hive.go v0.0.0-20200329235804-a34899d73dc0/go.mod h1:4sloxRutRhCuXgAgtOu1ZxVM95Na+ovK9MRDEQGZlzw= github.com/iotaledger/hive.go v0.0.0-20200330121034-e4a505bcf2cd h1:GZ9zGBj+tK1jHqTD5+OoPLVVlk/sB2pkKmQt9vjR8uY= github.com/iotaledger/hive.go v0.0.0-20200330121034-e4a505bcf2cd/go.mod h1:LYUD1U+BxF+OY6zCZ4xp38vzjp/QWbUdCw9iwmxkGnc= +github.com/iotaledger/hive.go v0.0.0-20200402231254-50e5bddb0da0 h1:Es3rPblh28a68LctnLwqhUOphmtkD8Q3UVKZoZYSlDM= +github.com/iotaledger/hive.go v0.0.0-20200402231254-50e5bddb0da0/go.mod h1:LYUD1U+BxF+OY6zCZ4xp38vzjp/QWbUdCw9iwmxkGnc= +github.com/iotaledger/hive.go v0.0.0-20200403132600-4c10556e08a0 h1:CyUsunZHlWuD1s9GVz+XqAIZVpRDxJBspb4DheJVknw= +github.com/iotaledger/hive.go v0.0.0-20200403132600-4c10556e08a0/go.mod h1:LYUD1U+BxF+OY6zCZ4xp38vzjp/QWbUdCw9iwmxkGnc= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A= github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= diff --git a/packages/binary/messagelayer/tangle/approver.go b/packages/binary/messagelayer/tangle/approver.go index 9c638ed1a6da843b5cc4f6f1d2bb077d0f0452ae..f705b45af8065e1d552513c12c73a3b5ac40846f 100644 --- a/packages/binary/messagelayer/tangle/approver.go +++ b/packages/binary/messagelayer/tangle/approver.go @@ -69,12 +69,10 @@ func ApproverFromStorageKey(key []byte, optionalTargetObject ...*Approver) (resu // parse the properties that are stored in the key marshalUtil := marshalutil.New(key) - result.(*Approver).referencedMessageId, err = message.ParseId(marshalUtil) - if err != nil { + if result.(*Approver).referencedMessageId, err = message.ParseId(marshalUtil); err != nil { return } - result.(*Approver).approvingMessageId, err = message.ParseId(marshalUtil) - if err != nil { + if result.(*Approver).approvingMessageId, err = message.ParseId(marshalUtil); err != nil { return } consumedBytes = marshalUtil.ReadOffset() diff --git a/packages/binary/messagelayer/tangle/missingmessage.go b/packages/binary/messagelayer/tangle/missingmessage.go index e05d11f047ff21258bea8a55ebdee012dcc6b633..929b56bf480fcba4e58bbf22ec95f30f59da4eac 100644 --- a/packages/binary/messagelayer/tangle/missingmessage.go +++ b/packages/binary/messagelayer/tangle/missingmessage.go @@ -27,7 +27,7 @@ func MissingMessageFromStorageKey(key []byte, optionalTargetObject ...*MissingMe // determine the target object that will hold the unmarshaled information switch len(optionalTargetObject) { case 0: - result = &Approver{} + result = &MissingMessage{} case 1: result = optionalTargetObject[0] default: diff --git a/packages/binary/messagelayer/tangle/storageprefixes.go b/packages/binary/messagelayer/tangle/storageprefixes.go new file mode 100644 index 0000000000000000000000000000000000000000..24a31eea08c886d9f649b490c1fa77e444896447 --- /dev/null +++ b/packages/binary/messagelayer/tangle/storageprefixes.go @@ -0,0 +1,8 @@ +package tangle + +const ( + PrefixMessage byte = iota + PrefixMessageMetadata + PrefixApprovers + PrefixMissingMessage +) diff --git a/packages/binary/messagelayer/tangle/tangle.go b/packages/binary/messagelayer/tangle/tangle.go index be3877532e711d6184a91f87b0fd53e702a59783..86ef2ab3f800594ecc010c361f69e29ac2b2f363 100644 --- a/packages/binary/messagelayer/tangle/tangle.go +++ b/packages/binary/messagelayer/tangle/tangle.go @@ -20,8 +20,6 @@ const ( ) type Tangle struct { - storageId []byte - messageStorage *objectstorage.ObjectStorage messageMetadataStorage *objectstorage.ObjectStorage approverStorage *objectstorage.ObjectStorage @@ -47,13 +45,14 @@ func missingMessageFactory(key []byte) (objectstorage.StorableObject, error, int } // Constructor for the tangle. -func New(badgerInstance *badger.DB, storageId []byte) (result *Tangle) { +func New(badgerInstance *badger.DB) (result *Tangle) { + osFactory := objectstorage.NewFactory(badgerInstance, storageprefix.MessageLayer) + result = &Tangle{ - storageId: storageId, - messageStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.Layer0Message...), messageFactory, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), - messageMetadataStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.Layer0MessageMetadata...), MessageMetadataFromStorageKey, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), - approverStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.Layer0Approvers...), approverFactory, objectstorage.CacheTime(10*time.Second), objectstorage.PartitionKey(message.IdLength, message.IdLength), objectstorage.LeakDetectionEnabled(false)), - missingMessageStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.Layer0MissingMessage...), missingMessageFactory, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), + messageStorage: osFactory.New(PrefixMessage, messageFactory, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), + messageMetadataStorage: osFactory.New(PrefixMessageMetadata, MessageMetadataFromStorageKey, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), + approverStorage: osFactory.New(PrefixApprovers, approverFactory, objectstorage.CacheTime(10*time.Second), objectstorage.PartitionKey(message.IdLength, message.IdLength), objectstorage.LeakDetectionEnabled(false)), + missingMessageStorage: osFactory.New(PrefixMissingMessage, missingMessageFactory, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), Events: *newEvents(), } @@ -63,11 +62,6 @@ func New(badgerInstance *badger.DB, storageId []byte) (result *Tangle) { return } -// Returns the storage id of this tangle (can be used to create ontologies that follow the storage of the main tangle). -func (tangle *Tangle) StorageId() []byte { - return tangle.storageId -} - // Attaches a new transaction to the tangle. func (tangle *Tangle) AttachMessage(transaction *message.Message) { tangle.storeMessageWorkerPool.Submit(func() { tangle.storeMessageWorker(transaction) }) diff --git a/packages/binary/messagelayer/tangle/tangle_test.go b/packages/binary/messagelayer/tangle/tangle_test.go index e16243a2ee94f022bc0bbec17791eddbc41022f6..af4bed90b7f0a6fe269649fd70c4d5f70fd65cb1 100644 --- a/packages/binary/messagelayer/tangle/tangle_test.go +++ b/packages/binary/messagelayer/tangle/tangle_test.go @@ -24,7 +24,7 @@ func BenchmarkTangle_AttachTransaction(b *testing.B) { // use the tempdir for the database config.Node.Set(database.CFG_DIRECTORY, dir) - tangle := New(database.GetBadgerInstance(), []byte("TEST_BINARY_TANGLE")) + tangle := New(database.GetBadgerInstance()) if err := tangle.Prune(); err != nil { b.Error(err) @@ -55,7 +55,7 @@ func TestTangle_AttachTransaction(t *testing.T) { // use the tempdir for the database config.Node.Set(database.CFG_DIRECTORY, dir) - messageTangle := New(database.GetBadgerInstance(), []byte("TEST_BINARY_TANGLE")) + messageTangle := New(database.GetBadgerInstance()) if err := messageTangle.Prune(); err != nil { t.Error(err) diff --git a/packages/binary/storageprefix/storageprefix.go b/packages/binary/storageprefix/storageprefix.go index efafc56ee9dd4aeefa34fe8db69ee583f8170ec3..28ad9403e71a214a1d956f0eeb8a90e3206c4bc1 100644 --- a/packages/binary/storageprefix/storageprefix.go +++ b/packages/binary/storageprefix/storageprefix.go @@ -1,22 +1,10 @@ package storageprefix -var ( - MainNet = []byte{0} +const ( + // the following values are a list of prefixes defined as an enum + _ byte = iota - Layer0Message = []byte{1} - Layer0MessageMetadata = []byte{2} - Layer0Approvers = []byte{3} - Layer0MissingMessage = []byte{4} - - ValueTransferPayload = []byte{5} - ValueTransferPayloadMetadata = []byte{6} - ValueTransferApprover = []byte{7} - ValueTransferMissingPayload = []byte{8} - ValueTransferAttachment = []byte{9} - ValueTransferConsumer = []byte{10} - - LedgerStateTransferOutput = []byte{11} - LedgerStateTransferOutputBooking = []byte{12} - LedgerStateReality = []byte{13} - LedgerStateConflictSet = []byte{14} + // package specific prefixes used for the objectstorage in the corresponding packages + MessageLayer + ValueTransfers ) diff --git a/packages/binary/valuetransfer/payload/payload.go b/packages/binary/valuetransfer/payload/payload.go index d41e143931d8d5d1ee98b578b7413f124e4a8315..33fdf15912f47b6876e0b823d66942697a89a6b4 100644 --- a/packages/binary/valuetransfer/payload/payload.go +++ b/packages/binary/valuetransfer/payload/payload.go @@ -43,8 +43,16 @@ func FromBytes(bytes []byte, optionalTargetObject ...*Payload) (result *Payload, return } -func StorableObjectFromKey(key []byte) (result objectstorage.StorableObject, err error, consumedBytes int) { - result = &Payload{} +func FromStorageKey(key []byte, optionalTargetObject ...*Payload) (result *Payload, err error, consumedBytes int) { + // determine the target object that will hold the unmarshaled information + switch len(optionalTargetObject) { + case 0: + result = &Payload{} + case 1: + result = optionalTargetObject[0] + default: + panic("too many arguments in call to MissingPayloadFromStorageKey") + } // parse the properties that are stored in the key marshalUtil := marshalutil.New(key) @@ -53,7 +61,7 @@ func StorableObjectFromKey(key []byte) (result objectstorage.StorableObject, err return } else { - result.(*Payload).id = &payloadId + result.id = &payloadId } consumedBytes = marshalUtil.ReadOffset() @@ -127,7 +135,28 @@ func (payload *Payload) Transaction() *transaction.Transaction { return payload.transaction } -func (payload *Payload) Bytes() (bytes []byte) { +func (payload *Payload) Bytes() []byte { + return payload.ObjectStorageValue() +} + +func (payload *Payload) String() string { + return stringify.Struct("Payload", + stringify.StructField("id", payload.Id()), + stringify.StructField("trunk", payload.TrunkId()), + stringify.StructField("branch", payload.BranchId()), + stringify.StructField("transfer", payload.Transaction()), + ) +} + +// region Payload implementation /////////////////////////////////////////////////////////////////////////////////////// + +var Type = payload.Type(1) + +func (payload *Payload) Type() payload.Type { + return Type +} + +func (payload *Payload) ObjectStorageValue() (bytes []byte) { // acquire lock for reading bytes payload.bytesMutex.RLock() @@ -167,27 +196,6 @@ func (payload *Payload) Bytes() (bytes []byte) { return } -func (payload *Payload) String() string { - return stringify.Struct("Payload", - stringify.StructField("id", payload.Id()), - stringify.StructField("trunk", payload.TrunkId()), - stringify.StructField("branch", payload.BranchId()), - stringify.StructField("transfer", payload.Transaction()), - ) -} - -// region Payload implementation /////////////////////////////////////////////////////////////////////////////////////// - -var Type = payload.Type(1) - -func (payload *Payload) Type() payload.Type { - return Type -} - -func (payload *Payload) ObjectStorageValue() []byte { - return payload.Bytes() -} - func (payload *Payload) UnmarshalObjectStorageValue(data []byte) (err error, consumedBytes int) { marshalUtil := marshalutil.New(data) @@ -248,9 +256,7 @@ var _ payload.Payload = &Payload{} // UnmarshalObjectStorageValue(data []byte) (err error) already implemented by Payload func (payload *Payload) ObjectStorageKey() []byte { - id := payload.Id() - - return id[:] + return payload.Id().Bytes() } func (payload *Payload) Update(other objectstorage.StorableObject) { diff --git a/packages/binary/valuetransfer/tangle/attachment.go b/packages/binary/valuetransfer/tangle/attachment.go index c7dcd5527dd54681e18f64806059aa3263bf4d43..ae79f9419866ac5f9fa448c42423f534cb959a81 100644 --- a/packages/binary/valuetransfer/tangle/attachment.go +++ b/packages/binary/valuetransfer/tangle/attachment.go @@ -36,6 +36,39 @@ func NewAttachment(transactionId transaction.Id, payloadId payload.Id) *Attachme // AttachmentFromBytes unmarshals an Attachment from a sequence of bytes - it either creates a new object or fills the // optionally provided one with the parsed information. func AttachmentFromBytes(bytes []byte, optionalTargetObject ...*Attachment) (result *Attachment, err error, consumedBytes int) { + marshalUtil := marshalutil.New(bytes) + result, err = ParseAttachment(marshalUtil, optionalTargetObject...) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +// Parse is a wrapper for simplified unmarshaling of Attachments from a byte stream using the marshalUtil package. +func ParseAttachment(marshalUtil *marshalutil.MarshalUtil, optionalTargetObject ...*Attachment) (result *Attachment, err error) { + if parsedObject, parseErr := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { + return AttachmentFromStorageKey(data, optionalTargetObject...) + }); parseErr != nil { + err = parseErr + + return + } else { + result = parsedObject.(*Attachment) + } + + if _, err = marshalUtil.Parse(func(data []byte) (parseResult interface{}, parseErr error, parsedBytes int) { + parseErr, parsedBytes = result.UnmarshalObjectStorageValue(data) + + return + }); err != nil { + return + } + + return +} + +// AttachmentFromStorageKey gets called when we restore an Attachment from the storage - it parses the key bytes and +// returns the new object. +func AttachmentFromStorageKey(key []byte, optionalTargetObject ...*Attachment) (result *Attachment, err error, consumedBytes int) { // determine the target object that will hold the unmarshaled information switch len(optionalTargetObject) { case 0: @@ -43,43 +76,23 @@ func AttachmentFromBytes(bytes []byte, optionalTargetObject ...*Attachment) (res case 1: result = optionalTargetObject[0] default: - panic("too many arguments in call to AttachmentFromBytes") + panic("too many arguments in call to AttachmentFromStorageKey") } - // parse the bytes - marshalUtil := marshalutil.New(bytes) + // parse the properties that are stored in the key + marshalUtil := marshalutil.New(key) if result.transactionId, err = transaction.ParseId(marshalUtil); err != nil { return } if result.payloadId, err = payload.ParseId(marshalUtil); err != nil { return } - result.storageKey = marshalutil.New(bytes[:AttachmentLength]).Bytes(true) consumedBytes = marshalUtil.ReadOffset() + result.storageKey = marshalutil.New(key[:consumedBytes]).Bytes(true) return } -// Parse is a wrapper for simplified unmarshaling of Attachments from a byte stream using the marshalUtil package. -func ParseAttachment(marshalUtil *marshalutil.MarshalUtil) (*Attachment, error) { - if attachment, err := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { return AttachmentFromBytes(data) }); err != nil { - return nil, err - } else { - return attachment.(*Attachment), nil - } -} - -// AttachmentFromStorageKey gets called when we restore an Attachment from the storage - it parses the key bytes and -// returns the new object. -func AttachmentFromStorageKey(keyBytes []byte) (objectstorage.StorableObject, error, int) { - result, err, _ := AttachmentFromBytes(keyBytes) - if err != nil { - return nil, err, 0 - } - - return result, nil, 0 -} - // TransactionId returns the transaction id of this Attachment. func (attachment *Attachment) TransactionId() transaction.Id { return attachment.transactionId @@ -130,3 +143,41 @@ var _ objectstorage.StorableObject = &Attachment{} // AttachmentLength holds the length of a marshaled Attachment in bytes. const AttachmentLength = transaction.IdLength + payload.IdLength + +// region CachedAttachment ///////////////////////////////////////////////////////////////////////////////////////////// + +type CachedAttachment struct { + objectstorage.CachedObject +} + +func (cachedAttachment *CachedAttachment) Unwrap() *Attachment { + if untypedObject := cachedAttachment.Get(); untypedObject == nil { + return nil + } else { + if typedObject := untypedObject.(*Attachment); typedObject == nil || typedObject.IsDeleted() { + return nil + } else { + return typedObject + } + } +} + +func (cachedAttachment *CachedAttachment) Consume(consumer func(attachment *Attachment)) (consumed bool) { + return cachedAttachment.CachedObject.Consume(func(object objectstorage.StorableObject) { + consumer(object.(*Attachment)) + }) +} + +type CachedAttachments []*CachedAttachment + +func (cachedAttachments CachedAttachments) Consume(consumer func(attachment *Attachment)) (consumed bool) { + for _, cachedAttachment := range cachedAttachments { + consumed = cachedAttachment.Consume(func(output *Attachment) { + consumer(output) + }) || consumed + } + + return +} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/packages/binary/valuetransfer/tangle/consumer.go b/packages/binary/valuetransfer/tangle/consumer.go index 9dadca65db17677ba70df2aed239817319aa32e5..b1ff096ce4649160a62d7276c4447ff57e06619e 100644 --- a/packages/binary/valuetransfer/tangle/consumer.go +++ b/packages/binary/valuetransfer/tangle/consumer.go @@ -5,9 +5,12 @@ import ( "github.com/iotaledger/hive.go/objectstorage" "github.com/iotaledger/hive.go/stringify" + "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/address" "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/transaction" ) +var ConsumerPartitionKeys = objectstorage.PartitionKey([]int{address.Length, transaction.IdLength, transaction.IdLength}...) + // Consumer stores the information which transaction output was consumed by which transaction. We need this to be able // to perform reverse lookups from transaction outputs to their corresponding consuming transactions. type Consumer struct { @@ -35,6 +38,36 @@ func NewConsumer(consumedInput transaction.OutputId, transactionId transaction.I // ConsumerFromBytes unmarshals a Consumer from a sequence of bytes - it either creates a new object or fills the // optionally provided one with the parsed information. func ConsumerFromBytes(bytes []byte, optionalTargetObject ...*Consumer) (result *Consumer, err error, consumedBytes int) { + marshalUtil := marshalutil.New(bytes) + result, err = ParseConsumer(marshalUtil, optionalTargetObject...) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +func ParseConsumer(marshalUtil *marshalutil.MarshalUtil, optionalTargetObject ...*Consumer) (result *Consumer, err error) { + if parsedObject, parseErr := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { + return ConsumerFromStorageKey(data, optionalTargetObject...) + }); parseErr != nil { + err = parseErr + + return + } else { + result = parsedObject.(*Consumer) + } + + if _, err = marshalUtil.Parse(func(data []byte) (parseResult interface{}, parseErr error, parsedBytes int) { + parseErr, parsedBytes = result.UnmarshalObjectStorageValue(data) + + return + }); err != nil { + return + } + + return +} + +func ConsumerFromStorageKey(key []byte, optionalTargetObject ...*Consumer) (result *Consumer, err error, consumedBytes int) { // determine the target object that will hold the unmarshaled information switch len(optionalTargetObject) { case 0: @@ -42,43 +75,23 @@ func ConsumerFromBytes(bytes []byte, optionalTargetObject ...*Consumer) (result case 1: result = optionalTargetObject[0] default: - panic("too many arguments in call to ConsumerFromBytes") + panic("too many arguments in call to ConsumerFromStorageKey") } - // parse the bytes - marshalUtil := marshalutil.New(bytes) + // parse the properties that are stored in the key + marshalUtil := marshalutil.New(key) if result.consumedInput, err = transaction.ParseOutputId(marshalUtil); err != nil { return } if result.transactionId, err = transaction.ParseId(marshalUtil); err != nil { return } - result.storageKey = marshalutil.New(bytes[:ConsumerLength]).Bytes(true) consumedBytes = marshalUtil.ReadOffset() + result.storageKey = marshalutil.New(key[:consumedBytes]).Bytes(true) return } -// Parse is a wrapper for simplified unmarshaling of Consumers from a byte stream using the marshalUtil package. -func ParseConsumer(marshalUtil *marshalutil.MarshalUtil) (*Consumer, error) { - if consumer, err := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { return ConsumerFromBytes(data) }); err != nil { - return nil, err - } else { - return consumer.(*Consumer), nil - } -} - -// ConsumerFromStorage gets called when we restore an Consumer from the storage - it parses the key bytes and -// returns the new object. -func ConsumerFromStorage(keyBytes []byte) objectstorage.StorableObject { - result, err, _ := ConsumerFromBytes(keyBytes) - if err != nil { - panic(err) - } - - return result -} - // ConsumedInput returns the OutputId of the Consumer. func (consumer *Consumer) ConsumedInput() transaction.OutputId { return consumer.consumedInput @@ -129,3 +142,41 @@ var _ objectstorage.StorableObject = &Consumer{} // ConsumerLength holds the length of a marshaled Consumer in bytes. const ConsumerLength = transaction.OutputIdLength + transaction.IdLength + +// region CachedConsumer ///////////////////////////////////////////////////////////////////////////////////////////////// + +type CachedConsumer struct { + objectstorage.CachedObject +} + +func (cachedConsumer *CachedConsumer) Unwrap() *Consumer { + if untypedObject := cachedConsumer.Get(); untypedObject == nil { + return nil + } else { + if typedObject := untypedObject.(*Consumer); typedObject == nil || typedObject.IsDeleted() { + return nil + } else { + return typedObject + } + } +} + +func (cachedConsumer *CachedConsumer) Consume(consumer func(consumer *Consumer)) (consumed bool) { + return cachedConsumer.CachedObject.Consume(func(object objectstorage.StorableObject) { + consumer(object.(*Consumer)) + }) +} + +type CachedConsumers []*CachedConsumer + +func (cachedConsumers CachedConsumers) Consume(consumer func(consumer *Consumer)) (consumed bool) { + for _, cachedConsumer := range cachedConsumers { + consumed = cachedConsumer.Consume(func(output *Consumer) { + consumer(output) + }) || consumed + } + + return +} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/packages/binary/valuetransfer/tangle/missingoutput.go b/packages/binary/valuetransfer/tangle/missingoutput.go index 6e5e05d5a67aa6724cf52a2d96b0a3a1f6aa1c6f..671eec1093690e7615ad36b7ebb19dac47f08aa9 100644 --- a/packages/binary/valuetransfer/tangle/missingoutput.go +++ b/packages/binary/valuetransfer/tangle/missingoutput.go @@ -6,9 +6,12 @@ import ( "github.com/iotaledger/hive.go/marshalutil" "github.com/iotaledger/hive.go/objectstorage" + "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/address" "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/transaction" ) +var MissingOutputKeyPartitions = objectstorage.PartitionKey([]int{address.Length, transaction.IdLength}...) + // MissingPayload represents an Output that was referenced by a Transaction, but that is missing in our object storage. type MissingOutput struct { objectstorage.StorableObjectFlags @@ -28,6 +31,38 @@ func NewMissingOutput(outputId transaction.OutputId) *MissingOutput { // MissingOutputFromBytes unmarshals a MissingOutput from a sequence of bytes - it either creates a new object or fills // the optionally provided one with the parsed information. func MissingOutputFromBytes(bytes []byte, optionalTargetObject ...*MissingOutput) (result *MissingOutput, err error, consumedBytes int) { + marshalUtil := marshalutil.New(bytes) + result, err = ParseMissingOutput(marshalUtil, optionalTargetObject...) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +func ParseMissingOutput(marshalUtil *marshalutil.MarshalUtil, optionalTargetObject ...*MissingOutput) (result *MissingOutput, err error) { + if parsedObject, parseErr := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { + return MissingOutputFromStorageKey(data, optionalTargetObject...) + }); parseErr != nil { + err = parseErr + + return + } else { + result = parsedObject.(*MissingOutput) + } + + if _, err = marshalUtil.Parse(func(data []byte) (parseResult interface{}, parseErr error, parsedBytes int) { + parseErr, parsedBytes = result.UnmarshalObjectStorageValue(data) + + return + }); err != nil { + return + } + + return +} + +// MissingOutputFromStorageKey gets called when we restore a MissingOutput from the storage. The content will be +// unmarshaled by an external caller using the binary.ObjectStorageValue interface. +func MissingOutputFromStorageKey(key []byte, optionalTargetObject ...*MissingOutput) (result *MissingOutput, err error, consumedBytes int) { // determine the target object that will hold the unmarshaled information switch len(optionalTargetObject) { case 0: @@ -35,35 +70,18 @@ func MissingOutputFromBytes(bytes []byte, optionalTargetObject ...*MissingOutput case 1: result = optionalTargetObject[0] default: - panic("too many arguments in call to MissingOutputFromBytes") + panic("too many arguments in call to MissingOutputFromStorageKey") } - // parse the bytes - marshalUtil := marshalutil.New(bytes) + // parse the properties that are stored in the key + marshalUtil := marshalutil.New(key) if result.outputId, err = transaction.ParseOutputId(marshalUtil); err != nil { return } - if result.missingSince, err = marshalUtil.ReadTime(); err != nil { - return - } - consumedBytes = marshalUtil.ReadOffset() return } -// MissingOutputFromStorageKey gets called when we restore a MissingOutput from the storage. The content will be -// unmarshaled by an external caller using the binary.ObjectStorageValue interface. -func MissingOutputFromStorageKey(keyBytes []byte) (objectstorage.StorableObject, error, int) { - outputId, err, _ := transaction.OutputIdFromBytes(keyBytes) - if err != nil { - panic(err) - } - - return &MissingOutput{ - outputId: outputId, - }, nil, transaction.OutputIdLength -} - // Id returns the id of the Output that is missing. func (missingOutput *MissingOutput) Id() transaction.OutputId { return missingOutput.outputId @@ -76,12 +94,10 @@ func (missingOutput *MissingOutput) MissingSince() time.Time { // Bytes marshals the MissingOutput into a sequence of bytes. func (missingOutput *MissingOutput) Bytes() []byte { - marshalUtil := marshalutil.New() - - marshalUtil.WriteBytes(missingOutput.outputId.Bytes()) - marshalUtil.WriteTime(missingOutput.missingSince) - - return marshalUtil.Bytes() + return marshalutil.New(transaction.OutputIdLength + marshalutil.TIME_SIZE). + WriteBytes(missingOutput.ObjectStorageKey()). + WriteBytes(missingOutput.ObjectStorageValue()). + Bytes() } // ObjectStorageKey returns the key that is used to store the object in the object storage. @@ -92,13 +108,19 @@ func (missingOutput *MissingOutput) ObjectStorageKey() []byte { // ObjectStorageValue returns a bytes representation of the Transaction by implementing the encoding.BinaryMarshaler // interface. func (missingOutput *MissingOutput) ObjectStorageValue() []byte { - return missingOutput.Bytes() + return marshalutil.New(marshalutil.TIME_SIZE). + WriteTime(missingOutput.missingSince). + Bytes() } // UnmarshalObjectStorageValue restores the values of a MissingOutput from a sequence of bytes using the encoding.BinaryUnmarshaler // interface. func (missingOutput *MissingOutput) UnmarshalObjectStorageValue(data []byte) (err error, consumedBytes int) { - _, err, consumedBytes = MissingOutputFromBytes(data, missingOutput) + marshalUtil := marshalutil.New(data) + if missingOutput.missingSince, err = marshalUtil.ReadTime(); err != nil { + return + } + consumedBytes = marshalUtil.ReadOffset() return } diff --git a/packages/binary/valuetransfer/tangle/missingpayload.go b/packages/binary/valuetransfer/tangle/missingpayload.go index 395365a2c6d4705bf670707001b5d23c3c659fb7..10a6dabb81d36bd917d9717d3c8a8c98cfe4cfda 100644 --- a/packages/binary/valuetransfer/tangle/missingpayload.go +++ b/packages/binary/valuetransfer/tangle/missingpayload.go @@ -29,6 +29,38 @@ func NewMissingPayload(payloadId payload.Id) *MissingPayload { // MissingPayloadFromBytes unmarshals an entry for a missing value transfer payload from a sequence of bytes. // It either creates a new entry or fills the optionally provided one with the parsed information. func MissingPayloadFromBytes(bytes []byte, optionalTargetObject ...*MissingPayload) (result *MissingPayload, err error, consumedBytes int) { + marshalUtil := marshalutil.New(bytes) + result, err = ParseMissingPayload(marshalUtil, optionalTargetObject...) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +func ParseMissingPayload(marshalUtil *marshalutil.MarshalUtil, optionalTargetObject ...*MissingPayload) (result *MissingPayload, err error) { + if parsedObject, parseErr := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { + return MissingPayloadFromStorageKey(data, optionalTargetObject...) + }); parseErr != nil { + err = parseErr + + return + } else { + result = parsedObject.(*MissingPayload) + } + + if _, err = marshalUtil.Parse(func(data []byte) (parseResult interface{}, parseErr error, parsedBytes int) { + parseErr, parsedBytes = result.UnmarshalObjectStorageValue(data) + + return + }); err != nil { + return + } + + return +} + +// MissingPayloadFromStorageKey gets called when we restore an entry for a missing value transfer payload from the storage. The bytes and +// the content will be unmarshaled by an external caller using the binary.ObjectStorageValue interface. +func MissingPayloadFromStorageKey(key []byte, optionalTargetObject ...*MissingPayload) (result *MissingPayload, err error, consumedBytes int) { // determine the target object that will hold the unmarshaled information switch len(optionalTargetObject) { case 0: @@ -36,28 +68,19 @@ func MissingPayloadFromBytes(bytes []byte, optionalTargetObject ...*MissingPaylo case 1: result = optionalTargetObject[0] default: - panic("too many arguments in call to MissingPayloadFromBytes") + panic("too many arguments in call to MissingPayloadFromStorageKey") } - // parse the bytes - marshalUtil := marshalutil.New(bytes) + // parse the properties that are stored in the key + marshalUtil := marshalutil.New(key) if result.payloadId, err = payload.ParseId(marshalUtil); err != nil { return } - if result.missingSince, err = marshalUtil.ReadTime(); err != nil { - return - } consumedBytes = marshalUtil.ReadOffset() return } -// MissingPayloadFromStorageKey gets called when we restore an entry for a missing value transfer payload from the storage. The bytes and -// the content will be unmarshaled by an external caller using the binary.ObjectStorageValue interface. -func MissingPayloadFromStorageKey([]byte) (result objectstorage.StorableObject, err error, consumedBytes int) { - return -} - // GetId returns the payload id, that is missing. func (missingPayload *MissingPayload) GetId() payload.Id { return missingPayload.payloadId @@ -70,12 +93,10 @@ func (missingPayload *MissingPayload) GetMissingSince() time.Time { // Bytes marshals the missing payload into a sequence of bytes. func (missingPayload *MissingPayload) Bytes() []byte { - marshalUtil := marshalutil.New() - - marshalUtil.WriteBytes(missingPayload.payloadId.Bytes()) - marshalUtil.WriteTime(missingPayload.missingSince) - - return marshalUtil.Bytes() + return marshalutil.New(payload.IdLength + marshalutil.TIME_SIZE). + WriteBytes(missingPayload.ObjectStorageKey()). + WriteBytes(missingPayload.ObjectStorageValue()). + Bytes() } // Update is disabled and panics if it ever gets called - updates are supposed to happen through the setters. @@ -92,12 +113,18 @@ func (missingPayload *MissingPayload) ObjectStorageKey() []byte { // ObjectStorageValue is required to match the encoding.BinaryMarshaler interface. func (missingPayload *MissingPayload) ObjectStorageValue() (data []byte) { - return missingPayload.Bytes() + return marshalutil.New(marshalutil.TIME_SIZE). + WriteTime(missingPayload.missingSince). + Bytes() } // UnmarshalObjectStorageValue is required to match the encoding.BinaryUnmarshaler interface. func (missingPayload *MissingPayload) UnmarshalObjectStorageValue(data []byte) (err error, consumedBytes int) { - _, err, consumedBytes = MissingPayloadFromBytes(data, missingPayload) + marshalUtil := marshalutil.New(data) + if missingPayload.missingSince, err = marshalUtil.ReadTime(); err != nil { + return + } + consumedBytes = marshalUtil.ReadOffset() return } diff --git a/packages/binary/valuetransfer/tangle/objectstorage.go b/packages/binary/valuetransfer/tangle/objectstorage.go new file mode 100644 index 0000000000000000000000000000000000000000..e02ee3b34b926503949f082b654e8dc4ae5ba5b5 --- /dev/null +++ b/packages/binary/valuetransfer/tangle/objectstorage.go @@ -0,0 +1,55 @@ +package tangle + +import ( + "github.com/iotaledger/hive.go/objectstorage" + + "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/payload" + "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/transaction" +) + +const ( + // the following values are a list of prefixes defined as an enum + _ byte = iota + + // prefixes used for the objectstorage + osPayload + osPayloadMetadata + osMissingPayload + osApprover + osAttachment + osOutput + osMissingOutput + osConsumer +) + +func osPayloadFactory(key []byte) (objectstorage.StorableObject, error, int) { + return payload.FromStorageKey(key) +} + +func osPayloadMetadataFactory(key []byte) (objectstorage.StorableObject, error, int) { + return PayloadMetadataFromStorageKey(key) +} + +func osMissingPayloadFactory(key []byte) (objectstorage.StorableObject, error, int) { + return MissingPayloadFromStorageKey(key) +} + +func osPayloadApproverFactory(key []byte) (objectstorage.StorableObject, error, int) { + return PayloadApproverFromStorageKey(key) +} + +func osAttachmentFactory(key []byte) (objectstorage.StorableObject, error, int) { + return AttachmentFromStorageKey(key) +} + +func osOutputFactory(key []byte) (objectstorage.StorableObject, error, int) { + return transaction.OutputFromStorageKey(key) +} + +func osMissingOutputFactory(key []byte) (objectstorage.StorableObject, error, int) { + return MissingOutputFromStorageKey(key) +} + +func osConsumerFactory(key []byte) (objectstorage.StorableObject, error, int) { + return ConsumerFromStorageKey(key) +} diff --git a/packages/binary/valuetransfer/tangle/payloadapprover.go b/packages/binary/valuetransfer/tangle/payloadapprover.go index 4664cb16a3aa51a8d600be49b73b03d7490401a0..26aa76f6eda9d37c4c3f616acf85cfa26a2a8163 100644 --- a/packages/binary/valuetransfer/tangle/payloadapprover.go +++ b/packages/binary/valuetransfer/tangle/payloadapprover.go @@ -31,28 +31,60 @@ func NewPayloadApprover(referencedPayload payload.Id, approvingPayload payload.I } } -// PayloadApproverFromStorageKey get's called when we restore transaction metadata from the storage. -// In contrast to other database models, it unmarshals the information from the key and does not use the UnmarshalObjectStorageValue -// method. -func PayloadApproverFromStorageKey(idBytes []byte) (result objectstorage.StorableObject, err error, consumedBytes int) { - marshalUtil := marshalutil.New(idBytes) +func PayloadApproverFromBytes(bytes []byte, optionalTargetObject ...*PayloadApprover) (result *PayloadApprover, err error, consumedBytes int) { + marshalUtil := marshalutil.New(bytes) + result, err = ParsePayloadApprover(marshalUtil, optionalTargetObject...) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +func ParsePayloadApprover(marshalUtil *marshalutil.MarshalUtil, optionalTargetObject ...*PayloadApprover) (result *PayloadApprover, err error) { + if parsedObject, parseErr := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { + return PayloadApproverFromStorageKey(data, optionalTargetObject...) + }); parseErr != nil { + err = parseErr - referencedPayloadId, err := payload.ParseId(marshalUtil) - if err != nil { return + } else { + result = parsedObject.(*PayloadApprover) } - approvingPayloadId, err := payload.ParseId(marshalUtil) - if err != nil { + + if _, err = marshalUtil.Parse(func(data []byte) (parseResult interface{}, parseErr error, parsedBytes int) { + parseErr, parsedBytes = result.UnmarshalObjectStorageValue(data) + + return + }); err != nil { return } - result = &PayloadApprover{ - referencedPayloadId: referencedPayloadId, - approvingPayloadId: approvingPayloadId, - storageKey: marshalUtil.Bytes(true), + return +} + +// PayloadApproverFromStorageKey get's called when we restore transaction metadata from the storage. +// In contrast to other database models, it unmarshals the information from the key and does not use the UnmarshalObjectStorageValue +// method. +func PayloadApproverFromStorageKey(key []byte, optionalTargetObject ...*PayloadApprover) (result *PayloadApprover, err error, consumedBytes int) { + // determine the target object that will hold the unmarshaled information + switch len(optionalTargetObject) { + case 0: + result = &PayloadApprover{} + case 1: + result = optionalTargetObject[0] + default: + panic("too many arguments in call to PayloadApproverFromStorageKey") } + // parse the properties that are stored in the key + marshalUtil := marshalutil.New(key) + if result.referencedPayloadId, err = payload.ParseId(marshalUtil); err != nil { + return + } + if result.approvingPayloadId, err = payload.ParseId(marshalUtil); err != nil { + return + } consumedBytes = marshalUtil.ReadOffset() + result.storageKey = marshalutil.New(key[:consumedBytes]).Bytes(true) return } diff --git a/packages/binary/valuetransfer/tangle/payloadmetadata.go b/packages/binary/valuetransfer/tangle/payloadmetadata.go index 4308e595612350142b745f365c96441ccc710bd3..981e64bcc7408720b7d96c45b71e91da47363f94 100644 --- a/packages/binary/valuetransfer/tangle/payloadmetadata.go +++ b/packages/binary/valuetransfer/tangle/payloadmetadata.go @@ -34,39 +34,52 @@ func NewPayloadMetadata(payloadId payload.Id) *PayloadMetadata { // PayloadMetadataFromBytes unmarshals a container with the metadata of a value transfer payload from a sequence of bytes. // It either creates a new container or fills the optionally provided container with the parsed information. func PayloadMetadataFromBytes(bytes []byte, optionalTargetObject ...*PayloadMetadata) (result *PayloadMetadata, err error, consumedBytes int) { - // determine the target object that will hold the unmarshaled information - switch len(optionalTargetObject) { - case 0: - result = &PayloadMetadata{} - case 1: - result = optionalTargetObject[0] - default: - panic("too many arguments in call to PayloadMetadataFromBytes") - } - - // parse the bytes marshalUtil := marshalutil.New(bytes) - if result.payloadId, err = payload.ParseId(marshalUtil); err != nil { + result, err = ParsePayloadMetadata(marshalUtil, optionalTargetObject...) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +// ParsePayloadMetadata is a wrapper for simplified unmarshaling in a byte stream using the marshalUtil package. +func ParsePayloadMetadata(marshalUtil *marshalutil.MarshalUtil, optionalTargetObject ...*PayloadMetadata) (result *PayloadMetadata, err error) { + if parsedObject, parseErr := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { + return PayloadMetadataFromStorageKey(data, optionalTargetObject...) + }); parseErr != nil { + err = parseErr + return + } else { + result = parsedObject.(*PayloadMetadata) } - if result.solidificationTime, err = marshalUtil.ReadTime(); err != nil { + + if _, err = marshalUtil.Parse(func(data []byte) (parseResult interface{}, parseErr error, parsedBytes int) { + parseErr, parsedBytes = result.UnmarshalObjectStorageValue(data) + return - } - if result.solid, err = marshalUtil.ReadBool(); err != nil { + }); err != nil { return } - consumedBytes = marshalUtil.ReadOffset() return } // PayloadMetadataFromStorageKey gets called when we restore transaction metadata from the storage. The bytes and the content will be // unmarshaled by an external caller using the binary.ObjectStorageValue interface. -func PayloadMetadataFromStorageKey(id []byte) (result objectstorage.StorableObject, err error, consumedBytes int) { - result = &PayloadMetadata{} +func PayloadMetadataFromStorageKey(id []byte, optionalTargetObject ...*PayloadMetadata) (result *PayloadMetadata, err error, consumedBytes int) { + // determine the target object that will hold the unmarshaled information + switch len(optionalTargetObject) { + case 0: + result = &PayloadMetadata{} + case 1: + result = optionalTargetObject[0] + default: + panic("too many arguments in call to PayloadMetadataFromStorageKey") + } + // parse the properties that are stored in the key marshalUtil := marshalutil.New(id) - if result.(*PayloadMetadata).payloadId, err = payload.ParseId(marshalUtil); err != nil { + if result.payloadId, err = payload.ParseId(marshalUtil); err != nil { return } consumedBytes = marshalUtil.ReadOffset() @@ -74,15 +87,6 @@ func PayloadMetadataFromStorageKey(id []byte) (result objectstorage.StorableObje return } -// ParsePayloadMetadata is a wrapper for simplified unmarshaling in a byte stream using the marshalUtil package. -func ParsePayloadMetadata(marshalUtil *marshalutil.MarshalUtil) (*PayloadMetadata, error) { - if payloadMetadata, err := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { return PayloadMetadataFromBytes(data) }); err != nil { - return nil, err - } else { - return payloadMetadata.(*PayloadMetadata), nil - } -} - // GetPayloadId return the id of the payload that this metadata is associated to. func (payloadMetadata *PayloadMetadata) GetPayloadId() payload.Id { return payloadMetadata.payloadId @@ -136,13 +140,10 @@ func (payloadMetadata *PayloadMetadata) GetSoldificationTime() time.Time { // Bytes marshals the metadata into a sequence of bytes. func (payloadMetadata *PayloadMetadata) Bytes() []byte { - marshalUtil := marshalutil.New() - - marshalUtil.WriteBytes(payloadMetadata.payloadId.Bytes()) - marshalUtil.WriteTime(payloadMetadata.solidificationTime) - marshalUtil.WriteBool(payloadMetadata.solid) - - return marshalUtil.Bytes() + return marshalutil.New(payload.IdLength + marshalutil.TIME_SIZE + marshalutil.BOOL_SIZE). + WriteBytes(payloadMetadata.ObjectStorageKey()). + WriteBytes(payloadMetadata.ObjectStorageValue()). + Bytes() } // String creates a human readable version of the metadata (for debug purposes). @@ -168,12 +169,22 @@ func (payloadMetadata *PayloadMetadata) Update(other objectstorage.StorableObjec // ObjectStorageValue is required to match the encoding.BinaryMarshaler interface. func (payloadMetadata *PayloadMetadata) ObjectStorageValue() []byte { - return payloadMetadata.Bytes() + return marshalutil.New(marshalutil.TIME_SIZE + marshalutil.BOOL_SIZE). + WriteTime(payloadMetadata.solidificationTime). + WriteBool(payloadMetadata.solid). + Bytes() } // UnmarshalObjectStorageValue is required to match the encoding.BinaryUnmarshaler interface. func (payloadMetadata *PayloadMetadata) UnmarshalObjectStorageValue(data []byte) (err error, consumedBytes int) { - _, err, consumedBytes = PayloadMetadataFromBytes(data, payloadMetadata) + marshalUtil := marshalutil.New(data) + if payloadMetadata.solidificationTime, err = marshalUtil.ReadTime(); err != nil { + return + } + if payloadMetadata.solid, err = marshalUtil.ReadBool(); err != nil { + return + } + consumedBytes = marshalUtil.ReadOffset() return } diff --git a/packages/binary/valuetransfer/tangle/tangle.go b/packages/binary/valuetransfer/tangle/tangle.go index 01b00b5050ab4c3a8561f7b626a4f636b3d19282..f19490fdbc289cf388f1aba4f58b35ad9919cfd3 100644 --- a/packages/binary/valuetransfer/tangle/tangle.go +++ b/packages/binary/valuetransfer/tangle/tangle.go @@ -7,8 +7,11 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/objectstorage" + "github.com/iotaledger/hive.go/types" "github.com/iotaledger/goshimmer/packages/binary/storageprefix" + "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/address" + "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/balance" "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/payload" "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/transaction" ) @@ -16,17 +19,15 @@ import ( // Tangle represents the value tangle that consists out of value payloads. // It is an independent ontology, that lives inside the tangle. type Tangle struct { - storageId []byte - payloadStorage *objectstorage.ObjectStorage payloadMetadataStorage *objectstorage.ObjectStorage approverStorage *objectstorage.ObjectStorage missingPayloadStorage *objectstorage.ObjectStorage attachmentStorage *objectstorage.ObjectStorage - consumerStorage *objectstorage.ObjectStorage - transactionOutputMetadataStorage *objectstorage.ObjectStorage - missingOutputStorage *objectstorage.ObjectStorage + outputStorage *objectstorage.ObjectStorage + consumerStorage *objectstorage.ObjectStorage + missingOutputStorage *objectstorage.ObjectStorage Events Events @@ -35,24 +36,21 @@ type Tangle struct { cleanupWorkerPool async.WorkerPool } -func New(badgerInstance *badger.DB, storageId []byte) (result *Tangle) { - result = &Tangle{ - storageId: storageId, +func New(badgerInstance *badger.DB) (result *Tangle) { + osFactory := objectstorage.NewFactory(badgerInstance, storageprefix.ValueTransfers) + result = &Tangle{ // payload related storage - payloadStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferPayload...), payload.StorableObjectFromKey, objectstorage.CacheTime(time.Second)), - payloadMetadataStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferPayloadMetadata...), PayloadMetadataFromStorageKey, objectstorage.CacheTime(time.Second)), - missingPayloadStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferMissingPayload...), MissingPayloadFromStorageKey, objectstorage.CacheTime(time.Second)), - approverStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferApprover...), PayloadApproverFromStorageKey, objectstorage.CacheTime(time.Second), objectstorage.PartitionKey(payload.IdLength, payload.IdLength), objectstorage.KeysOnly(true)), - - // transaction related storage - transactionOutputMetadataStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.Layer0Approvers...), transaction.OutputFromStorageKey, objectstorage.CacheTime(time.Second)), - missingOutputStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferMissingPayload...), MissingOutputFromStorageKey, objectstorage.CacheTime(time.Second)), - consumerStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferConsumer...), transaction.OutputFromStorageKey, objectstorage.CacheTime(time.Second)), - - attachmentStorage: objectstorage.New(badgerInstance, append(storageId, storageprefix.ValueTransferAttachment...), AttachmentFromStorageKey, objectstorage.CacheTime(time.Second)), + payloadStorage: osFactory.New(osPayload, osPayloadFactory, objectstorage.CacheTime(time.Second)), + payloadMetadataStorage: osFactory.New(osPayloadMetadata, osPayloadMetadataFactory, objectstorage.CacheTime(time.Second)), + missingPayloadStorage: osFactory.New(osMissingPayload, osMissingPayloadFactory, objectstorage.CacheTime(time.Second)), + approverStorage: osFactory.New(osApprover, osPayloadApproverFactory, objectstorage.CacheTime(time.Second), objectstorage.PartitionKey(payload.IdLength, payload.IdLength), objectstorage.KeysOnly(true)), // transaction related storage + attachmentStorage: osFactory.New(osAttachment, osAttachmentFactory, objectstorage.CacheTime(time.Second)), + outputStorage: osFactory.New(osOutput, osOutputFactory, transaction.OutputKeyPartitions, objectstorage.CacheTime(time.Second)), + missingOutputStorage: osFactory.New(osMissingOutput, osMissingOutputFactory, MissingOutputKeyPartitions, objectstorage.CacheTime(time.Second)), + consumerStorage: osFactory.New(osConsumer, osConsumerFactory, ConsumerPartitionKeys, objectstorage.CacheTime(time.Second)), Events: *newEvents(), } @@ -80,18 +78,46 @@ func (tangle *Tangle) GetTransactionMetadata(transactionId transaction.Id) *Cach return &CachedTransactionMetadata{CachedObject: tangle.missingOutputStorage.Load(transactionId.Bytes())} } +func (tangle *Tangle) GetTransactionOutput(outputId transaction.OutputId) *transaction.CachedOutput { + return &transaction.CachedOutput{CachedObject: tangle.outputStorage.Load(outputId.Bytes())} +} + // GetApprovers retrieves the approvers of a payload from the object storage. -func (tangle *Tangle) GetApprovers(transactionId payload.Id) CachedApprovers { +func (tangle *Tangle) GetApprovers(payloadId payload.Id) CachedApprovers { approvers := make(CachedApprovers, 0) tangle.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { approvers = append(approvers, &CachedPayloadApprover{CachedObject: cachedObject}) return true - }, transactionId[:]) + }, payloadId.Bytes()) return approvers } +// GetApprovers retrieves the approvers of a payload from the object storage. +func (tangle *Tangle) GetConsumers(outputId transaction.OutputId) CachedConsumers { + consumers := make(CachedConsumers, 0) + tangle.consumerStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + consumers = append(consumers, &CachedConsumer{CachedObject: cachedObject}) + + return true + }, outputId.Bytes()) + + return consumers +} + +// GetApprovers retrieves the approvers of a payload from the object storage. +func (tangle *Tangle) GetAttachments(transactionId transaction.Id) CachedAttachments { + attachments := make(CachedAttachments, 0) + tangle.attachmentStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + attachments = append(attachments, &CachedAttachment{CachedObject: cachedObject}) + + return true + }, transactionId.Bytes()) + + return attachments +} + // Shutdown stops the worker pools and shuts down the object storage instances. func (tangle *Tangle) Shutdown() *Tangle { tangle.storePayloadWorkerPool.ShutdownGracefully() @@ -124,61 +150,85 @@ func (tangle *Tangle) Prune() error { // storePayloadWorker is the worker function that stores the payload and calls the corresponding storage events. func (tangle *Tangle) storePayloadWorker(payloadToStore *payload.Payload) { - // store payload - var cachedPayload *payload.CachedPayload + // store the payload and transaction models + cachedPayload, cachedPayloadMetadata, payloadStored := tangle.storePayload(payloadToStore) + if !payloadStored { + // abort if we have seen the payload already + return + } + cachedTransactionMetadata, transactionStored := tangle.storeTransaction(payloadToStore.Transaction()) + + // store the references between the different entities (we do this after the actual entities were stored, so that + // all the metadata models exist in the database as soon as the entities are reachable by walks). + tangle.storePayloadReferences(payloadToStore) + if transactionStored { + tangle.storeTransactionReferences(payloadToStore.Transaction()) + } + + // trigger events + if tangle.missingPayloadStorage.DeleteIfPresent(payloadToStore.Id().Bytes()) { + tangle.Events.MissingPayloadReceived.Trigger(cachedPayload, cachedPayloadMetadata) + } + tangle.Events.PayloadAttached.Trigger(cachedPayload, cachedPayloadMetadata) + + // check solidity + tangle.solidifierWorkerPool.Submit(func() { + tangle.solidifyTransactionWorker(cachedPayload, cachedPayloadMetadata, cachedTransactionMetadata) + }) +} + +func (tangle *Tangle) storePayload(payloadToStore *payload.Payload) (cachedPayload *payload.CachedPayload, cachedMetadata *CachedPayloadMetadata, payloadStored bool) { if _tmp, transactionIsNew := tangle.payloadStorage.StoreIfAbsent(payloadToStore); !transactionIsNew { return } else { cachedPayload = &payload.CachedPayload{CachedObject: _tmp} - } + cachedMetadata = &CachedPayloadMetadata{CachedObject: tangle.payloadMetadataStorage.Store(NewPayloadMetadata(payloadToStore.Id()))} + payloadStored = true - // store payload metadata - payloadId := payloadToStore.Id() - cachedMetadata := &CachedPayloadMetadata{CachedObject: tangle.payloadMetadataStorage.Store(NewPayloadMetadata(payloadId))} + return + } +} - // retrieve or store TransactionMetadata - newTransaction := false - transactionId := cachedPayload.Unwrap().Transaction().Id() - cachedTransactionMetadata := &CachedTransactionMetadata{CachedObject: tangle.payloadMetadataStorage.ComputeIfAbsent(transactionId.Bytes(), func(key []byte) objectstorage.StorableObject { - newTransaction = true +func (tangle *Tangle) storeTransaction(tx *transaction.Transaction) (cachedTransactionMetadata *CachedTransactionMetadata, transactionStored bool) { + cachedTransactionMetadata = &CachedTransactionMetadata{CachedObject: tangle.payloadMetadataStorage.ComputeIfAbsent(tx.Id().Bytes(), func(key []byte) objectstorage.StorableObject { + transactionStored = true - result := NewTransactionMetadata(transactionId) + result := NewTransactionMetadata(tx.Id()) result.Persist() result.SetModified() return result })} + if transactionStored { + tx.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) { + tangle.outputStorage.Store(transaction.NewOutput(address, tx.Id(), balances)) + }) + } + + return +} + +func (tangle *Tangle) storePayloadReferences(payload *payload.Payload) { // store trunk approver - trunkId := payloadToStore.TrunkId() - tangle.approverStorage.Store(NewPayloadApprover(trunkId, payloadId)).Release() + trunkId := payload.TrunkId() + tangle.approverStorage.Store(NewPayloadApprover(trunkId, payload.Id())).Release() // store branch approver - if branchId := payloadToStore.BranchId(); branchId != trunkId { + if branchId := payload.BranchId(); branchId != trunkId { tangle.approverStorage.Store(NewPayloadApprover(branchId, trunkId)).Release() } - // store the consumers, the first time we see a Transaction - if newTransaction { - payloadToStore.Transaction().Inputs().ForEach(func(outputId transaction.OutputId) bool { - tangle.consumerStorage.Store(NewConsumer(outputId, transactionId)) - - return true - }) - } + // store a reference from the transaction to the payload that attached it + tangle.attachmentStorage.Store(NewAttachment(payload.Transaction().Id(), payload.Id())) +} - // store attachment - tangle.attachmentStorage.StoreIfAbsent(NewAttachment(payloadToStore.Transaction().Id(), payloadToStore.Id())) +func (tangle *Tangle) storeTransactionReferences(tx *transaction.Transaction) { + // store references to the consumed outputs + tx.Inputs().ForEach(func(outputId transaction.OutputId) bool { + tangle.consumerStorage.Store(NewConsumer(outputId, tx.Id())) - // trigger events - if tangle.missingPayloadStorage.DeleteIfPresent(payloadId.Bytes()) { - tangle.Events.MissingPayloadReceived.Trigger(cachedPayload, cachedMetadata) - } - tangle.Events.PayloadAttached.Trigger(cachedPayload, cachedMetadata) - - // check solidity - tangle.solidifierWorkerPool.Submit(func() { - tangle.solidifyTransactionWorker(cachedPayload, cachedMetadata, cachedTransactionMetadata) + return true }) } @@ -200,57 +250,94 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedPayload *payload.CachedPay // process payloads that are supposed to be checked for solidity recursively for solidificationStack.Len() > 0 { - currentCachedPayload, currentCachedMetadata, currentCachedTransactionMetadata := popElementsFromStack(solidificationStack) - - currentPayload := currentCachedPayload.Unwrap() - currentPayloadMetadata := currentCachedMetadata.Unwrap() - currentTransaction := currentPayload.Transaction() - currentTransactionMetadata := currentCachedTransactionMetadata.Unwrap() - if currentPayload == nil || currentPayloadMetadata == nil || currentTransactionMetadata == nil { - currentCachedPayload.Release() - currentCachedMetadata.Release() - currentCachedTransactionMetadata.Release() - - continue - } + // execute logic inside a func, so we can use defer to release the objects + func() { + // retrieve cached objects + currentCachedPayload, currentCachedMetadata, currentCachedTransactionMetadata := popElementsFromStack(solidificationStack) + defer currentCachedPayload.Release() + defer currentCachedMetadata.Release() + defer currentCachedTransactionMetadata.Release() + + // unwrap cached objects + currentPayload := currentCachedPayload.Unwrap() + currentPayloadMetadata := currentCachedMetadata.Unwrap() + currentTransactionMetadata := currentCachedTransactionMetadata.Unwrap() + currentTransaction := currentPayload.Transaction() + + // abort if any of the retrieved models is nil + if currentPayload == nil || currentPayloadMetadata == nil || currentTransactionMetadata == nil { + return + } + + // abort if the entities are not solid + if !tangle.isPayloadSolid(currentPayload, currentPayloadMetadata) || !tangle.isTransactionSolid(currentTransaction, currentTransactionMetadata) { + return + } - // if current transaction and payload are solid ... - if tangle.isPayloadSolid(currentPayload, currentPayloadMetadata) && tangle.isTransactionSolid(currentTransaction, currentTransactionMetadata) { + // abort if the payload was marked as solid already (if a payload is solid already then the tx is also solid) payloadBecameSolid := currentPayloadMetadata.SetSolid(true) + if !payloadBecameSolid { + return + } + + // set the transaction related entities to be solid transactionBecameSolid := currentTransactionMetadata.SetSolid(true) + if transactionBecameSolid { + currentTransaction.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) { + tangle.GetTransactionOutput(transaction.NewOutputId(address, currentTransaction.Id())).Consume(func(output *transaction.Output) { + output.SetSolid(true) + }) + }) + } - // if payload was marked as solid the first time ... - if payloadBecameSolid { - tangle.Events.PayloadSolid.Trigger(currentCachedPayload, currentCachedMetadata) + // ... trigger solid event ... + tangle.Events.PayloadSolid.Trigger(currentCachedPayload, currentCachedMetadata) - tangle.GetApprovers(currentPayload.Id()).Consume(func(approver *PayloadApprover) { - approvingPayloadId := approver.GetApprovingPayloadId() - approvingCachedPayload := tangle.GetPayload(approvingPayloadId) + // ... and schedule check of approvers + tangle.GetApprovers(currentPayload.Id()).Consume(func(approver *PayloadApprover) { + approvingPayloadId := approver.GetApprovingPayloadId() + approvingCachedPayload := tangle.GetPayload(approvingPayloadId) - approvingCachedPayload.Consume(func(payload *payload.Payload) { - solidificationStack.PushBack([3]interface{}{ - approvingCachedPayload, - tangle.GetPayloadMetadata(approvingPayloadId), - tangle.GetTransactionMetadata(payload.Transaction().Id()), - }) + approvingCachedPayload.Consume(func(payload *payload.Payload) { + solidificationStack.PushBack([3]interface{}{ + approvingCachedPayload, + tangle.GetPayloadMetadata(approvingPayloadId), + tangle.GetTransactionMetadata(payload.Transaction().Id()), }) }) + }) + + if !transactionBecameSolid { + return } - if transactionBecameSolid { - tangle.Events.TransactionSolid.Trigger(currentTransaction, currentTransactionMetadata) + tangle.Events.TransactionSolid.Trigger(currentTransaction, currentTransactionMetadata) - currentTransaction.Inputs().ForEach(func(outputId transaction.OutputId) bool { - return true + seenTransactions := make(map[transaction.Id]types.Empty) + currentTransaction.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) { + tangle.GetTransactionOutput(transaction.NewOutputId(address, currentTransaction.Id())).Consume(func(output *transaction.Output) { + // trigger events }) - //tangle.GetConsumers(outputId) - } - } - // release cached objects - currentCachedPayload.Release() - currentCachedMetadata.Release() - currentCachedTransactionMetadata.Release() + tangle.GetConsumers(transaction.NewOutputId(address, currentTransaction.Id())).Consume(func(consumer *Consumer) { + // keep track of the processed transactions (the same transaction can consume multiple outputs) + if _, transactionSeen := seenTransactions[consumer.TransactionId()]; transactionSeen { + seenTransactions[consumer.TransactionId()] = types.Void + + transactionMetadata := tangle.GetTransactionMetadata(consumer.TransactionId()) + + // retrieve all the payloads that attached the transaction + tangle.GetAttachments(consumer.TransactionId()).Consume(func(attachment *Attachment) { + solidificationStack.PushBack([3]interface{}{ + tangle.GetPayload(attachment.PayloadId()), + tangle.GetPayloadMetadata(attachment.PayloadId()), + transactionMetadata, + }) + }) + } + }) + }) + }() } } @@ -271,17 +358,13 @@ func (tangle *Tangle) isTransactionSolid(transaction *transaction.Transaction, m return transaction.Inputs().ForEach(tangle.isOutputMarkedAsSolid) } -func (tangle *Tangle) GetTransferOutputMetadata(transactionOutputId transaction.OutputId) *CachedTransactionOutputMetadata { - return &CachedTransactionOutputMetadata{CachedObject: tangle.transactionOutputMetadataStorage.Load(transactionOutputId.Bytes())} -} - -func (tangle *Tangle) isOutputMarkedAsSolid(transferOutputId transaction.OutputId) (result bool) { - objectConsumed := tangle.GetTransferOutputMetadata(transferOutputId).Consume(func(transferOutputMetadata *TransactionOutputMetadata) { - result = transferOutputMetadata.Solid() +func (tangle *Tangle) isOutputMarkedAsSolid(transactionOutputId transaction.OutputId) (result bool) { + outputExists := tangle.GetTransactionOutput(transactionOutputId).Consume(func(output *transaction.Output) { + result = output.Solid() }) - if !objectConsumed { - if cachedMissingOutput, missingOutputStored := tangle.missingOutputStorage.StoreIfAbsent(NewMissingOutput(transferOutputId)); missingOutputStored { + if !outputExists { + if cachedMissingOutput, missingOutputStored := tangle.missingOutputStorage.StoreIfAbsent(NewMissingOutput(transactionOutputId)); missingOutputStored { cachedMissingOutput.Consume(func(object objectstorage.StorableObject) { tangle.Events.OutputMissing.Trigger(object.(*MissingOutput).Id()) }) diff --git a/packages/binary/valuetransfer/tangle/tangle_test.go b/packages/binary/valuetransfer/tangle/tangle_test.go index f2f371ecfa423785c3258768f6012c384a79dced..2b21059b6437736a6dca02037aef47004ebf9aa7 100644 --- a/packages/binary/valuetransfer/tangle/tangle_test.go +++ b/packages/binary/valuetransfer/tangle/tangle_test.go @@ -45,7 +45,7 @@ func TestTangle_AttachPayload(t *testing.T) { config.Node.Set(database.CFG_DIRECTORY, dir) - tangle := New(database.GetBadgerInstance(), []byte("TEST_BINARY_TANGLE")) + tangle := New(database.GetBadgerInstance()) if err := tangle.Prune(); err != nil { t.Error(err) diff --git a/packages/binary/valuetransfer/tangle/transactionoutputmetadata.go b/packages/binary/valuetransfer/tangle/transactionoutputmetadata.go deleted file mode 100644 index a4c7e97684cdb1cd3defa5fcf04622f059869f5d..0000000000000000000000000000000000000000 --- a/packages/binary/valuetransfer/tangle/transactionoutputmetadata.go +++ /dev/null @@ -1,215 +0,0 @@ -package tangle - -import ( - "sync" - "time" - - "github.com/iotaledger/hive.go/marshalutil" - "github.com/iotaledger/hive.go/objectstorage" - "github.com/iotaledger/hive.go/stringify" - - "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/transaction" -) - -// TransactionOutputMetadata contains the information of a transaction output, that are based on our local perception of things (i.e. if it -// is solid, or when we it became solid). -type TransactionOutputMetadata struct { - objectstorage.StorableObjectFlags - - id transaction.OutputId - solid bool - solidificationTime time.Time - - solidMutex sync.RWMutex - solidificationTimeMutex sync.RWMutex -} - -// NewOutputMetadata is the constructor for the TransactionOutputMetadata type. -func NewTransactionOutputMetadata(outputId transaction.OutputId) *TransactionOutputMetadata { - return &TransactionOutputMetadata{ - id: outputId, - } -} - -// TransactionOutputMetadataFromBytes unmarshals a TransactionOutputMetadata object from a sequence of bytes. -// It either creates a new object or fills the optionally provided object with the parsed information. -func TransactionOutputMetadataFromBytes(bytes []byte, optionalTargetObject ...*TransactionOutputMetadata) (result *TransactionOutputMetadata, err error, consumedBytes int) { - // determine the target object that will hold the unmarshaled information - switch len(optionalTargetObject) { - case 0: - result = &TransactionOutputMetadata{} - case 1: - result = optionalTargetObject[0] - default: - panic("too many arguments in call to TransactionOutputMetadataFromBytes") - } - - // parse the bytes - marshalUtil := marshalutil.New(bytes) - if result.id, err = transaction.ParseOutputId(marshalUtil); err != nil { - return - } - if result.solidificationTime, err = marshalUtil.ReadTime(); err != nil { - return - } - if result.solid, err = marshalUtil.ReadBool(); err != nil { - return - } - consumedBytes = marshalUtil.ReadOffset() - - return -} - -// TransactionOutputMetadataFromStorage is the factory method for TransactionOutputMetadata objects stored in the objectstorage. The bytes and the content -// will be filled by the objectstorage, by subsequently calling ObjectStorageValue. -func TransactionOutputMetadataFromStorage(storageKey []byte) objectstorage.StorableObject { - result := &TransactionOutputMetadata{} - - var err error - if result.id, err = transaction.ParseOutputId(marshalutil.New(storageKey)); err != nil { - panic(err) - } - - return result -} - -// Parse is a wrapper for simplified unmarshaling of TransactionOutputMetadata objects from a byte stream using the marshalUtil package. -func ParseTransactionOutputMetadata(marshalUtil *marshalutil.MarshalUtil) (*TransactionOutputMetadata, error) { - if outputMetadata, err := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { return TransactionOutputMetadataFromBytes(data) }); err != nil { - return nil, err - } else { - return outputMetadata.(*TransactionOutputMetadata), nil - } -} - -// OutputId returns the id of the Output that this TransactionOutputMetadata is associated to. -func (transactionOutputMetadata *TransactionOutputMetadata) Id() transaction.OutputId { - return transactionOutputMetadata.id -} - -// Solid returns true if the Output has been marked as solid. -func (transactionOutputMetadata *TransactionOutputMetadata) Solid() (result bool) { - transactionOutputMetadata.solidMutex.RLock() - result = transactionOutputMetadata.solid - transactionOutputMetadata.solidMutex.RUnlock() - - return -} - -// SetSolid marks a Output as either solid or not solid. -// It returns true if the solid flag was changes and automatically updates the solidificationTime as well. -func (transactionOutputMetadata *TransactionOutputMetadata) SetSolid(solid bool) (modified bool) { - transactionOutputMetadata.solidMutex.RLock() - if transactionOutputMetadata.solid != solid { - transactionOutputMetadata.solidMutex.RUnlock() - - transactionOutputMetadata.solidMutex.Lock() - if transactionOutputMetadata.solid != solid { - transactionOutputMetadata.solid = solid - if solid { - transactionOutputMetadata.solidificationTimeMutex.Lock() - transactionOutputMetadata.solidificationTime = time.Now() - transactionOutputMetadata.solidificationTimeMutex.Unlock() - } - - transactionOutputMetadata.SetModified() - - modified = true - } - transactionOutputMetadata.solidMutex.Unlock() - - } else { - transactionOutputMetadata.solidMutex.RUnlock() - } - - return -} - -// SoldificationTime returns the time when the Output was marked to be solid. -func (transactionOutputMetadata *TransactionOutputMetadata) SoldificationTime() time.Time { - transactionOutputMetadata.solidificationTimeMutex.RLock() - defer transactionOutputMetadata.solidificationTimeMutex.RUnlock() - - return transactionOutputMetadata.solidificationTime -} - -// Bytes marshals the TransactionOutputMetadata object into a sequence of bytes. -func (transactionOutputMetadata *TransactionOutputMetadata) Bytes() []byte { - marshalUtil := marshalutil.New() - - marshalUtil.WriteBytes(transactionOutputMetadata.id.Bytes()) - marshalUtil.WriteTime(transactionOutputMetadata.solidificationTime) - marshalUtil.WriteBool(transactionOutputMetadata.solid) - - return marshalUtil.Bytes() -} - -// String creates a human readable version of the metadata (for debug purposes). -func (transactionOutputMetadata *TransactionOutputMetadata) String() string { - return stringify.Struct("transaction.TransactionOutputMetadata", - stringify.StructField("payloadId", transactionOutputMetadata.Id()), - stringify.StructField("solid", transactionOutputMetadata.Solid()), - stringify.StructField("solidificationTime", transactionOutputMetadata.SoldificationTime()), - ) -} - -// ObjectStorageKey returns the key that is used to identify the TransactionOutputMetadata in the objectstorage. -func (transactionOutputMetadata *TransactionOutputMetadata) ObjectStorageKey() []byte { - return transactionOutputMetadata.id.Bytes() -} - -// ObjectStorageValue returns the bytes, that are stored in the value part of the k/v store. -func (transactionOutputMetadata *TransactionOutputMetadata) ObjectStorageValue() []byte { - return transactionOutputMetadata.Bytes() -} - -// UnmarshalObjectStorageValue restores the values of a TransactionOutputMetadata object from a sequence of bytes and matches the -// encoding.BinaryUnmarshaler interface. -func (transactionOutputMetadata *TransactionOutputMetadata) UnmarshalObjectStorageValue(data []byte) (err error, consumedBytes int) { - _, err, consumedBytes = TransactionOutputMetadataFromBytes(data, transactionOutputMetadata) - - return -} - -// Update is disabled and panics if it ever gets called - updates are supposed to happen through the setters. -func (transactionOutputMetadata *TransactionOutputMetadata) Update(other objectstorage.StorableObject) { - panic("update forbidden") -} - -// Interface contract: make compiler warn if the interface is not implemented correctly. -var _ objectstorage.StorableObject = &TransactionOutputMetadata{} - -// CachedTransactionOutputMetadata is a wrapper for the object storage, that takes care of type casting the TransactionOutputMetadata objects. -// Since go does not have generics (yet), the object storage works based on the generic "interface{}" type, which means -// that we have to regularly type cast the returned objects, to match the expected type. To reduce the burden of -// manually managing these type, we create a wrapper that does this for us. This way, we can consistently handle the -// specialized types of TransactionOutputMetadata, without having to manually type cast over and over again. -type CachedTransactionOutputMetadata struct { - objectstorage.CachedObject -} - -// Retain overrides the underlying method to return a new CachedTransactionOutputMetadata instead of a generic CachedObject. -func (cachedOutputMetadata *CachedTransactionOutputMetadata) Retain() *CachedTransactionOutputMetadata { - return &CachedTransactionOutputMetadata{cachedOutputMetadata.CachedObject.Retain()} -} - -// Consume overrides the underlying method to use a CachedTransactionOutputMetadata object instead of a generic CachedObject in the -// consumer). -func (cachedOutputMetadata *CachedTransactionOutputMetadata) Consume(consumer func(outputMetadata *TransactionOutputMetadata)) bool { - return cachedOutputMetadata.CachedObject.Consume(func(object objectstorage.StorableObject) { - consumer(object.(*TransactionOutputMetadata)) - }) -} - -// Unwrap provides a way to retrieve a type casted version of the underlying object. -func (cachedOutputMetadata *CachedTransactionOutputMetadata) Unwrap() *TransactionOutputMetadata { - if untypedTransaction := cachedOutputMetadata.Get(); untypedTransaction == nil { - return nil - } else { - if typeCastedTransaction := untypedTransaction.(*TransactionOutputMetadata); typeCastedTransaction == nil || typeCastedTransaction.IsDeleted() { - return nil - } else { - return typeCastedTransaction - } - } -} diff --git a/packages/binary/valuetransfer/transaction/output.go b/packages/binary/valuetransfer/transaction/output.go index a5d0aab1908952bc29b8634f666c083579504fce..78535f4f246b161c54bd13dad520ca3f9e0bad36 100644 --- a/packages/binary/valuetransfer/transaction/output.go +++ b/packages/binary/valuetransfer/transaction/output.go @@ -1,6 +1,7 @@ package transaction import ( + "sync" "time" "github.com/iotaledger/hive.go/marshalutil" @@ -10,13 +11,18 @@ import ( "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/balance" ) +var OutputKeyPartitions = objectstorage.PartitionKey([]int{address.Length, IdLength}...) + // Output represents the output of a Transaction and contains the balances and the identifiers for this output. type Output struct { - address address.Address - transactionId Id - solid bool - solidSince time.Time - balances []*balance.Balance + address address.Address + transactionId Id + solid bool + solidificationTime time.Time + balances []*balance.Balance + + solidMutex sync.RWMutex + solidificationTimeMutex sync.RWMutex objectstorage.StorableObjectFlags storageKey []byte @@ -25,11 +31,11 @@ type Output struct { // NewOutput creates an Output that contains the balances and identifiers of a Transaction. func NewOutput(address address.Address, transactionId Id, balances []*balance.Balance) *Output { return &Output{ - address: address, - transactionId: transactionId, - solid: false, - solidSince: time.Time{}, - balances: balances, + address: address, + transactionId: transactionId, + solid: false, + solidificationTime: time.Time{}, + balances: balances, storageKey: marshalutil.New().WriteBytes(address.Bytes()).WriteBytes(transactionId.Bytes()).Bytes(), } @@ -38,6 +44,39 @@ func NewOutput(address address.Address, transactionId Id, balances []*balance.Ba // OutputFromBytes unmarshals an Output object from a sequence of bytes. // It either creates a new object or fills the optionally provided object with the parsed information. func OutputFromBytes(bytes []byte, optionalTargetObject ...*Output) (result *Output, err error, consumedBytes int) { + marshalUtil := marshalutil.New(bytes) + result, err = ParseOutput(marshalUtil, optionalTargetObject...) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +func ParseOutput(marshalUtil *marshalutil.MarshalUtil, optionalTargetObject ...*Output) (result *Output, err error) { + if parsedObject, parseErr := marshalUtil.Parse(func(data []byte) (interface{}, error, int) { + return OutputFromStorageKey(data, optionalTargetObject...) + }); parseErr != nil { + err = parseErr + + return + } else { + result = parsedObject.(*Output) + } + + if _, err = marshalUtil.Parse(func(data []byte) (parseResult interface{}, parseErr error, parsedBytes int) { + parseErr, parsedBytes = result.UnmarshalObjectStorageValue(data) + + return + }); err != nil { + return + } + + return +} + +// OutputFromStorageKey get's called when we restore a Output from the storage. +// In contrast to other database models, it unmarshals some information from the key so we simply store the key before +// it gets handed over to UnmarshalObjectStorageValue (by the ObjectStorage). +func OutputFromStorageKey(keyBytes []byte, optionalTargetObject ...*Output) (result *Output, err error, consumedBytes int) { // determine the target object that will hold the unmarshaled information switch len(optionalTargetObject) { case 0: @@ -48,47 +87,22 @@ func OutputFromBytes(bytes []byte, optionalTargetObject ...*Output) (result *Out panic("too many arguments in call to OutputFromBytes") } - // parse the bytes - marshalUtil := marshalutil.New(bytes) - if result.address, err = address.Parse(marshalUtil); err != nil { + // parse information + marshalUtil := marshalutil.New(keyBytes) + result.address, err = address.Parse(marshalUtil) + if err != nil { return } - if result.transactionId, err = ParseId(marshalUtil); err != nil { + result.transactionId, err = ParseId(marshalUtil) + if err != nil { return } - if result.solid, err = marshalUtil.ReadBool(); err != nil { - return - } - if result.solidSince, err = marshalUtil.ReadTime(); err != nil { - return - } - var balanceCount uint32 - if balanceCount, err = marshalUtil.ReadUint32(); err != nil { - return - } else { - result.balances = make([]*balance.Balance, balanceCount) - for i := uint32(0); i < balanceCount; i++ { - result.balances[i], err = balance.Parse(marshalUtil) - if err != nil { - return - } - } - } - result.storageKey = marshalutil.New().WriteBytes(result.address.Bytes()).WriteBytes(result.transactionId.Bytes()).Bytes() + result.storageKey = marshalutil.New(keyBytes[:OutputIdLength]).Bytes(true) consumedBytes = marshalUtil.ReadOffset() return } -// OutputFromStorageKey get's called when we restore a Output from the storage. -// In contrast to other database models, it unmarshals some information from the key so we simply store the key before -// it gets handed over to UnmarshalObjectStorageValue (by the ObjectStorage). -func OutputFromStorageKey(keyBytes []byte) (result objectstorage.StorableObject, err error, consumedBytes int) { - return &Output{ - storageKey: keyBytes[:OutputIdLength], - }, nil, OutputIdLength -} - // Address returns the address that this output belongs to. func (output *Output) Address() address.Address { return output.address @@ -99,11 +113,61 @@ func (output *Output) TransactionId() Id { return output.transactionId } +// Solid returns true if the output has been marked as solid. +func (output *Output) Solid() bool { + output.solidMutex.RLock() + defer output.solidMutex.RUnlock() + + return output.solid +} + +func (output *Output) SetSolid(solid bool) (modified bool) { + output.solidMutex.RLock() + if output.solid != solid { + output.solidMutex.RUnlock() + + output.solidMutex.Lock() + if output.solid != solid { + output.solid = solid + if solid { + output.solidificationTimeMutex.Lock() + output.solidificationTime = time.Now() + output.solidificationTimeMutex.Unlock() + } + + output.SetModified() + + modified = true + } + output.solidMutex.Unlock() + + } else { + output.solidMutex.RUnlock() + } + + return +} + +func (output *Output) SolidificationTime() time.Time { + output.solidificationTimeMutex.RLock() + defer output.solidificationTimeMutex.RUnlock() + + return output.solidificationTime +} + // Balances returns the colored balances (color + balance) that this output contains. func (output *Output) Balances() []*balance.Balance { return output.balances } +// Bytes marshals the object into a sequence of bytes. +func (output *Output) Bytes() []byte { + return marshalutil.New(). + WriteBytes(output.ObjectStorageKey()). + WriteBytes(output.ObjectStorageValue()). + Bytes() +} + // ObjectStorageKey returns the key that is used to store the object in the database. // It is required to match StorableObject interface. func (output *Output) ObjectStorageKey() []byte { @@ -115,26 +179,45 @@ func (output *Output) ObjectStorageKey() []byte { // ObjectStorageValue marshals the balances into a sequence of bytes - the address and transaction id are stored inside the key // and are ignored here. -func (output *Output) ObjectStorageValue() (data []byte) { +func (output *Output) ObjectStorageValue() []byte { // determine amount of balances in the output balanceCount := len(output.balances) // initialize helper marshalUtil := marshalutil.New(marshalutil.BOOL_SIZE + marshalutil.TIME_SIZE + marshalutil.UINT32_SIZE + balanceCount*balance.Length) marshalUtil.WriteBool(output.solid) - marshalUtil.WriteTime(output.solidSince) + marshalUtil.WriteTime(output.solidificationTime) marshalUtil.WriteUint32(uint32(balanceCount)) for _, balanceToMarshal := range output.balances { marshalUtil.WriteBytes(balanceToMarshal.Bytes()) } - return + return marshalUtil.Bytes() } // UnmarshalObjectStorageValue restores a Output from a serialized version in the ObjectStorage with parts of the object // being stored in its key rather than the content of the database to reduce storage requirements. func (output *Output) UnmarshalObjectStorageValue(data []byte) (err error, consumedBytes int) { - _, err, consumedBytes = OutputFromBytes(marshalutil.New(output.storageKey).WriteBytes(data).Bytes(), output) + marshalUtil := marshalutil.New(data) + if output.solid, err = marshalUtil.ReadBool(); err != nil { + return + } + if output.solidificationTime, err = marshalUtil.ReadTime(); err != nil { + return + } + var balanceCount uint32 + if balanceCount, err = marshalUtil.ReadUint32(); err != nil { + return + } else { + output.balances = make([]*balance.Balance, balanceCount) + for i := uint32(0); i < balanceCount; i++ { + output.balances[i], err = balance.Parse(marshalUtil) + if err != nil { + return + } + } + } + consumedBytes = marshalUtil.ReadOffset() return } @@ -146,3 +229,41 @@ func (output *Output) Update(other objectstorage.StorableObject) { // define contract (ensure that the struct fulfills the given interface) var _ objectstorage.StorableObject = &Output{} + +// region CachedOutput ///////////////////////////////////////////////////////////////////////////////////////////////// + +type CachedOutput struct { + objectstorage.CachedObject +} + +func (cachedOutput *CachedOutput) Unwrap() *Output { + if untypedObject := cachedOutput.Get(); untypedObject == nil { + return nil + } else { + if typedObject := untypedObject.(*Output); typedObject == nil || typedObject.IsDeleted() { + return nil + } else { + return typedObject + } + } +} + +func (cachedOutput *CachedOutput) Consume(consumer func(output *Output)) (consumed bool) { + return cachedOutput.CachedObject.Consume(func(object objectstorage.StorableObject) { + consumer(object.(*Output)) + }) +} + +type CachedOutputs []*CachedOutput + +func (cachedOutputs CachedOutputs) Consume(consumer func(output *Output)) (consumed bool) { + for _, cachedOutput := range cachedOutputs { + consumed = cachedOutput.Consume(func(output *Output) { + consumer(output) + }) || consumed + } + + return +} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/packages/binary/valuetransfer/transaction/output_test.go b/packages/binary/valuetransfer/transaction/output_test.go index 305409e848b1e01d756a196f19f8380fce180d61..d926bc72f4ac9aa42a5e498b25d0cecca6222de9 100644 --- a/packages/binary/valuetransfer/transaction/output_test.go +++ b/packages/binary/valuetransfer/transaction/output_test.go @@ -2,8 +2,43 @@ package transaction import ( "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/address" + "github.com/iotaledger/goshimmer/packages/binary/valuetransfer/balance" ) func TestNew(t *testing.T) { + randomAddress := address.Random() + randomTransactionId := RandomId() + + output := NewOutput(randomAddress, randomTransactionId, []*balance.Balance{ + balance.New(balance.COLOR_IOTA, 1337), + }) + + assert.Equal(t, randomAddress, output.Address()) + assert.Equal(t, randomTransactionId, output.TransactionId()) + assert.Equal(t, false, output.Solid()) + assert.Equal(t, time.Time{}, output.SolidificationTime()) + assert.Equal(t, []*balance.Balance{ + balance.New(balance.COLOR_IOTA, 1337), + }, output.Balances()) + + assert.Equal(t, true, output.SetSolid(true)) + assert.Equal(t, false, output.SetSolid(true)) + assert.Equal(t, true, output.Solid()) + assert.NotEqual(t, time.Time{}, output.SolidificationTime()) + + clonedOutput, err, _ := OutputFromBytes(output.Bytes()) + if err != nil { + panic(err) + } + assert.Equal(t, output.Address(), clonedOutput.Address()) + assert.Equal(t, output.TransactionId(), clonedOutput.TransactionId()) + assert.Equal(t, output.Solid(), clonedOutput.Solid()) + assert.Equal(t, output.SolidificationTime().Round(time.Second), clonedOutput.SolidificationTime().Round(time.Second)) + assert.Equal(t, output.Balances(), clonedOutput.Balances()) } diff --git a/packages/binary/valuetransfer/transaction/transaction.go b/packages/binary/valuetransfer/transaction/transaction.go index 0d685c6055cddad72ce21c3897e35c4302744a0a..04e3ab016273f6e34586f595f58aa4b8aefdbfc4 100644 --- a/packages/binary/valuetransfer/transaction/transaction.go +++ b/packages/binary/valuetransfer/transaction/transaction.go @@ -133,6 +133,10 @@ func (transaction *Transaction) Inputs() *Inputs { return transaction.inputs } +func (transaction *Transaction) Outputs() *Outputs { + return transaction.outputs +} + func (transaction *Transaction) SignaturesValid() bool { signaturesValid := true transaction.inputs.ForEachAddress(func(address address.Address) bool { diff --git a/plugins/messagelayer/plugin.go b/plugins/messagelayer/plugin.go index 1957f750d66ee2a73200b2381542fb2dc10a25d8..7e0ad510256e4cc765b7311a07d291baa56319b9 100644 --- a/plugins/messagelayer/plugin.go +++ b/plugins/messagelayer/plugin.go @@ -14,7 +14,6 @@ import ( "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/transactionparser" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/transactionrequester" - "github.com/iotaledger/goshimmer/packages/binary/storageprefix" "github.com/iotaledger/goshimmer/packages/database" "github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/plugins/autopeering/local" @@ -46,7 +45,7 @@ func configure(*node.Plugin) { TransactionParser = transactionparser.New() TransactionRequester = transactionrequester.New() TipSelector = tipselector.New() - Tangle = tangle.New(database.GetBadgerInstance(), storageprefix.MainNet) + Tangle = tangle.New(database.GetBadgerInstance()) // Setup MessageFactory (behavior + logging)) MessageFactory = messagefactory.New(database.GetBadgerInstance(), local.GetInstance().LocalIdentity(), TipSelector, []byte(DB_SEQUENCE_NUMBER))