Skip to content
Snippets Groups Projects
Unverified Commit abcba5b4 authored by Acha Bill's avatar Acha Bill Committed by GitHub
Browse files

Merge pull request #947 from iotaledger/refactor/attachment

refactor: attachments
parents dae85296 bea5c328
Branches develop
No related tags found
No related merge requests found
package tangle
import (
"github.com/iotaledger/goshimmer/packages/ledgerstate"
"github.com/iotaledger/hive.go/byteutils"
"github.com/iotaledger/hive.go/marshalutil"
"github.com/iotaledger/hive.go/objectstorage"
"github.com/iotaledger/hive.go/stringify"
"golang.org/x/xerrors"
)
// Attachment stores the information which transaction was attached by which message. We need this to be able to perform
// reverse lookups from transactions to their corresponding messages that attach them.
type Attachment struct {
objectstorage.StorableObjectFlags
transactionID ledgerstate.TransactionID
messageID MessageID
}
// NewAttachment creates an attachment object with the given information.
func NewAttachment(transactionID ledgerstate.TransactionID, messageID MessageID) *Attachment {
return &Attachment{
transactionID: transactionID,
messageID: messageID,
}
}
// AttachmentFromBytes unmarshals an Attachment from a sequence of bytes - it either creates a new object or fills the
// optionally provided one with the parsed information.
func AttachmentFromBytes(bytes []byte) (result *Attachment, consumedBytes int, err error) {
marshalUtil := marshalutil.New(bytes)
result, err = ParseAttachment(marshalUtil)
consumedBytes = marshalUtil.ReadOffset()
return
}
// ParseAttachment is a wrapper for simplified unmarshaling of Attachments from a byte stream using the marshalUtil package.
func ParseAttachment(marshalUtil *marshalutil.MarshalUtil) (result *Attachment, err error) {
result = &Attachment{}
if result.transactionID, err = ledgerstate.TransactionIDFromMarshalUtil(marshalUtil); err != nil {
err = xerrors.Errorf("failed to parse transaction ID in attachment: %w", err)
return
}
if result.messageID, err = MessageIDFromMarshalUtil(marshalUtil); err != nil {
err = xerrors.Errorf("failed to parse message ID in attachment: %w", err)
return
}
return
}
// AttachmentFromObjectStorage gets called when we restore an Attachment from the storage - it parses the key bytes and
// returns the new object.
func AttachmentFromObjectStorage(key []byte, _ []byte) (result objectstorage.StorableObject, err error) {
result, _, err = AttachmentFromBytes(key)
if err != nil {
err = xerrors.Errorf("failed to parse attachment from object storage: %w", err)
}
return
}
// TransactionID returns the transactionID of this Attachment.
func (a *Attachment) TransactionID() ledgerstate.TransactionID {
return a.transactionID
}
// MessageID returns the messageID of this Attachment.
func (a *Attachment) MessageID() MessageID {
return a.messageID
}
// Bytes marshals the Attachment into a sequence of bytes.
func (a *Attachment) Bytes() []byte {
return a.ObjectStorageKey()
}
// String returns a human readable version of the Attachment.
func (a *Attachment) String() string {
return stringify.Struct("Attachment",
stringify.StructField("transactionId", a.TransactionID()),
stringify.StructField("messageID", a.MessageID()),
)
}
// ObjectStorageKey returns the key that is used to store the object in the database.
func (a *Attachment) ObjectStorageKey() []byte {
return byteutils.ConcatBytes(a.transactionID.Bytes(), a.MessageID().Bytes())
}
// ObjectStorageValue marshals the "content part" of an Attachment to a sequence of bytes. Since all of the information
// for this object are stored in its key, this method does nothing and is only required to conform with the interface.
func (a *Attachment) ObjectStorageValue() (data []byte) {
return
}
// Update is disabled - updates are supposed to happen through the setters (if existing).
func (a *Attachment) Update(other objectstorage.StorableObject) {
panic("update forbidden")
}
// Interface contract: make compiler warn if the interface is not implemented correctly.
var _ objectstorage.StorableObject = &Attachment{}
// AttachmentLength holds the length of a marshaled Attachment in bytes.
const AttachmentLength = ledgerstate.TransactionIDLength + MessageIDLength
// region CachedAttachment /////////////////////////////////////////////////////////////////////////////////////////////
// CachedAttachment is a wrapper for the generic CachedObject returned by the objectstorage, that overrides the accessor
// methods, with a type-casted one.
type CachedAttachment struct {
objectstorage.CachedObject
}
// Retain marks this CachedObject to still be in use by the program.
func (cachedAttachment *CachedAttachment) Retain() *CachedAttachment {
return &CachedAttachment{cachedAttachment.CachedObject.Retain()}
}
// Unwrap is the type-casted equivalent of Get. It returns nil if the object does not exist.
func (cachedAttachment *CachedAttachment) Unwrap() *Attachment {
untypedObject := cachedAttachment.Get()
if untypedObject == nil {
return nil
}
typedObject := untypedObject.(*Attachment)
if typedObject == nil || typedObject.IsDeleted() {
return nil
}
return typedObject
}
// Consume unwraps the CachedObject and passes a type-casted version to the consumer (if the object is not empty - it
// exists). It automatically releases the object when the consumer finishes.
func (cachedAttachment *CachedAttachment) Consume(consumer func(attachment *Attachment)) (consumed bool) {
return cachedAttachment.CachedObject.Consume(func(object objectstorage.StorableObject) {
consumer(object.(*Attachment))
})
}
// CachedAttachments represents a collection of CachedAttachments.
type CachedAttachments []*CachedAttachment
// Consume iterates over the CachedObjects, unwraps them and passes a type-casted version to the consumer (if the object
// is not empty - it exists). It automatically releases the object when the consumer finishes. It returns true, if at
// least one object was consumed.
func (cachedAttachments CachedAttachments) Consume(consumer func(attachment *Attachment)) (consumed bool) {
for _, cachedAttachment := range cachedAttachments {
consumed = cachedAttachment.Consume(func(output *Attachment) {
consumer(output)
}) || consumed
}
return
}
package tangle
import (
"testing"
"github.com/iotaledger/goshimmer/packages/ledgerstate"
"github.com/stretchr/testify/assert"
)
func TestAttachment(t *testing.T) {
transactionID, err := ledgerstate.TransactionIDFromRandomness()
assert.NoError(t, err)
// TODO: maybe make this public?
messageID := randomMessageID()
attachment := NewAttachment(transactionID, messageID)
assert.Equal(t, transactionID, attachment.TransactionID())
assert.Equal(t, messageID, attachment.MessageID())
clonedAttachment, consumedBytes, err := AttachmentFromBytes(attachment.Bytes())
if err != nil {
panic(err)
}
assert.Equal(t, AttachmentLength, consumedBytes)
assert.Equal(t, transactionID, clonedAttachment.TransactionID())
assert.Equal(t, messageID, clonedAttachment.MessageID())
}
......@@ -83,6 +83,8 @@ func (m *MessageBooker) bookMessageContainingTransaction(message *Message, messa
}
}
// store attachment
//m.markersManager.IsInPastCone()
// past cone check
......@@ -183,8 +185,9 @@ func (m *MessageBooker) referencedTransactionIDs(transaction *ledgerstate.Transa
return
}
// Attachments retrieves the attachments of a transaction.
func (m *MessageBooker) Attachments(transactionID ledgerstate.TransactionID) (attachments MessageIDs) {
return
return m.messageStore.AttachmentMessageIDs(transactionID)
}
func (m *MessageBooker) determineTargetBranch(branchIDsOfStrongParents ledgerstate.BranchIDs) (targetBranch ledgerstate.BranchID, err error) {
......
......@@ -5,6 +5,7 @@ import (
"time"
"github.com/iotaledger/goshimmer/packages/database"
"github.com/iotaledger/goshimmer/packages/ledgerstate"
"github.com/iotaledger/hive.go/byteutils"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/objectstorage"
......@@ -23,6 +24,9 @@ const (
// PrefixMissingMessage defines the storage prefix for missing message.
PrefixMissingMessage
// PrefixAttachments defines the storage prefix for attachments.
PrefixAttachments
cacheTime = 20 * time.Second
// DBSequenceNumber defines the db sequence number.
......@@ -36,6 +40,7 @@ type MessageStore struct {
messageMetadataStorage *objectstorage.ObjectStorage
approverStorage *objectstorage.ObjectStorage
missingMessageStorage *objectstorage.ObjectStorage
attachmentStorage *objectstorage.ObjectStorage
Events *MessageStoreEvents
shutdown chan struct{}
......@@ -52,6 +57,7 @@ func NewMessageStore(tangle *Tangle, store kvstore.KVStore) (result *MessageStor
messageMetadataStorage: osFactory.New(PrefixMessageMetadata, MessageMetadataFromObjectStorage, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)),
approverStorage: osFactory.New(PrefixApprovers, ApproverFromObjectStorage, objectstorage.CacheTime(cacheTime), objectstorage.PartitionKey(MessageIDLength, ApproverTypeLength, MessageIDLength), objectstorage.LeakDetectionEnabled(false)),
missingMessageStorage: osFactory.New(PrefixMissingMessage, MissingMessageFromObjectStorage, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)),
attachmentStorage: osFactory.New(PrefixAttachments, AttachmentFromObjectStorage, objectstorage.CacheTime(cacheTime), objectstorage.PartitionKey(ledgerstate.TransactionIDLength, MessageIDLength), objectstorage.LeakDetectionEnabled(false)),
Events: newMessageStoreEvents(),
}
......@@ -155,6 +161,33 @@ func (m *MessageStore) MissingMessages() (ids []MessageID) {
return
}
// StoreAttachment stores a new attachment if not already stored.
func (m *MessageStore) StoreAttachment(transactionID ledgerstate.TransactionID, messageID MessageID) (cachedAttachment *CachedAttachment, stored bool) {
attachment, stored := m.attachmentStorage.StoreIfAbsent(NewAttachment(transactionID, messageID))
if !stored {
return
}
cachedAttachment = &CachedAttachment{CachedObject: attachment}
return
}
// Attachments retrieves the attachment of a transaction in attachmentStorage.
func (m *MessageStore) Attachments(transactionID ledgerstate.TransactionID) (cachedAttachments CachedAttachments) {
m.attachmentStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool {
cachedAttachments = append(cachedAttachments, &CachedAttachment{CachedObject: cachedObject})
return true
}, transactionID.Bytes())
return
}
// AttachmentMessageIDs returns the messageIDs of the transaction in attachmentStorage.
func (m *MessageStore) AttachmentMessageIDs(transactionID ledgerstate.TransactionID) (messageIDs MessageIDs) {
m.Attachments(transactionID).Consume(func(attachment *Attachment) {
messageIDs = append(messageIDs, attachment.MessageID())
})
return
}
// DeleteMessage deletes a message and its association to approvees by un-marking the given
// message as an approver.
func (m *MessageStore) DeleteMessage(messageID MessageID) {
......@@ -194,6 +227,7 @@ func (m *MessageStore) Shutdown() {
m.messageMetadataStorage.Shutdown()
m.approverStorage.Shutdown()
m.missingMessageStorage.Shutdown()
m.attachmentStorage.Shutdown()
close(m.shutdown)
}
......@@ -205,6 +239,7 @@ func (m *MessageStore) Prune() error {
m.messageMetadataStorage,
m.approverStorage,
m.missingMessageStorage,
m.attachmentStorage,
} {
if err := storage.Prune(); err != nil {
err = fmt.Errorf("failed to prune storage: %w", err)
......
package tangle
import (
"math/rand"
"testing"
"github.com/iotaledger/goshimmer/packages/ledgerstate"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/stretchr/testify/assert"
)
func newMessageStore() *MessageStore {
tangle := &Tangle{
Events: newEvents(),
}
store := mapdb.NewMapDB()
return NewMessageStore(tangle, store)
}
func TestMessageStore_StoreAttachment(t *testing.T) {
msgStore := newMessageStore()
defer msgStore.Shutdown()
transactionID, err := ledgerstate.TransactionIDFromRandomness()
assert.NoError(t, err)
messageID := randomMessageID()
cachedAttachment, stored := msgStore.StoreAttachment(transactionID, messageID)
cachedAttachment.Release()
assert.True(t, stored)
cachedAttachment, stored = msgStore.StoreAttachment(transactionID, randomMessageID())
assert.True(t, stored)
cachedAttachment.Release()
cachedAttachment, stored = msgStore.StoreAttachment(transactionID, messageID)
assert.False(t, stored)
assert.Nil(t, cachedAttachment)
}
func TestMessageStore_Attachments(t *testing.T) {
msgStore := newMessageStore()
defer msgStore.Shutdown()
attachments := make(map[ledgerstate.TransactionID]int)
for i := 0; i < 2; i++ {
transactionID, err := ledgerstate.TransactionIDFromRandomness()
assert.NoError(t, err)
// for every tx, store random number of attachments.
for j := 0; j < rand.Intn(5)+1; j++ {
attachments[transactionID]++
cachedAttachment, _ := msgStore.StoreAttachment(transactionID, randomMessageID())
cachedAttachment.Release()
}
}
for transactionID := range attachments {
cachedAttachments := msgStore.Attachments(transactionID)
assert.Equal(t, attachments[transactionID], len(cachedAttachments))
for _, cachedAttachment := range cachedAttachments {
cachedAttachment.Release()
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment