diff --git a/dapps/faucet/packages/payload/payload.go b/dapps/faucet/packages/payload/payload.go index a949f043ade356dac8521ab71f974895af4ab9e8..3d2b39e8a5c3d285d1fd1231daa4b2d368e8fd4f 100644 --- a/dapps/faucet/packages/payload/payload.go +++ b/dapps/faucet/packages/payload/payload.go @@ -3,6 +3,7 @@ package faucetpayload import ( "context" "crypto" + "fmt" // Only want to use init _ "golang.org/x/crypto/blake2b" @@ -43,6 +44,7 @@ func New(addr address.Address, powTarget int) (*Payload, error) { powRelevantBytes := payloadBytes[:len(payloadBytes)-pow.NonceBytes] nonce, err := powWorker.Mine(context.Background(), powRelevantBytes, powTarget) if err != nil { + err = fmt.Errorf("failed to do PoW for faucet payload: %w", err) return nil, err } p.nonce = nonce @@ -62,23 +64,28 @@ func FromBytes(bytes []byte) (result *Payload, consumedBytes int, err error) { // read data result = &Payload{} if _, err = marshalUtil.ReadUint32(); err != nil { + err = fmt.Errorf("failed to unmarshal payload size of faucet payload from bytes: %w", err) return } result.payloadType, err = marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to unmarshal payload type of faucet payload from bytes: %w", err) return } addr, err := marshalUtil.ReadBytes(address.Length) if err != nil { + err = fmt.Errorf("failed to unmarshal address of faucet payload from bytes: %w", err) return } result.address, _, err = address.FromBytes(addr) if err != nil { + err = fmt.Errorf("failed to unmarshal address of faucet payload from bytes: %w", err) return } result.nonce, err = marshalUtil.ReadUint64() if err != nil { + err = fmt.Errorf("failed to unmarshal nonce of faucet payload from bytes: %w", err) return } @@ -123,6 +130,9 @@ func (faucetPayload *Payload) String() string { // PayloadUnmarshaler sets the generic unmarshaler. func PayloadUnmarshaler(data []byte) (payload payload.Payload, err error) { payload, _, err = FromBytes(data) + if err != nil { + err = fmt.Errorf("failed to unmarshal faucet payload from bytes: %w", err) + } return } diff --git a/dapps/networkdelay/object.go b/dapps/networkdelay/object.go index 99464032fcc1de789379f205f3d1f4e1a1847bf7..3b5c2980436555da77b02f96a7b5857a44f55e6f 100644 --- a/dapps/networkdelay/object.go +++ b/dapps/networkdelay/object.go @@ -1,6 +1,7 @@ package networkdelay import ( + "fmt" "sync" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" @@ -53,9 +54,11 @@ func FromBytes(bytes []byte) (result *Object, consumedBytes int, err error) { func Parse(marshalUtil *marshalutil.MarshalUtil) (result *Object, err error) { // read information that are required to identify the object from the outside if _, err = marshalUtil.ReadUint32(); err != nil { + err = fmt.Errorf("failed to parse payload size of networkdelay object: %w", err) return } if _, err = marshalUtil.ReadUint32(); err != nil { + err = fmt.Errorf("failed to parse payload type of networkdelay object: %w", err) return } @@ -63,12 +66,14 @@ func Parse(marshalUtil *marshalutil.MarshalUtil) (result *Object, err error) { result = &Object{} id, err := marshalUtil.ReadBytes(32) if err != nil { + err = fmt.Errorf("failed to parse id of networkdelay object: %w", err) return } copy(result.id[:], id) // parse sent time if result.sentTime, err = marshalUtil.ReadInt64(); err != nil { + err = fmt.Errorf("failed to parse sent time of networkdelay object: %w", err) return } diff --git a/dapps/valuetransfers/packages/branchmanager/branch.go b/dapps/valuetransfers/packages/branchmanager/branch.go index 1fce39bf6944f1e5a581ab6c13499fd277011bc3..cd7815e0ff43a7ac009a09cc5b2118bd6fe76418 100644 --- a/dapps/valuetransfers/packages/branchmanager/branch.go +++ b/dapps/valuetransfers/packages/branchmanager/branch.go @@ -56,6 +56,7 @@ func BranchFromBytes(bytes []byte) (result *Branch, consumedBytes int, err error // BranchFromObjectStorage is a factory method that creates a new Branch instance from the ObjectStorage. func BranchFromObjectStorage(key []byte, data []byte) (result objectstorage.StorableObject, err error) { if result, _, err = BranchFromBytes(byteutils.ConcatBytes(key, data)); err != nil { + err = fmt.Errorf("failed to unmarshal branch from bytes: %w", err) return } @@ -67,36 +68,44 @@ func ParseBranch(marshalUtil *marshalutil.MarshalUtil) (result *Branch, err erro result = &Branch{} if result.id, err = ParseBranchID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse branch ID: %w", err) return } result.preferred, err = marshalUtil.ReadBool() if err != nil { + err = fmt.Errorf("failed to parse 'preferred' of the branch: %w", err) return } result.liked, err = marshalUtil.ReadBool() if err != nil { + err = fmt.Errorf("failed to parse 'liked' of the branch: %w", err) return } result.finalized, err = marshalUtil.ReadBool() if err != nil { + err = fmt.Errorf("failed to parse 'finalized' of the branch: %w", err) return } result.confirmed, err = marshalUtil.ReadBool() if err != nil { + err = fmt.Errorf("failed to parse 'confirmed' of the branch: %w", err) return } result.rejected, err = marshalUtil.ReadBool() if err != nil { + err = fmt.Errorf("failed to parse 'rejected' of the branch: %w", err) return } parentBranchCount, err := marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to parse parentBranchCount of the branch: %w", err) return } result.parentBranches = make([]BranchID, parentBranchCount) for i := uint32(0); i < parentBranchCount; i++ { result.parentBranches[i], err = ParseBranchID(marshalUtil) if err != nil { + err = fmt.Errorf("failed to parse ID of the parent branch: %w", err) return } } diff --git a/dapps/valuetransfers/packages/branchmanager/child_branch.go b/dapps/valuetransfers/packages/branchmanager/child_branch.go index 4f4d0504947f3717e82aa79be97a90bd3be0240c..7138033cbc6099b5f057deb7cc95c08458a7879c 100644 --- a/dapps/valuetransfers/packages/branchmanager/child_branch.go +++ b/dapps/valuetransfers/packages/branchmanager/child_branch.go @@ -1,6 +1,8 @@ package branchmanager import ( + "fmt" + "github.com/iotaledger/hive.go/byteutils" "github.com/iotaledger/hive.go/marshalutil" "github.com/iotaledger/hive.go/objectstorage" @@ -48,9 +50,11 @@ func ParseChildBranch(marshalUtil *marshalutil.MarshalUtil) (result *ChildBranch result = &ChildBranch{} if result.parentID, err = ParseBranchID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse parent branch ID: %w", err) return } if result.childID, err = ParseBranchID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse child branch ID: %w", err) return } diff --git a/dapps/valuetransfers/packages/branchmanager/conflict.go b/dapps/valuetransfers/packages/branchmanager/conflict.go index 304c78fcd57df73d49af443488e5146f352e7c5d..3a16f45ae27269c7128c1b7e7b4b64288419172a 100644 --- a/dapps/valuetransfers/packages/branchmanager/conflict.go +++ b/dapps/valuetransfers/packages/branchmanager/conflict.go @@ -1,6 +1,7 @@ package branchmanager import ( + "fmt" "sync" "github.com/iotaledger/hive.go/byteutils" @@ -49,9 +50,11 @@ func ParseConflict(marshalUtil *marshalutil.MarshalUtil) (result *Conflict, err result = &Conflict{} if result.id, err = ParseConflictID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse conflict ID: %w", err) return } if result.memberCount, err = marshalUtil.ReadUint32(); err != nil { + err = fmt.Errorf("failed to parse memberCount of conflict: %w", err) return } diff --git a/dapps/valuetransfers/packages/branchmanager/conflict_member.go b/dapps/valuetransfers/packages/branchmanager/conflict_member.go index 9e4e45b03c1b408eaef3d65cade392d52b06785e..b381de24b3abda6cf1b3e0306631fd96701c7543 100644 --- a/dapps/valuetransfers/packages/branchmanager/conflict_member.go +++ b/dapps/valuetransfers/packages/branchmanager/conflict_member.go @@ -1,6 +1,8 @@ package branchmanager import ( + "fmt" + "github.com/iotaledger/hive.go/marshalutil" "github.com/iotaledger/hive.go/objectstorage" ) @@ -45,9 +47,11 @@ func ParseConflictMember(marshalUtil *marshalutil.MarshalUtil) (result *Conflict result = &ConflictMember{} if result.conflictID, err = ParseConflictID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse conflict ID: %w", err) return } if result.branchID, err = ParseBranchID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse conflict branch ID: %w", err) return } diff --git a/dapps/valuetransfers/packages/payload/id.go b/dapps/valuetransfers/packages/payload/id.go index 7b901ff92440e97dad81b5790e4c37edc98720fb..06a5d383b3165837db6c92cb69192341961273be 100644 --- a/dapps/valuetransfers/packages/payload/id.go +++ b/dapps/valuetransfers/packages/payload/id.go @@ -33,6 +33,7 @@ func NewID(base58EncodedString string) (result ID, err error) { func ParseID(marshalUtil *marshalutil.MarshalUtil) (ID, error) { id, err := marshalUtil.Parse(func(data []byte) (interface{}, int, error) { return IDFromBytes(data) }) if err != nil { + err = fmt.Errorf("failed to parse value payload ID: %w", err) return ID{}, err } diff --git a/dapps/valuetransfers/packages/payload/payload.go b/dapps/valuetransfers/packages/payload/payload.go index 4b4c0c0c50d73e49cbd99bd203896b30ac5ce690..f191d2060ad7a51a769a376318c781c34b51915c 100644 --- a/dapps/valuetransfers/packages/payload/payload.go +++ b/dapps/valuetransfers/packages/payload/payload.go @@ -57,12 +57,14 @@ func FromObjectStorage(key []byte, data []byte) (result objectstorage.StorableOb // parse the message parsedPayload, err := Parse(marshalutil.New(data)) if err != nil { + err = fmt.Errorf("failed to parse value payload from object storage: %w", err) return } - // parse the ID from they key + // parse the ID from the key payloadID, err := ParseID(marshalutil.New(key)) if err != nil { + err = fmt.Errorf("failed to parse value payload ID from object storage: %w", err) return } parsedPayload.id = &payloadID @@ -81,10 +83,12 @@ func Parse(marshalUtil *marshalutil.MarshalUtil) (result *Payload, err error) { // read information that are required to identify the payload from the outside _, err = marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to parse payload size of value payload: %w", err) return } _, err = marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to parse payload type of value payload: %w", err) return } diff --git a/dapps/valuetransfers/packages/tangle/attachment.go b/dapps/valuetransfers/packages/tangle/attachment.go index 7fa2b81a3c6828541262984077fb6064e0816366..4bb84fcadb3912ce0a939f4170ef99b365b91d26 100644 --- a/dapps/valuetransfers/packages/tangle/attachment.go +++ b/dapps/valuetransfers/packages/tangle/attachment.go @@ -1,6 +1,8 @@ package tangle import ( + "fmt" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/transaction" "github.com/iotaledger/hive.go/byteutils" @@ -40,9 +42,11 @@ func AttachmentFromBytes(bytes []byte) (result *Attachment, consumedBytes int, e func ParseAttachment(marshalUtil *marshalutil.MarshalUtil) (result *Attachment, err error) { result = &Attachment{} if result.transactionID, err = transaction.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse transaction ID in attachment: %w", err) return } if result.payloadID, err = payload.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse value payload ID in attachment: %w", err) return } @@ -53,6 +57,9 @@ func ParseAttachment(marshalUtil *marshalutil.MarshalUtil) (result *Attachment, // returns the new object. func AttachmentFromObjectStorage(key []byte, data []byte) (result objectstorage.StorableObject, err error) { result, _, err = AttachmentFromBytes(byteutils.ConcatBytes(key, data)) + if err != nil { + err = fmt.Errorf("failed to parse attachment from object storage: %w", err) + } return } diff --git a/dapps/valuetransfers/packages/tangle/consumer.go b/dapps/valuetransfers/packages/tangle/consumer.go index 2afc4099fd18d17aae4dd2fedab6b7b78911828a..38934671436496b76a01fda567c3f0533e0cc778 100644 --- a/dapps/valuetransfers/packages/tangle/consumer.go +++ b/dapps/valuetransfers/packages/tangle/consumer.go @@ -1,6 +1,8 @@ package tangle import ( + "fmt" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address" "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/transaction" "github.com/iotaledger/hive.go/byteutils" @@ -44,9 +46,11 @@ func ParseConsumer(marshalUtil *marshalutil.MarshalUtil) (result *Consumer, err result = &Consumer{} if result.consumedInput, err = transaction.ParseOutputID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse output ID of consumer: %w", err) return } if result.transactionID, err = transaction.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse transaction ID of consumer: %w", err) return } @@ -57,6 +61,9 @@ func ParseConsumer(marshalUtil *marshalutil.MarshalUtil) (result *Consumer, err // objectstorage. It is used by the objectstorage, to create new instances of this entity. func ConsumerFromObjectStorage(key []byte, _ []byte) (result objectstorage.StorableObject, err error) { result, _, err = ConsumerFromBytes(key) + if err != nil { + err = fmt.Errorf("failed to parse consumer from object storage: %w", err) + } return } diff --git a/dapps/valuetransfers/packages/tangle/missingpayload.go b/dapps/valuetransfers/packages/tangle/missingpayload.go index 075b88b1a0dd693dea2376f70a8590c0f96ac842..bd828290aa4fe2868c30df75cd651b58ebe0c8e1 100644 --- a/dapps/valuetransfers/packages/tangle/missingpayload.go +++ b/dapps/valuetransfers/packages/tangle/missingpayload.go @@ -1,6 +1,7 @@ package tangle import ( + "fmt" "time" "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" @@ -41,9 +42,11 @@ func ParseMissingPayload(marshalUtil *marshalutil.MarshalUtil) (result *MissingP result = &MissingPayload{} if result.payloadID, err = payload.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse payload id of missing payload: %w", err) return } if result.missingSince, err = marshalUtil.ReadTime(); err != nil { + err = fmt.Errorf("failed to parse missing time of missing payload: %w", err) return } @@ -54,6 +57,9 @@ func ParseMissingPayload(marshalUtil *marshalutil.MarshalUtil) (result *MissingP // the content will be unmarshaled by an external caller using the binary.ObjectStorageValue interface. func MissingPayloadFromObjectStorage(key []byte, data []byte) (result objectstorage.StorableObject, err error) { result, _, err = MissingPayloadFromBytes(byteutils.ConcatBytes(key, data)) + if err != nil { + err = fmt.Errorf("failed to parse missing payload from object storage: %w", err) + } return } diff --git a/dapps/valuetransfers/packages/tangle/output.go b/dapps/valuetransfers/packages/tangle/output.go index dd1f4c4b3cdff90efc619c0c49ef01fd2952ec2d..afe04068cc11ba056956b3675800002753ce0bd1 100644 --- a/dapps/valuetransfers/packages/tangle/output.go +++ b/dapps/valuetransfers/packages/tangle/output.go @@ -1,6 +1,7 @@ package tangle import ( + "fmt" "sync" "time" @@ -73,51 +74,65 @@ func ParseOutput(marshalUtil *marshalutil.MarshalUtil) (result *Output, err erro result = &Output{} if result.address, err = address.Parse(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse address of output: %w", err) return } if result.transactionID, err = transaction.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse transaction ID of output: %w", err) return } if result.branchID, err = branchmanager.ParseBranchID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse branch ID of output: %w", err) return } if result.solid, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'solid' of output: %w", err) return } if result.solidificationTime, err = marshalUtil.ReadTime(); err != nil { + err = fmt.Errorf("failed to parse solidification time of output: %w", err) return } if result.firstConsumer, err = transaction.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse transaction ID of first consumer of output: %w", err) return } consumerCount, err := marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to parse consumer count of output: %w", err) return } if result.preferred, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'preferred' of output: %w", err) return } if result.finalized, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'finalized' of output: %w", err) return } if result.liked, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'liked' of output: %w", err) return } if result.confirmed, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'confirmed' of output: %w", err) return } if result.rejected, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'rejected' of output: %w", err) return } result.consumerCount = int(consumerCount) balanceCount, err := marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to parse balance count of output: %w", err) return } result.balances = make([]*balance.Balance, balanceCount) for i := uint32(0); i < balanceCount; i++ { result.balances[i], err = balance.Parse(marshalUtil) if err != nil { + err = fmt.Errorf("failed to parse balance of output: %w", err) return } } @@ -130,6 +145,9 @@ func ParseOutput(marshalUtil *marshalutil.MarshalUtil) (result *Output, err erro // it gets handed over to UnmarshalObjectStorageValue (by the ObjectStorage). func OutputFromObjectStorage(key []byte, data []byte) (result objectstorage.StorableObject, err error) { result, _, err = OutputFromBytes(byteutils.ConcatBytes(key, data)) + if err != nil { + err = fmt.Errorf("failed to parse output from object storage: %w", err) + } return } diff --git a/dapps/valuetransfers/packages/tangle/payloadapprover.go b/dapps/valuetransfers/packages/tangle/payloadapprover.go index e5f0973a82b0c03fa78551d24d0f418a2c0596d1..b24c65400cd28f4b7cec6ad2c7cba51a7c10bf45 100644 --- a/dapps/valuetransfers/packages/tangle/payloadapprover.go +++ b/dapps/valuetransfers/packages/tangle/payloadapprover.go @@ -1,6 +1,8 @@ package tangle import ( + "fmt" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" "github.com/iotaledger/hive.go/byteutils" "github.com/iotaledger/hive.go/marshalutil" @@ -38,9 +40,11 @@ func PayloadApproverFromBytes(bytes []byte) (result *PayloadApprover, consumedBy func ParsePayloadApprover(marshalUtil *marshalutil.MarshalUtil) (result *PayloadApprover, err error) { result = &PayloadApprover{} if result.referencedPayloadID, err = payload.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse payload id of approver: %w", err) return } if result.approvingPayloadID, err = payload.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse payload id of approver: %w", err) return } @@ -52,6 +56,9 @@ func ParsePayloadApprover(marshalUtil *marshalutil.MarshalUtil) (result *Payload // method. func PayloadApproverFromObjectStorage(key []byte, _ []byte) (result objectstorage.StorableObject, err error) { result, _, err = PayloadMetadataFromBytes(key) + if err != nil { + err = fmt.Errorf("failed to parse approver from object storage: %w", err) + } return } diff --git a/dapps/valuetransfers/packages/tangle/payloadmetadata.go b/dapps/valuetransfers/packages/tangle/payloadmetadata.go index dc7b1fb86366b3f8b034c0e6a710f9f7a0c90f2e..4aeba437e86d91c1e021f486b99498c724f4f342 100644 --- a/dapps/valuetransfers/packages/tangle/payloadmetadata.go +++ b/dapps/valuetransfers/packages/tangle/payloadmetadata.go @@ -1,6 +1,7 @@ package tangle import ( + "fmt" "sync" "time" @@ -55,24 +56,31 @@ func PayloadMetadataFromBytes(bytes []byte) (result *PayloadMetadata, consumedBy func ParsePayloadMetadata(marshalUtil *marshalutil.MarshalUtil) (result *PayloadMetadata, err error) { result = &PayloadMetadata{} if result.payloadID, err = payload.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse payload id of payload metadata: %w", err) return } if result.solidificationTime, err = marshalUtil.ReadTime(); err != nil { + err = fmt.Errorf("failed to parse solidification time of payload metadata: %w", err) return } if result.solid, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'solid' of payload metadata: %w", err) return } if result.liked, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'liked' of payload metadata: %w", err) return } if result.confirmed, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'confirmed' of payload metadata: %w", err) return } if result.rejected, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'rejected' of payload metadata: %w", err) return } if result.branchID, err = branchmanager.ParseBranchID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse branch ID of payload metadata: %w", err) return } @@ -83,6 +91,9 @@ func ParsePayloadMetadata(marshalUtil *marshalutil.MarshalUtil) (result *Payload // unmarshaled by an external caller using the binary.ObjectStorageValue interface. func PayloadMetadataFromObjectStorage(key []byte, data []byte) (result objectstorage.StorableObject, err error) { result, _, err = PayloadMetadataFromBytes(byteutils.ConcatBytes(key, data)) + if err != nil { + err = fmt.Errorf("failed to parse payload metadata from object storage: %w", err) + } return } diff --git a/dapps/valuetransfers/packages/tangle/transactionmetadata.go b/dapps/valuetransfers/packages/tangle/transactionmetadata.go index f3e966bbb3e03a377c52dee586350023045ef2ed..2f9c0543c7a5a91f3ceb9538959d520e969a508f 100644 --- a/dapps/valuetransfers/packages/tangle/transactionmetadata.go +++ b/dapps/valuetransfers/packages/tangle/transactionmetadata.go @@ -1,6 +1,7 @@ package tangle import ( + "fmt" "sync" "time" @@ -60,6 +61,9 @@ func TransactionMetadataFromBytes(bytes []byte) (result *TransactionMetadata, co // it gets handed over to UnmarshalObjectStorageValue (by the ObjectStorage). func TransactionMetadataFromObjectStorage(key []byte, data []byte) (result objectstorage.StorableObject, err error) { result, _, err = TransactionMetadataFromBytes(byteutils.ConcatBytes(key, data)) + if err != nil { + err = fmt.Errorf("failed to parse transaction metadata from object storage: %w", err) + } return } @@ -69,33 +73,43 @@ func ParseTransactionMetadata(marshalUtil *marshalutil.MarshalUtil) (result *Tra result = &TransactionMetadata{} if result.id, err = transaction.ParseID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse transaction ID of transaction metadata: %w", err) return } if result.branchID, err = branchmanager.ParseBranchID(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse branch ID of transaction metadata: %w", err) return } if result.solidificationTime, err = marshalUtil.ReadTime(); err != nil { + err = fmt.Errorf("failed to parse solidification time of transaction metadata: %w", err) return } if result.finalizationTime, err = marshalUtil.ReadTime(); err != nil { + err = fmt.Errorf("failed to parse finalization time of transaction metadata: %w", err) return } if result.solid, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'solid' of transaction metadata: %w", err) return } if result.preferred, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'preferred' of transaction metadata: %w", err) return } if result.finalized, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'finalized' of transaction metadata: %w", err) return } if result.liked, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'liked' of transaction metadata: %w", err) return } if result.confirmed, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'confirmed' of transaction metadata: %w", err) return } if result.rejected, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'rejected' of transaction metadata: %w", err) return } diff --git a/dapps/valuetransfers/packages/transaction/transaction.go b/dapps/valuetransfers/packages/transaction/transaction.go index bd552869dd4fee9ec007a1ee1a4cebec9ffe426e..348eb71876bad2692fd3bcb146315fbffb6b8d21 100644 --- a/dapps/valuetransfers/packages/transaction/transaction.go +++ b/dapps/valuetransfers/packages/transaction/transaction.go @@ -78,11 +78,13 @@ func FromObjectStorage(key []byte, data []byte) (result objectstorage.StorableOb // parse the message transaction, err := Parse(marshalutil.New(data)) if err != nil { + err = fmt.Errorf("failed to parse transaction from object storage: %w", err) return } id, err := ParseID(marshalutil.New(key)) if err != nil { + err = fmt.Errorf("failed to parse transaction ID from object storage: %w", err) return } transaction.id = &id @@ -104,6 +106,7 @@ func Parse(marshalUtil *marshalutil.MarshalUtil) (result *Transaction, err error // unmarshal inputs parsedInputs, err := marshalUtil.Parse(func(data []byte) (interface{}, int, error) { return InputsFromBytes(data) }) if err != nil { + err = fmt.Errorf("failed to parse inputs of transaction: %w", err) return } result.inputs = parsedInputs.(*Inputs) @@ -111,6 +114,7 @@ func Parse(marshalUtil *marshalutil.MarshalUtil) (result *Transaction, err error // unmarshal outputs parsedOutputs, err := marshalUtil.Parse(func(data []byte) (interface{}, int, error) { return OutputsFromBytes(data) }) if err != nil { + err = fmt.Errorf("failed to parse outputs of transaction: %w", err) return } result.outputs = parsedOutputs.(*Outputs) @@ -119,12 +123,14 @@ func Parse(marshalUtil *marshalutil.MarshalUtil) (result *Transaction, err error var dataPayloadSize uint32 dataPayloadSize, err = marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to parse data payload size of transaction: %w", err) return } // unmarshal data payload result.dataPayload, err = marshalUtil.ReadBytes(int(dataPayloadSize)) if err != nil { + err = fmt.Errorf("failed to parse data payload of transaction: %w", err) return } @@ -132,12 +138,14 @@ func Parse(marshalUtil *marshalutil.MarshalUtil) (result *Transaction, err error essenceBytesCount := marshalUtil.ReadOffset() - readOffsetStart result.essenceBytes, err = marshalUtil.ReadBytes(essenceBytesCount, readOffsetStart) if err != nil { + err = fmt.Errorf("failed to parse essence bytes of transaction: %w", err) return } - // unmarshal outputs + // unmarshal signatures parsedSignatures, err := marshalUtil.Parse(func(data []byte) (interface{}, int, error) { return SignaturesFromBytes(data) }) if err != nil { + err = fmt.Errorf("failed to parse signatures of transaction: %w", err) return } result.signatures = parsedSignatures.(*Signatures) @@ -146,12 +154,14 @@ func Parse(marshalUtil *marshalutil.MarshalUtil) (result *Transaction, err error signatureBytesCount := marshalUtil.ReadOffset() - readOffsetStart - essenceBytesCount result.signatureBytes, err = marshalUtil.ReadBytes(signatureBytesCount, readOffsetStart+essenceBytesCount) if err != nil { + err = fmt.Errorf("failed to parse signature bytes of transaction: %w", err) return } // store bytes, so we don't have to marshal manually result.bytes, err = marshalUtil.ReadBytes(essenceBytesCount+signatureBytesCount, readOffsetStart) if err != nil { + err = fmt.Errorf("failed to parse bytes of transaction: %w", err) return } diff --git a/packages/binary/drng/payload/payload.go b/packages/binary/drng/payload/payload.go index 1849b55b447f11b76641313855cbca19dcb71ff3..e3f1c0a6de78e4db23921745105c83ab96709b97 100644 --- a/packages/binary/drng/payload/payload.go +++ b/packages/binary/drng/payload/payload.go @@ -1,6 +1,7 @@ package payload import ( + "fmt" "sync" "github.com/iotaledger/goshimmer/packages/binary/drng/payload/header" @@ -35,6 +36,7 @@ func New(header header.Header, data []byte) *Payload { func Parse(marshalUtil *marshalutil.MarshalUtil) (*Payload, error) { payload, err := marshalUtil.Parse(func(data []byte) (interface{}, int, error) { return FromBytes(data) }) if err != nil { + err = fmt.Errorf("failed to parse drng payload: %w", err) return &Payload{}, err } return payload.(*Payload), nil @@ -50,20 +52,24 @@ func FromBytes(bytes []byte) (result *Payload, consumedBytes int, err error) { result = &Payload{} len, err := marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to parse payload size of drng payload: %w", err) return } if _, err = marshalUtil.ReadUint32(); err != nil { + err = fmt.Errorf("failed to parse payload type of drng payload: %w", err) return } // parse header if result.Header, err = header.Parse(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse header of drng payload: %w", err) return } // parse data if result.Data, err = marshalUtil.ReadBytes(int(len - header.Length)); err != nil { + err = fmt.Errorf("failed to parse data of drng payload: %w", err) return } diff --git a/packages/binary/drng/subtypes/collectivebeacon/payload/payload.go b/packages/binary/drng/subtypes/collectivebeacon/payload/payload.go index 273f419738f49bde12db58b405e84503eb7751e0..46bed289ed81ca17a6bf6d99b2e5b956c1825566 100644 --- a/packages/binary/drng/subtypes/collectivebeacon/payload/payload.go +++ b/packages/binary/drng/subtypes/collectivebeacon/payload/payload.go @@ -1,6 +1,7 @@ package payload import ( + "fmt" "sync" "github.com/iotaledger/hive.go/stringify" @@ -43,6 +44,7 @@ func New(instanceID uint32, round uint64, prevSignature, signature, dpk []byte) func Parse(marshalUtil *marshalutil.MarshalUtil) (*Payload, error) { unmarshalledPayload, err := marshalUtil.Parse(func(data []byte) (interface{}, int, error) { return FromBytes(data) }) if err != nil { + err = fmt.Errorf("failed to parse collective beacon payload: %w", err) return nil, err } _payload := unmarshalledPayload.(*Payload) @@ -58,35 +60,42 @@ func FromBytes(bytes []byte) (result *Payload, consumedBytes int, err error) { // read information that are required to identify the payload from the outside if _, err = marshalUtil.ReadUint32(); err != nil { + err = fmt.Errorf("failed to parse payload size of collective beacon payload: %w", err) return } if _, err = marshalUtil.ReadUint32(); err != nil { + err = fmt.Errorf("failed to parse payload type of collective beacon payload: %w", err) return } // parse header result = &Payload{} if result.Header, err = header.Parse(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse header of collective beacon payload: %w", err) return } // parse round if result.Round, err = marshalUtil.ReadUint64(); err != nil { + err = fmt.Errorf("failed to parse round of collective beacon payload: %w", err) return } // parse prevSignature if result.PrevSignature, err = marshalUtil.ReadBytes(SignatureSize); err != nil { + err = fmt.Errorf("failed to parse prevSignature of collective beacon payload: %w", err) return } // parse current signature if result.Signature, err = marshalUtil.ReadBytes(SignatureSize); err != nil { + err = fmt.Errorf("failed to parse current signature of collective beacon payload: %w", err) return } // parse distributed public key if result.Dpk, err = marshalUtil.ReadBytes(PublicKeySize); err != nil { + err = fmt.Errorf("failed to parse distributed public key of collective beacon payload: %w", err) return } diff --git a/packages/tangle/approver.go b/packages/tangle/approver.go new file mode 100644 index 0000000000000000000000000000000000000000..16f7981a53bf561b236d78a8d3f82fd8915c64de --- /dev/null +++ b/packages/tangle/approver.go @@ -0,0 +1,152 @@ +package tangle + +import ( + "fmt" + + "github.com/iotaledger/hive.go/marshalutil" + "github.com/iotaledger/hive.go/objectstorage" + "github.com/iotaledger/hive.go/stringify" +) + +// Approver is an approver of a given referenced message. +type Approver struct { + objectstorage.StorableObjectFlags + // the message which got referenced by the approver message. + referencedMessageID MessageID + // the message which approved/referenced the given referenced message. + approverMessageID MessageID +} + +// NewApprover creates a new approver relation to the given approved/referenced message. +func NewApprover(referencedMessageID MessageID, approverMessageID MessageID) *Approver { + approver := &Approver{ + referencedMessageID: referencedMessageID, + approverMessageID: approverMessageID, + } + return approver +} + +// ApproverFromBytes parses the given bytes into an approver. +func ApproverFromBytes(bytes []byte) (result *Approver, consumedBytes int, err error) { + marshalUtil := marshalutil.New(bytes) + result, err = ApproverFromMarshalUtil(marshalUtil) + consumedBytes = marshalUtil.ReadOffset() + return +} + +// ApproverFromMarshalUtil parses a new approver from the given marshal util. +func ApproverFromMarshalUtil(marshalUtil *marshalutil.MarshalUtil) (result *Approver, err error) { + result = &Approver{} + + if result.referencedMessageID, err = MessageIDFromMarshalUtil(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse referenced message ID of approver: %w", err) + return + } + if result.approverMessageID, err = MessageIDFromMarshalUtil(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse approver message ID of approver: %w", err) + return + } + + return +} + +// ApproverFromObjectStorage is the factory method for Approvers stored in the ObjectStorage. +func ApproverFromObjectStorage(key []byte, _ []byte) (result objectstorage.StorableObject, err error) { + result, _, err = ApproverFromBytes(key) + if err != nil { + err = fmt.Errorf("failed to parse approver from object storage: %w", err) + } + + return +} + +// ReferencedMessageID returns the ID of the message which is referenced by the approver. +func (a *Approver) ReferencedMessageID() MessageID { + return a.referencedMessageID +} + +// ApproverMessageID returns the ID of the message which referenced the given approved message. +func (a *Approver) ApproverMessageID() MessageID { + return a.approverMessageID +} + +// Bytes returns the bytes of the approver. +func (a *Approver) Bytes() []byte { + return a.ObjectStorageKey() +} + +// String returns the string representation of the approver. +func (a *Approver) String() string { + return stringify.Struct("Approver", + stringify.StructField("referencedMessageID", a.ReferencedMessageID()), + stringify.StructField("approverMessageID", a.ApproverMessageID()), + ) +} + +// ObjectStorageKey marshals the keys of the stored approver into a byte array. +// This includes the referencedMessageID and the approverMessageID. +func (a *Approver) ObjectStorageKey() []byte { + return marshalutil.New(). + WriteBytes(a.referencedMessageID.Bytes()). + WriteBytes(a.approverMessageID.Bytes()). + Bytes() +} + +// ObjectStorageValue returns the value of the stored approver object. +func (a *Approver) ObjectStorageValue() (result []byte) { + return +} + +// Update updates the approver. +// This should should never happen and will panic if attempted. +func (a *Approver) Update(other objectstorage.StorableObject) { + panic("approvers should never be overwritten and only stored once to optimize IO") +} + +// interface contract (allow the compiler to check if the implementation has all of the required methods). +var _ objectstorage.StorableObject = &Approver{} + +// CachedApprover is a wrapper for a stored cached object representing an approver. +type CachedApprover struct { + objectstorage.CachedObject +} + +// Unwrap unwraps the cached approver into the underlying approver. +// If stored object cannot be cast into an approver or has been deleted, it returns nil. +func (c *CachedApprover) Unwrap() *Approver { + untypedObject := c.Get() + if untypedObject == nil { + return nil + } + + typedObject := untypedObject.(*Approver) + if typedObject == nil || typedObject.IsDeleted() { + return nil + } + + return typedObject + +} + +// Consume consumes the cachedApprover. +// It releases the object when the callback is done. +// It returns true if the callback was called. +func (c *CachedApprover) Consume(consumer func(approver *Approver)) (consumed bool) { + return c.CachedObject.Consume(func(object objectstorage.StorableObject) { + consumer(object.(*Approver)) + }) +} + +// CachedApprovers defines a slice of *CachedApprover. +type CachedApprovers []*CachedApprover + +// Consume calls *CachedApprover.Consume on element in the list. +func (c CachedApprovers) Consume(consumer func(approver *Approver)) (consumed bool) { + for _, cachedApprover := range c { + consumed = cachedApprover.Consume(func(approver *Approver) { + consumer(approver) + }) || consumed + } + + return +} diff --git a/packages/tangle/events.go b/packages/tangle/events.go new file mode 100644 index 0000000000000000000000000000000000000000..d527f3fff753f7ea3bd546c2de952e80dcea2873 --- /dev/null +++ b/packages/tangle/events.go @@ -0,0 +1,171 @@ +package tangle + +import ( + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/hive.go/events" +) + +// Events represents events happening on the base layer Tangle. +type Events struct { + // Fired when a message has been attached. + MessageAttached *events.Event + // Fired when a message has been solid, i.e. its past cone + // is known and in the database. + MessageSolid *events.Event + // Fired when a message which was previously marked as missing was received. + MissingMessageReceived *events.Event + // Fired when a message is missing which is needed to solidify a given approver message. + MessageMissing *events.Event + // Fired when a message was missing for too long and is + // therefore considered to be unsolidifiable. + MessageUnsolidifiable *events.Event + // Fired when a message was removed from storage. + MessageRemoved *events.Event +} + +// CachedMessageEvent represents the parameters of cachedMessageEvent +type CachedMessageEvent struct { + Message *CachedMessage + MessageMetadata *CachedMessageMetadata +} + +func newEvents() *Events { + return &Events{ + MessageAttached: events.NewEvent(cachedMessageEvent), + MessageSolid: events.NewEvent(cachedMessageEvent), + MissingMessageReceived: events.NewEvent(cachedMessageEvent), + MessageMissing: events.NewEvent(messageIDEvent), + MessageUnsolidifiable: events.NewEvent(messageIDEvent), + MessageRemoved: events.NewEvent(messageIDEvent), + } +} + +// MessageTipSelectorEvents represents event happening on the tip-selector. +type MessageTipSelectorEvents struct { + // Fired when a tip is added. + TipAdded *events.Event + // Fired when a tip is removed. + TipRemoved *events.Event +} + +func newMessageTipSelectorEvents() *MessageTipSelectorEvents { + return &MessageTipSelectorEvents{ + TipAdded: events.NewEvent(messageIDEvent), + TipRemoved: events.NewEvent(messageIDEvent), + } +} + +// MessageFactoryEvents represents events happening on a message factory. +type MessageFactoryEvents struct { + // Fired when a message is built including tips, sequence number and other metadata. + MessageConstructed *events.Event + // Fired when an error occurred. + Error *events.Event +} + +func newMessageFactoryEvents() *MessageFactoryEvents { + return &MessageFactoryEvents{ + MessageConstructed: events.NewEvent(messageConstructedEvent), + Error: events.NewEvent(events.ErrorCaller), + } +} + +// MessageParserEvents represents events happening on a message parser. +type MessageParserEvents struct { + // Fired when a message was parsed. + MessageParsed *events.Event + // Fired when submitted bytes are rejected by a filter. + BytesRejected *events.Event + // Fired when a message got rejected by a filter. + MessageRejected *events.Event +} + +// MessageParsedEvent represents the parameters of messageParsedEvent +type MessageParsedEvent struct { + Message *Message + Peer *peer.Peer +} + +// BytesRejectedEvent represents the parameters of bytesRejectedEvent +type BytesRejectedEvent struct { + Bytes []byte + Peer *peer.Peer +} + +// MessageRejectedEvent represents the parameters of messageRejectedEvent +type MessageRejectedEvent struct { + Message *Message + Peer *peer.Peer +} + +func newMessageParserEvents() *MessageParserEvents { + return &MessageParserEvents{ + MessageParsed: events.NewEvent(messageParsedEvent), + BytesRejected: events.NewEvent(bytesRejectedEvent), + MessageRejected: events.NewEvent(messageRejectedEvent), + } +} + +// MessageRequesterEvents represents events happening on a message requester. +type MessageRequesterEvents struct { + // Fired when a request for a given message should be sent. + SendRequest *events.Event + // MissingMessageAppeared is triggered when a message is actually present in the node's db although it was still being requested. + MissingMessageAppeared *events.Event +} + +// SendRequestEvent represents the parameters of sendRequestEvent +type SendRequestEvent struct { + ID MessageID +} + +// MissingMessageAppearedEvent represents the parameters of missingMessageAppearedEvent +type MissingMessageAppearedEvent struct { + ID MessageID +} + +func newMessageRequesterEvents() *MessageRequesterEvents { + return &MessageRequesterEvents{ + SendRequest: events.NewEvent(sendRequestEvent), + MissingMessageAppeared: events.NewEvent(missingMessageAppearedEvent), + } +} + +func sendRequestEvent(handler interface{}, params ...interface{}) { + handler.(func(*SendRequestEvent))(params[0].(*SendRequestEvent)) +} + +func missingMessageAppearedEvent(handler interface{}, params ...interface{}) { + handler.(func(*MissingMessageAppearedEvent))(params[0].(*MissingMessageAppearedEvent)) +} + +func messageParsedEvent(handler interface{}, params ...interface{}) { + handler.(func(*MessageParsedEvent))(params[0].(*MessageParsedEvent)) +} + +func bytesRejectedEvent(handler interface{}, params ...interface{}) { + handler.(func(*BytesRejectedEvent, error))(params[0].(*BytesRejectedEvent), params[1].(error)) +} + +func messageRejectedEvent(handler interface{}, params ...interface{}) { + handler.(func(*MessageRejectedEvent, error))(params[0].(*MessageRejectedEvent), params[1].(error)) +} + +func messageConstructedEvent(handler interface{}, params ...interface{}) { + handler.(func(*Message))(params[0].(*Message)) +} + +func messageIDEvent(handler interface{}, params ...interface{}) { + handler.(func(MessageID))(params[0].(MessageID)) +} + +func cachedMessageEvent(handler interface{}, params ...interface{}) { + handler.(func(*CachedMessageEvent))(cachedMessageRetain(params[0].(*CachedMessageEvent))) +} + +func cachedMessageRetain(object *CachedMessageEvent) *CachedMessageEvent { + return &CachedMessageEvent{ + Message: object.Message.Retain(), + MessageMetadata: object.MessageMetadata.Retain(), + } +} diff --git a/packages/tangle/filter.go b/packages/tangle/filter.go new file mode 100644 index 0000000000000000000000000000000000000000..948d8acba77fcd4c88aed12e595879dfbe38aef0 --- /dev/null +++ b/packages/tangle/filter.go @@ -0,0 +1,236 @@ +package tangle + +import ( + "crypto/ed25519" + "errors" + "fmt" + "sync" + + "github.com/iotaledger/goshimmer/packages/pow" + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/hive.go/bytesfilter" +) + +var ( + // ErrInvalidPOWDifficultly is returned when the nonce of a message does not fulfill the PoW difficulty. + ErrInvalidPOWDifficultly = errors.New("invalid PoW") + + // ErrMessageTooSmall is returned when the message does not contain enough data for the PoW. + ErrMessageTooSmall = errors.New("message too small") + + // ErrInvalidSignature is returned when a message contains an invalid signature. + ErrInvalidSignature = fmt.Errorf("invalid signature") + + // ErrReceivedDuplicateBytes is returned when duplicated bytes are rejected. + ErrReceivedDuplicateBytes = fmt.Errorf("received duplicate bytes") +) + +// BytesFilter filters based on byte slices and peers. +type BytesFilter interface { + // Filter filters up on the given bytes and peer and calls the acceptance callback + // if the input passes or the rejection callback if the input is rejected. + Filter(bytes []byte, peer *peer.Peer) + // OnAccept registers the given callback as the acceptance function of the filter. + OnAccept(callback func(bytes []byte, peer *peer.Peer)) + // OnReject registers the given callback as the rejection function of the filter. + OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) +} + +// MessageFilter filters based on messages and peers. +type MessageFilter interface { + // Filter filters up on the given message and peer and calls the acceptance callback + // if the input passes or the rejection callback if the input is rejected. + Filter(msg *Message, peer *peer.Peer) + // OnAccept registers the given callback as the acceptance function of the filter. + OnAccept(callback func(msg *Message, peer *peer.Peer)) + // OnAccept registers the given callback as the rejection function of the filter. + OnReject(callback func(msg *Message, err error, peer *peer.Peer)) +} + +// MessageSignatureFilter filters messages based on whether their signatures are valid. +type MessageSignatureFilter struct { + onAcceptCallback func(msg *Message, peer *peer.Peer) + onRejectCallback func(msg *Message, err error, peer *peer.Peer) + + onAcceptCallbackMutex sync.RWMutex + onRejectCallbackMutex sync.RWMutex +} + +// NewMessageSignatureFilter creates a new message signature filter. +func NewMessageSignatureFilter() *MessageSignatureFilter { + return &MessageSignatureFilter{} +} + +// Filter filters up on the given bytes and peer and calls the acceptance callback +// if the input passes or the rejection callback if the input is rejected. +func (f *MessageSignatureFilter) Filter(msg *Message, peer *peer.Peer) { + if msg.VerifySignature() { + f.getAcceptCallback()(msg, peer) + return + } + f.getRejectCallback()(msg, ErrInvalidSignature, peer) +} + +// OnAccept registers the given callback as the acceptance function of the filter. +func (f *MessageSignatureFilter) OnAccept(callback func(msg *Message, peer *peer.Peer)) { + f.onAcceptCallbackMutex.Lock() + f.onAcceptCallback = callback + f.onAcceptCallbackMutex.Unlock() +} + +// OnReject registers the given callback as the rejection function of the filter. +func (f *MessageSignatureFilter) OnReject(callback func(msg *Message, err error, peer *peer.Peer)) { + f.onRejectCallbackMutex.Lock() + f.onRejectCallback = callback + f.onRejectCallbackMutex.Unlock() +} + +func (f *MessageSignatureFilter) getAcceptCallback() (result func(msg *Message, peer *peer.Peer)) { + f.onAcceptCallbackMutex.RLock() + result = f.onAcceptCallback + f.onAcceptCallbackMutex.RUnlock() + return +} + +func (f *MessageSignatureFilter) getRejectCallback() (result func(msg *Message, err error, peer *peer.Peer)) { + f.onRejectCallbackMutex.RLock() + result = f.onRejectCallback + f.onRejectCallbackMutex.RUnlock() + return +} + +// PowFilter is a message bytes filter validating the PoW nonce. +type PowFilter struct { + worker *pow.Worker + difficulty int + + mu sync.Mutex + acceptCallback func([]byte, *peer.Peer) + rejectCallback func([]byte, error, *peer.Peer) +} + +// NewPowFilter creates a new PoW bytes filter. +func NewPowFilter(worker *pow.Worker, difficulty int) *PowFilter { + return &PowFilter{ + worker: worker, + difficulty: difficulty, + } +} + +// Filter checks whether the given bytes pass the PoW validation and calls the corresponding callback. +func (f *PowFilter) Filter(msgBytes []byte, p *peer.Peer) { + if err := f.validate(msgBytes); err != nil { + f.reject(msgBytes, err, p) + return + } + f.accept(msgBytes, p) +} + +// OnAccept registers the given callback as the acceptance function of the filter. +func (f *PowFilter) OnAccept(callback func([]byte, *peer.Peer)) { + f.mu.Lock() + defer f.mu.Unlock() + f.acceptCallback = callback +} + +// OnReject registers the given callback as the rejection function of the filter. +func (f *PowFilter) OnReject(callback func([]byte, error, *peer.Peer)) { + f.mu.Lock() + defer f.mu.Unlock() + f.rejectCallback = callback +} + +func (f *PowFilter) accept(msgBytes []byte, p *peer.Peer) { + f.mu.Lock() + defer f.mu.Unlock() + if f.acceptCallback != nil { + f.acceptCallback(msgBytes, p) + } +} + +func (f *PowFilter) reject(msgBytes []byte, err error, p *peer.Peer) { + f.mu.Lock() + defer f.mu.Unlock() + if f.rejectCallback != nil { + f.rejectCallback(msgBytes, err, p) + } +} + +func (f *PowFilter) validate(msgBytes []byte) error { + content, err := powData(msgBytes) + if err != nil { + return err + } + zeros, err := f.worker.LeadingZeros(content) + if err != nil { + return err + } + if zeros < f.difficulty { + return fmt.Errorf("%w: leading zeros %d for difficulty %d", ErrInvalidPOWDifficultly, zeros, f.difficulty) + } + return nil +} + +// powData returns the bytes over which PoW should be computed. +func powData(msgBytes []byte) ([]byte, error) { + contentLength := len(msgBytes) - ed25519.SignatureSize + if contentLength < pow.NonceBytes { + return nil, ErrMessageTooSmall + } + return msgBytes[:contentLength], nil +} + +// RecentlySeenBytesFilter filters so that bytes which were recently seen don't pass the filter. +type RecentlySeenBytesFilter struct { + bytesFilter *bytesfilter.BytesFilter + onAcceptCallback func(bytes []byte, peer *peer.Peer) + onRejectCallback func(bytes []byte, err error, peer *peer.Peer) + + onAcceptCallbackMutex sync.RWMutex + onRejectCallbackMutex sync.RWMutex +} + +// NewRecentlySeenBytesFilter creates a new recently seen bytes filter. +func NewRecentlySeenBytesFilter() *RecentlySeenBytesFilter { + return &RecentlySeenBytesFilter{ + bytesFilter: bytesfilter.New(100000), + } +} + +// Filter filters up on the given bytes and peer and calls the acceptance callback +// if the input passes or the rejection callback if the input is rejected. +func (f *RecentlySeenBytesFilter) Filter(bytes []byte, peer *peer.Peer) { + if f.bytesFilter.Add(bytes) { + f.getAcceptCallback()(bytes, peer) + return + } + f.getRejectCallback()(bytes, ErrReceivedDuplicateBytes, peer) +} + +// OnAccept registers the given callback as the acceptance function of the filter. +func (f *RecentlySeenBytesFilter) OnAccept(callback func(bytes []byte, peer *peer.Peer)) { + f.onAcceptCallbackMutex.Lock() + f.onAcceptCallback = callback + f.onAcceptCallbackMutex.Unlock() +} + +// OnReject registers the given callback as the rejection function of the filter. +func (f *RecentlySeenBytesFilter) OnReject(callback func(bytes []byte, err error, peer *peer.Peer)) { + f.onRejectCallbackMutex.Lock() + f.onRejectCallback = callback + f.onRejectCallbackMutex.Unlock() +} + +func (f *RecentlySeenBytesFilter) getAcceptCallback() (result func(bytes []byte, peer *peer.Peer)) { + f.onAcceptCallbackMutex.Lock() + result = f.onAcceptCallback + f.onAcceptCallbackMutex.Unlock() + return +} + +func (f *RecentlySeenBytesFilter) getRejectCallback() (result func(bytes []byte, err error, peer *peer.Peer)) { + f.onRejectCallbackMutex.Lock() + result = f.onRejectCallback + f.onRejectCallbackMutex.Unlock() + return +} diff --git a/packages/tangle/filter_test.go b/packages/tangle/filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a36808e9be895739b0cbfd36a8188a6798cc0955 --- /dev/null +++ b/packages/tangle/filter_test.go @@ -0,0 +1,64 @@ +package tangle + +import ( + "context" + "crypto" + "errors" + "testing" + + "github.com/iotaledger/goshimmer/packages/pow" + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + _ "golang.org/x/crypto/blake2b" // required by crypto.BLAKE2b_512 +) + +var ( + testPeer *peer.Peer + testWorker = pow.New(crypto.BLAKE2b_512, 1) + testDifficulty = 10 +) + +func TestPowFilter_Filter(t *testing.T) { + filter := NewPowFilter(testWorker, testDifficulty) + + // set callbacks + m := &callbackMock{} + filter.OnAccept(m.Accept) + filter.OnReject(m.Reject) + + t.Run("reject small message", func(t *testing.T) { + m.On("Reject", mock.Anything, mock.MatchedBy(func(err error) bool { return errors.Is(err, ErrMessageTooSmall) }), testPeer) + filter.Filter(nil, testPeer) + }) + + msg := newTestNonceMessage(0) + msgBytes := msg.Bytes() + + t.Run("reject invalid nonce", func(t *testing.T) { + m.On("Reject", msgBytes, mock.MatchedBy(func(err error) bool { return errors.Is(err, ErrInvalidPOWDifficultly) }), testPeer) + filter.Filter(msgBytes, testPeer) + }) + + nonce, err := testWorker.Mine(context.Background(), msgBytes[:len(msgBytes)-len(msg.Signature())-pow.NonceBytes], testDifficulty) + require.NoError(t, err) + + msgPOW := newTestNonceMessage(nonce) + msgPOWBytes := msgPOW.Bytes() + + t.Run("accept valid nonce", func(t *testing.T) { + zeroes, err := testWorker.LeadingZeros(msgPOWBytes[:len(msgPOWBytes)-len(msgPOW.Signature())]) + require.NoError(t, err) + require.GreaterOrEqual(t, zeroes, testDifficulty) + + m.On("Accept", msgPOWBytes, testPeer) + filter.Filter(msgPOWBytes, testPeer) + }) + + m.AssertExpectations(t) +} + +type callbackMock struct{ mock.Mock } + +func (m *callbackMock) Accept(msg []byte, p *peer.Peer) { m.Called(msg, p) } +func (m *callbackMock) Reject(msg []byte, err error, p *peer.Peer) { m.Called(msg, err, p) } diff --git a/packages/tangle/message.go b/packages/tangle/message.go new file mode 100644 index 0000000000000000000000000000000000000000..fecb7c7a68c2508d9700b4f143be742585cb48e2 --- /dev/null +++ b/packages/tangle/message.go @@ -0,0 +1,435 @@ +package tangle + +import ( + "fmt" + "sync" + "time" + + "github.com/iotaledger/hive.go/crypto/ed25519" + "github.com/iotaledger/hive.go/marshalutil" + "github.com/iotaledger/hive.go/objectstorage" + "github.com/iotaledger/hive.go/stringify" + "github.com/mr-tron/base58" + "golang.org/x/crypto/blake2b" +) + +const ( + // MaxMessageSize defines the maximum size of a message. + MaxMessageSize = 64 * 1024 + + // MessageIDLength defines the length of an MessageID. + MessageIDLength = 64 +) + +// ContentID identifies the content of a message without its parent1/parent2 ids. +type ContentID = MessageID + +// MessageID identifies a message in its entirety. Unlike the sole content id, it also incorporates +// the parent1 and parent2 ids. +type MessageID [MessageIDLength]byte + +// EmptyMessageID is an empty id. +var EmptyMessageID = MessageID{} + +// NewMessageID creates a new message id. +func NewMessageID(base58EncodedString string) (result MessageID, err error) { + bytes, err := base58.Decode(base58EncodedString) + if err != nil { + err = fmt.Errorf("failed to decode base58 encoded string '%s': %w", base58EncodedString, err) + + return + } + + if len(bytes) != MessageIDLength { + err = fmt.Errorf("length of base58 formatted message id is wrong") + + return + } + + copy(result[:], bytes) + + return +} + +// MessageIDFromBytes unmarshals a message id from a sequence of bytes. +func MessageIDFromBytes(bytes []byte) (result MessageID, consumedBytes int, err error) { + // check arguments + if len(bytes) < MessageIDLength { + err = fmt.Errorf("bytes not long enough to encode a valid message id") + } + + // calculate result + copy(result[:], bytes) + + // return the number of bytes we processed + consumedBytes = MessageIDLength + + return +} + +// MessageIDFromMarshalUtil is a wrapper for simplified unmarshaling in a byte stream using the marshalUtil package. +func MessageIDFromMarshalUtil(marshalUtil *marshalutil.MarshalUtil) (MessageID, error) { + id, err := marshalUtil.Parse(func(data []byte) (interface{}, int, error) { return MessageIDFromBytes(data) }) + if err != nil { + err = fmt.Errorf("failed to parse message ID: %w", err) + return MessageID{}, err + } + return id.(MessageID), nil +} + +// MarshalBinary marshals the MessageID into bytes. +func (id *MessageID) MarshalBinary() (result []byte, err error) { + return id.Bytes(), nil +} + +// UnmarshalBinary unmarshals the bytes into an MessageID. +func (id *MessageID) UnmarshalBinary(data []byte) (err error) { + copy(id[:], data) + + return +} + +// Bytes returns the bytes of the MessageID. +func (id MessageID) Bytes() []byte { + return id[:] +} + +// String returns the base58 encode of the MessageID. +func (id MessageID) String() string { + return base58.Encode(id[:]) +} + +// Message represents the core message for the base layer Tangle. +type Message struct { + // base functionality of StorableObject + objectstorage.StorableObjectFlags + + // core properties (get sent over the wire) + parent1ID MessageID + parent2ID MessageID + issuerPublicKey ed25519.PublicKey + issuingTime time.Time + sequenceNumber uint64 + payload Payload + nonce uint64 + signature ed25519.Signature + + // derived properties + id *MessageID + idMutex sync.RWMutex + contentID *ContentID + contentIDMutex sync.RWMutex + bytes []byte + bytesMutex sync.RWMutex +} + +// NewMessage creates a new message with the details provided by the issuer. +func NewMessage(parent1ID MessageID, parent2ID MessageID, issuingTime time.Time, issuerPublicKey ed25519.PublicKey, sequenceNumber uint64, payload Payload, nonce uint64, signature ed25519.Signature) (result *Message) { + return &Message{ + parent1ID: parent1ID, + parent2ID: parent2ID, + issuerPublicKey: issuerPublicKey, + issuingTime: issuingTime, + sequenceNumber: sequenceNumber, + payload: payload, + nonce: nonce, + signature: signature, + } +} + +// MessageFromBytes parses the given bytes into a message. +func MessageFromBytes(bytes []byte) (result *Message, consumedBytes int, err error) { + marshalUtil := marshalutil.New(bytes) + result, err = MessageFromMarshalUtil(marshalUtil) + consumedBytes = marshalUtil.ReadOffset() + return +} + +// MessageFromMarshalUtil parses a message from the given marshal util. +func MessageFromMarshalUtil(marshalUtil *marshalutil.MarshalUtil) (result *Message, err error) { + // determine read offset before starting to parse + readOffsetStart := marshalUtil.ReadOffset() + + // parse information + result = &Message{} + if result.parent1ID, err = MessageIDFromMarshalUtil(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse parent1 message ID of the message: %w", err) + return + } + if result.parent2ID, err = MessageIDFromMarshalUtil(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse parent1 message ID of the message: %w", err) + return + } + if result.issuerPublicKey, err = ed25519.ParsePublicKey(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse issuer public key of the message: %w", err) + return + } + if result.issuingTime, err = marshalUtil.ReadTime(); err != nil { + err = fmt.Errorf("failed to parse issuing time of the message: %w", err) + return + } + if result.sequenceNumber, err = marshalUtil.ReadUint64(); err != nil { + err = fmt.Errorf("failed to parse sequence number of the message: %w", err) + return + } + if result.payload, err = PayloadFromMarshalUtil(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse payload of the message: %w", err) + return + } + if result.nonce, err = marshalUtil.ReadUint64(); err != nil { + err = fmt.Errorf("failed to parse nonce of the message: %w", err) + return + } + if result.signature, err = ed25519.ParseSignature(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse signature of the message: %w", err) + return + } + + // retrieve the number of bytes we processed + readOffsetEnd := marshalUtil.ReadOffset() + + // store marshaled version as a copy + result.bytes, err = marshalUtil.ReadBytes(readOffsetEnd-readOffsetStart, readOffsetStart) + if err != nil { + err = fmt.Errorf("error trying to copy raw source bytes: %w", err) + return + } + + return +} + +// MessageFromObjectStorage restores a Message from the ObjectStorage. +func MessageFromObjectStorage(key []byte, data []byte) (result objectstorage.StorableObject, err error) { + // parse the message + message, err := MessageFromMarshalUtil(marshalutil.New(data)) + if err != nil { + err = fmt.Errorf("failed to parse message from object storage: %w", err) + return + } + + // parse the ID from they key + id, err := MessageIDFromMarshalUtil(marshalutil.New(key)) + if err != nil { + err = fmt.Errorf("failed to parse message ID from object storage: %w", err) + return + } + message.id = &id + + // assign result + result = message + + return +} + +// VerifySignature verifies the signature of the message. +func (m *Message) VerifySignature() bool { + msgBytes := m.Bytes() + signature := m.Signature() + + contentLength := len(msgBytes) - len(signature) + content := msgBytes[:contentLength] + + return m.issuerPublicKey.VerifySignature(content, signature) +} + +// ID returns the id of the message which is made up of the content id and parent1/parent2 ids. +// This id can be used for merkle proofs. +func (m *Message) ID() (result MessageID) { + m.idMutex.RLock() + + if m.id == nil { + m.idMutex.RUnlock() + + m.idMutex.Lock() + defer m.idMutex.Unlock() + if m.id != nil { + result = *m.id + return + } + result = m.calculateID() + m.id = &result + return + } + + result = *m.id + m.idMutex.RUnlock() + return +} + +// Parent1ID returns the id of the parent1 message. +func (m *Message) Parent1ID() MessageID { + return m.parent1ID +} + +// Parent2ID returns the id of the parent2 message. +func (m *Message) Parent2ID() MessageID { + return m.parent2ID +} + +// IssuerPublicKey returns the public key of the message issuer. +func (m *Message) IssuerPublicKey() ed25519.PublicKey { + return m.issuerPublicKey +} + +// IssuingTime returns the time when this message was created. +func (m *Message) IssuingTime() time.Time { + return m.issuingTime +} + +// SequenceNumber returns the sequence number of this message. +func (m *Message) SequenceNumber() uint64 { + return m.sequenceNumber +} + +// Payload returns the payload of the message. +func (m *Message) Payload() Payload { + return m.payload +} + +// Nonce returns the nonce of the message. +func (m *Message) Nonce() uint64 { + return m.nonce +} + +// Signature returns the signature of the message. +func (m *Message) Signature() ed25519.Signature { + return m.signature +} + +// ContentID returns the content id of the message which is made up of all the +// parts of the message minus the parent1 and parent2 ids. +func (m *Message) ContentID() (result ContentID) { + m.contentIDMutex.RLock() + if m.contentID == nil { + m.contentIDMutex.RUnlock() + + m.contentIDMutex.Lock() + defer m.contentIDMutex.Unlock() + if m.contentID != nil { + result = *m.contentID + return + } + result = m.calculateContentID() + m.contentID = &result + return + } + + result = *m.contentID + m.contentIDMutex.RUnlock() + return +} + +// calculates the message id. +func (m *Message) calculateID() MessageID { + return blake2b.Sum512( + marshalutil.New(MessageIDLength + MessageIDLength + PayloadIDLength). + WriteBytes(m.parent1ID.Bytes()). + WriteBytes(m.parent2ID.Bytes()). + WriteBytes(m.ContentID().Bytes()). + Bytes(), + ) +} + +// calculates the content id of the message. +func (m *Message) calculateContentID() ContentID { + // compute content id from the message data (except parent1 and parent2 ids) + return blake2b.Sum512(m.Bytes()[2*MessageIDLength:]) +} + +// Bytes returns the message in serialized byte form. +func (m *Message) Bytes() []byte { + m.bytesMutex.RLock() + if m.bytes != nil { + defer m.bytesMutex.RUnlock() + + return m.bytes + } + + m.bytesMutex.RUnlock() + m.bytesMutex.RLock() + defer m.bytesMutex.RUnlock() + + if m.bytes != nil { + return m.bytes + } + + // marshal result + marshalUtil := marshalutil.New() + marshalUtil.WriteBytes(m.parent1ID.Bytes()) + marshalUtil.WriteBytes(m.parent2ID.Bytes()) + marshalUtil.WriteBytes(m.issuerPublicKey.Bytes()) + marshalUtil.WriteTime(m.issuingTime) + marshalUtil.WriteUint64(m.sequenceNumber) + marshalUtil.WriteBytes(m.payload.Bytes()) + marshalUtil.WriteUint64(m.nonce) + marshalUtil.WriteBytes(m.signature.Bytes()) + + m.bytes = marshalUtil.Bytes() + + return m.bytes +} + +// ObjectStorageKey returns the key of the stored message object. +// This returns the bytes of the message ID. +func (m *Message) ObjectStorageKey() []byte { + return m.ID().Bytes() +} + +// ObjectStorageValue returns the value stored in object storage. +// This returns the bytes of message. +func (m *Message) ObjectStorageValue() []byte { + return m.Bytes() +} + +// Update updates the object with the values of another object. +// Since a Message is immutable, this function is not implemented and panics. +func (m *Message) Update(objectstorage.StorableObject) { + panic("messages should never be overwritten and only stored once to optimize IO") +} + +func (m *Message) String() string { + return stringify.Struct("Message", + stringify.StructField("id", m.ID()), + stringify.StructField("parent1Id", m.Parent1ID()), + stringify.StructField("parent2Id", m.Parent2ID()), + stringify.StructField("issuer", m.IssuerPublicKey()), + stringify.StructField("issuingTime", m.IssuingTime()), + stringify.StructField("sequenceNumber", m.SequenceNumber()), + stringify.StructField("payload", m.Payload()), + stringify.StructField("nonce", m.Nonce()), + stringify.StructField("signature", m.Signature()), + ) +} + +// CachedMessage defines a cached message. +// A wrapper for a cached object. +type CachedMessage struct { + objectstorage.CachedObject +} + +// Retain registers a new consumer for the cached message. +func (c *CachedMessage) Retain() *CachedMessage { + return &CachedMessage{c.CachedObject.Retain()} +} + +// Consume consumes the cached object and releases it when the callback is done. +// It returns true if the callback was called. +func (c *CachedMessage) Consume(consumer func(msg *Message)) bool { + return c.CachedObject.Consume(func(object objectstorage.StorableObject) { + consumer(object.(*Message)) + }) +} + +// Unwrap returns the message wrapped by the cached message. +// If the wrapped object cannot be cast to a Message or has been deleted, it returns nil. +func (c *CachedMessage) Unwrap() *Message { + untypedMessage := c.Get() + if untypedMessage == nil { + return nil + } + typeCastedMessage := untypedMessage.(*Message) + if typeCastedMessage == nil || typeCastedMessage.IsDeleted() { + return nil + } + return typeCastedMessage +} diff --git a/packages/tangle/message_test.go b/packages/tangle/message_test.go new file mode 100644 index 0000000000000000000000000000000000000000..afb737e8950ee3875b528ca84a648cd142b7f6c3 --- /dev/null +++ b/packages/tangle/message_test.go @@ -0,0 +1,51 @@ +package tangle + +import ( + "testing" + "time" + + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/hive.go/crypto/ed25519" + "github.com/iotaledger/hive.go/identity" + "github.com/iotaledger/hive.go/kvstore/mapdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMessage_VerifySignature(t *testing.T) { + keyPair := ed25519.GenerateKeyPair() + pl := NewDataPayload([]byte("test")) + + unsigned := NewMessage(EmptyMessageID, EmptyMessageID, time.Time{}, keyPair.PublicKey, 0, pl, 0, ed25519.Signature{}) + assert.False(t, unsigned.VerifySignature()) + + unsignedBytes := unsigned.Bytes() + signature := keyPair.PrivateKey.Sign(unsignedBytes[:len(unsignedBytes)-ed25519.SignatureSize]) + + signed := NewMessage(EmptyMessageID, EmptyMessageID, time.Time{}, keyPair.PublicKey, 0, pl, 0, signature) + assert.True(t, signed.VerifySignature()) +} + +func TestMessage_MarshalUnmarshal(t *testing.T) { + msgFactory := NewMessageFactory(mapdb.NewMapDB(), []byte(messagelayer.DBSequenceNumber), identity.GenerateLocalIdentity(), NewMessageTipSelector()) + defer msgFactory.Shutdown() + + testMessage, err := msgFactory.IssuePayload(NewDataPayload([]byte("test"))) + require.NoError(t, err) + assert.Equal(t, true, testMessage.VerifySignature()) + + t.Log(testMessage) + + restoredMessage, _, err := MessageFromBytes(testMessage.Bytes()) + if assert.NoError(t, err, err) { + assert.Equal(t, testMessage.ID(), restoredMessage.ID()) + assert.Equal(t, testMessage.Parent1ID(), restoredMessage.Parent1ID()) + assert.Equal(t, testMessage.Parent2ID(), restoredMessage.Parent2ID()) + assert.Equal(t, testMessage.IssuerPublicKey(), restoredMessage.IssuerPublicKey()) + assert.Equal(t, testMessage.IssuingTime().Round(time.Second), restoredMessage.IssuingTime().Round(time.Second)) + assert.Equal(t, testMessage.SequenceNumber(), restoredMessage.SequenceNumber()) + assert.Equal(t, testMessage.Nonce(), restoredMessage.Nonce()) + assert.Equal(t, testMessage.Signature(), restoredMessage.Signature()) + assert.Equal(t, true, restoredMessage.VerifySignature()) + } +} diff --git a/packages/tangle/messagefactory.go b/packages/tangle/messagefactory.go new file mode 100644 index 0000000000000000000000000000000000000000..f517dd6e5cf81514d1818b1dcfe1c543009f1217 --- /dev/null +++ b/packages/tangle/messagefactory.go @@ -0,0 +1,153 @@ +package tangle + +import ( + "fmt" + "sync" + "time" + + "github.com/iotaledger/hive.go/crypto/ed25519" + "github.com/iotaledger/hive.go/identity" + "github.com/iotaledger/hive.go/kvstore" +) + +const storeSequenceInterval = 100 + +var ( + // ZeroWorker is a PoW worker that always returns 0 as the nonce. + ZeroWorker = WorkerFunc(func([]byte) (uint64, error) { return 0, nil }) +) + +// A TipSelector selects two tips, parent2 and parent1, for a new message to attach to. +type TipSelector interface { + Tips() (parent1 MessageID, parent2 MessageID) +} + +// A Worker performs the PoW for the provided message in serialized byte form. +type Worker interface { + DoPOW([]byte) (nonce uint64, err error) +} + +// MessageFactory acts as a factory to create new messages. +type MessageFactory struct { + Events *MessageFactoryEvents + sequence *kvstore.Sequence + localIdentity *identity.LocalIdentity + selector TipSelector + + worker Worker + workerMutex sync.RWMutex + issuanceMutex sync.Mutex +} + +// NewMessageFactory creates a new message factory. +func NewMessageFactory(store kvstore.KVStore, sequenceKey []byte, localIdentity *identity.LocalIdentity, selector TipSelector) *MessageFactory { + sequence, err := kvstore.NewSequence(store, sequenceKey, storeSequenceInterval) + if err != nil { + panic(fmt.Sprintf("could not create message sequence number: %v", err)) + } + + return &MessageFactory{ + Events: newMessageFactoryEvents(), + sequence: sequence, + localIdentity: localIdentity, + selector: selector, + worker: ZeroWorker, + } +} + +// SetWorker sets the PoW worker to be used for the messages. +func (f *MessageFactory) SetWorker(worker Worker) { + f.workerMutex.Lock() + defer f.workerMutex.Unlock() + f.worker = worker +} + +// IssuePayload creates a new message including sequence number and tip selection and returns it. +// It also triggers the MessageConstructed event once it's done, which is for example used by the plugins to listen for +// messages that shall be attached to the tangle. +func (f *MessageFactory) IssuePayload(p Payload) (*Message, error) { + payloadLen := len(p.Bytes()) + if payloadLen > MaxPayloadSize { + err := fmt.Errorf("%w: %d bytes", ErrMaxPayloadSizeExceeded, payloadLen) + f.Events.Error.Trigger(err) + return nil, err + } + + f.issuanceMutex.Lock() + defer f.issuanceMutex.Unlock() + sequenceNumber, err := f.sequence.Next() + if err != nil { + err = fmt.Errorf("could not create sequence number: %w", err) + f.Events.Error.Trigger(err) + return nil, err + } + + parent1ID, parent2ID := f.selector.Tips() + issuingTime := time.Now() + issuerPublicKey := f.localIdentity.PublicKey() + + // do the PoW + nonce, err := f.doPOW(parent1ID, parent2ID, issuingTime, issuerPublicKey, sequenceNumber, p) + if err != nil { + err = fmt.Errorf("pow failed: %w", err) + f.Events.Error.Trigger(err) + return nil, err + } + + // create the signature + signature := f.sign(parent1ID, parent2ID, issuingTime, issuerPublicKey, sequenceNumber, p, nonce) + + msg := NewMessage( + parent1ID, + parent2ID, + issuingTime, + issuerPublicKey, + sequenceNumber, + p, + nonce, + signature, + ) + f.Events.MessageConstructed.Trigger(msg) + return msg, nil +} + +// Shutdown closes the MessageFactory and persists the sequence number. +func (f *MessageFactory) Shutdown() { + if err := f.sequence.Release(); err != nil { + f.Events.Error.Trigger(fmt.Errorf("could not release message sequence number: %w", err)) + } +} + +func (f *MessageFactory) doPOW(parent1ID MessageID, parent2ID MessageID, issuingTime time.Time, key ed25519.PublicKey, seq uint64, payload Payload) (uint64, error) { + // create a dummy message to simplify marshaling + dummy := NewMessage(parent1ID, parent2ID, issuingTime, key, seq, payload, 0, ed25519.EmptySignature).Bytes() + + f.workerMutex.RLock() + defer f.workerMutex.RUnlock() + return f.worker.DoPOW(dummy) +} + +func (f *MessageFactory) sign(parent1ID MessageID, parent2ID MessageID, issuingTime time.Time, key ed25519.PublicKey, seq uint64, payload Payload, nonce uint64) ed25519.Signature { + // create a dummy message to simplify marshaling + dummy := NewMessage(parent1ID, parent2ID, issuingTime, key, seq, payload, nonce, ed25519.EmptySignature) + dummyBytes := dummy.Bytes() + + contentLength := len(dummyBytes) - len(dummy.Signature()) + return f.localIdentity.Sign(dummyBytes[:contentLength]) +} + +// The TipSelectorFunc type is an adapter to allow the use of ordinary functions as tip selectors. +type TipSelectorFunc func() (MessageID, MessageID) + +// Tips calls f(). +func (f TipSelectorFunc) Tips() (MessageID, MessageID) { + return f() +} + +// The WorkerFunc type is an adapter to allow the use of ordinary functions as a PoW performer. +type WorkerFunc func([]byte) (uint64, error) + +// DoPOW calls f(msg). +func (f WorkerFunc) DoPOW(msg []byte) (uint64, error) { + return f(msg) +} diff --git a/packages/tangle/messagefactory_test.go b/packages/tangle/messagefactory_test.go new file mode 100644 index 0000000000000000000000000000000000000000..53ac21e84a082377970498c7384b804213214b83 --- /dev/null +++ b/packages/tangle/messagefactory_test.go @@ -0,0 +1,161 @@ +package tangle + +import ( + "context" + "crypto" + "crypto/ed25519" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/iotaledger/goshimmer/packages/pow" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/identity" + "github.com/iotaledger/hive.go/kvstore/mapdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + _ "golang.org/x/crypto/blake2b" +) + +const ( + sequenceKey = "seq" + targetPOW = 10 + totalMessages = 2000 +) + +func TestMessageFactory_BuildMessage(t *testing.T) { + msgFactory := NewMessageFactory( + mapdb.NewMapDB(), + []byte(sequenceKey), + identity.GenerateLocalIdentity(), + TipSelectorFunc(func() (MessageID, MessageID) { return EmptyMessageID, EmptyMessageID }), + ) + defer msgFactory.Shutdown() + + // keep track of sequence numbers + sequenceNumbers := sync.Map{} + + // attach to event and count + countEvents := uint64(0) + msgFactory.Events.MessageConstructed.Attach(events.NewClosure(func(msg *Message) { + atomic.AddUint64(&countEvents, 1) + })) + + t.Run("CheckProperties", func(t *testing.T) { + p := NewDataPayload([]byte("TestCheckProperties")) + msg, err := msgFactory.IssuePayload(p) + require.NoError(t, err) + + assert.NotNil(t, msg.Parent1ID()) + assert.NotNil(t, msg.Parent2ID()) + + // time in range of 0.1 seconds + assert.InDelta(t, time.Now().UnixNano(), msg.IssuingTime().UnixNano(), 100000000) + + // check payload + assert.Equal(t, p, msg.Payload()) + + // check total events and sequence number + assert.EqualValues(t, 1, countEvents) + assert.EqualValues(t, 0, msg.SequenceNumber()) + + sequenceNumbers.Store(msg.SequenceNumber(), true) + }) + + // create messages in parallel + t.Run("ParallelCreation", func(t *testing.T) { + for i := 1; i < totalMessages; i++ { + t.Run("test", func(t *testing.T) { + t.Parallel() + + p := NewDataPayload([]byte("TestParallelCreation")) + msg, err := msgFactory.IssuePayload(p) + require.NoError(t, err) + + assert.NotNil(t, msg.Parent1ID()) + assert.NotNil(t, msg.Parent2ID()) + + // time in range of 0.1 seconds + assert.InDelta(t, time.Now().UnixNano(), msg.IssuingTime().UnixNano(), 100000000) + + // check payload + assert.Equal(t, p, msg.Payload()) + + sequenceNumbers.Store(msg.SequenceNumber(), true) + }) + } + }) + + // check total events and sequence number + assert.EqualValues(t, totalMessages, countEvents) + + max := uint64(0) + countSequence := 0 + sequenceNumbers.Range(func(key, value interface{}) bool { + seq := key.(uint64) + val := value.(bool) + if val != true { + return false + } + + // check for max sequence number + if seq > max { + max = seq + } + countSequence++ + return true + }) + assert.EqualValues(t, totalMessages-1, max) + assert.EqualValues(t, totalMessages, countSequence) +} + +func TestMessageFactory_POW(t *testing.T) { + msgFactory := NewMessageFactory( + mapdb.NewMapDB(), + []byte(sequenceKey), + identity.GenerateLocalIdentity(), + TipSelectorFunc(func() (MessageID, MessageID) { return EmptyMessageID, EmptyMessageID }), + ) + defer msgFactory.Shutdown() + + worker := pow.New(crypto.BLAKE2b_512, 1) + + msgFactory.SetWorker(WorkerFunc(func(msgBytes []byte) (uint64, error) { + content := msgBytes[:len(msgBytes)-ed25519.SignatureSize-8] + return worker.Mine(context.Background(), content, targetPOW) + })) + + msg, err := msgFactory.IssuePayload(NewDataPayload([]byte("test"))) + require.NoError(t, err) + + msgBytes := msg.Bytes() + content := msgBytes[:len(msgBytes)-ed25519.SignatureSize-8] + + zeroes, err := worker.LeadingZerosWithNonce(content, msg.Nonce()) + assert.GreaterOrEqual(t, zeroes, targetPOW) + assert.NoError(t, err) +} + +func TestWorkerFunc_PayloadSize(t *testing.T) { + msgFactory := NewMessageFactory( + mapdb.NewMapDB(), + []byte(sequenceKey), + identity.GenerateLocalIdentity(), + TipSelectorFunc(func() (MessageID, MessageID) { return EmptyMessageID, EmptyMessageID }), + ) + defer msgFactory.Shutdown() + + // issue message with max allowed payload size + // dataPayload headers: type|32bit + size|32bit + data := make([]byte, MaxPayloadSize-4-4) + msg, err := msgFactory.IssuePayload(NewDataPayload(data)) + require.NoError(t, err) + assert.Truef(t, MaxMessageSize == len(msg.Bytes()), "message size should be exactly %d bytes but is %d", MaxMessageSize, len(msg.Bytes())) + + // issue message bigger than max allowed payload size + data = make([]byte, MaxPayloadSize) + msg, err = msgFactory.IssuePayload(NewDataPayload(data)) + require.Error(t, err) + assert.Nil(t, msg) +} diff --git a/packages/tangle/messagemetadata.go b/packages/tangle/messagemetadata.go new file mode 100644 index 0000000000000000000000000000000000000000..08f695d1d20275acad06622f13b97b999e9c9f8d --- /dev/null +++ b/packages/tangle/messagemetadata.go @@ -0,0 +1,179 @@ +package tangle + +import ( + "fmt" + "sync" + "time" + + "github.com/iotaledger/hive.go/byteutils" + "github.com/iotaledger/hive.go/marshalutil" + "github.com/iotaledger/hive.go/objectstorage" +) + +// MessageMetadata defines the metadata for a message. +type MessageMetadata struct { + objectstorage.StorableObjectFlags + + messageID MessageID + receivedTime time.Time + solid bool + solidificationTime time.Time + + solidMutex sync.RWMutex + solidificationTimeMutex sync.RWMutex +} + +// NewMessageMetadata creates a new MessageMetadata from the specified messageID. +func NewMessageMetadata(messageID MessageID) *MessageMetadata { + return &MessageMetadata{ + messageID: messageID, + receivedTime: time.Now(), + } +} + +// MessageMetadataFromBytes unmarshals the given bytes into a MessageMetadata. +func MessageMetadataFromBytes(bytes []byte) (result *MessageMetadata, consumedBytes int, err error) { + marshalUtil := marshalutil.New(bytes) + result, err = MessageMetadataFromMarshalUtil(marshalUtil) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +// MessageMetadataFromMarshalUtil parses a Message from the given MarshalUtil. +func MessageMetadataFromMarshalUtil(marshalUtil *marshalutil.MarshalUtil) (result *MessageMetadata, err error) { + result = &MessageMetadata{} + + if result.messageID, err = MessageIDFromMarshalUtil(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse message ID of message metadata: %w", err) + return + } + if result.receivedTime, err = marshalUtil.ReadTime(); err != nil { + err = fmt.Errorf("failed to parse received time of message metadata: %w", err) + return + } + if result.solidificationTime, err = marshalUtil.ReadTime(); err != nil { + err = fmt.Errorf("failed to parse solidification time of message metadata: %w", err) + return + } + if result.solid, err = marshalUtil.ReadBool(); err != nil { + err = fmt.Errorf("failed to parse 'solid' of message metadata: %w", err) + return + } + + return +} + +// MessageMetadataFromObjectStorage restores a MessageMetadata object from the ObjectStorage. +func MessageMetadataFromObjectStorage(key []byte, data []byte) (result objectstorage.StorableObject, err error) { + if result, _, err = MessageMetadataFromBytes(byteutils.ConcatBytes(key, data)); err != nil { + err = fmt.Errorf("failed to parse message metadata from object storage: %w", err) + return + } + + return +} + +// ReceivedTime returns the time when the message was received. +func (m *MessageMetadata) ReceivedTime() time.Time { + return m.receivedTime +} + +// IsSolid returns true if the message represented by this metadata is solid. False otherwise. +func (m *MessageMetadata) IsSolid() (result bool) { + m.solidMutex.RLock() + result = m.solid + m.solidMutex.RUnlock() + + return +} + +// SetSolid sets the message associated with this metadata as solid. +// It returns true if the solid status is modified. False otherwise. +func (m *MessageMetadata) SetSolid(solid bool) (modified bool) { + m.solidMutex.RLock() + if m.solid != solid { + m.solidMutex.RUnlock() + + m.solidMutex.Lock() + if m.solid != solid { + m.solid = solid + if solid { + m.solidificationTimeMutex.Lock() + m.solidificationTime = time.Now() + m.solidificationTimeMutex.Unlock() + } + + m.SetModified() + + modified = true + } + m.solidMutex.Unlock() + + } else { + m.solidMutex.RUnlock() + } + + return +} + +// SolidificationTime returns the time when the message was marked to be solid. +func (m *MessageMetadata) SolidificationTime() time.Time { + m.solidificationTimeMutex.RLock() + defer m.solidificationTimeMutex.RUnlock() + + return m.solidificationTime +} + +// Bytes returns a marshaled version of the whole MessageMetadata object. +func (m *MessageMetadata) Bytes() []byte { + return byteutils.ConcatBytes(m.ObjectStorageKey(), m.ObjectStorageValue()) +} + +// ObjectStorageKey returns the key of the stored message metadata object. +// This returns the bytes of the messageID. +func (m *MessageMetadata) ObjectStorageKey() []byte { + return m.messageID.Bytes() +} + +// ObjectStorageValue returns the value of the stored message metadata object. +// This includes the receivedTime, solidificationTime and solid status. +func (m *MessageMetadata) ObjectStorageValue() []byte { + return marshalutil.New(). + WriteTime(m.ReceivedTime()). + WriteTime(m.SolidificationTime()). + WriteBool(m.IsSolid()). + Bytes() +} + +// Update updates the message metadata. +// This should never happen and will panic if attempted. +func (m *MessageMetadata) Update(other objectstorage.StorableObject) { + panic("updates disabled") +} + +var _ objectstorage.StorableObject = &MessageMetadata{} + +// CachedMessageMetadata is a wrapper for stored cached object that represents a message metadata. +type CachedMessageMetadata struct { + objectstorage.CachedObject +} + +// Retain registers a new consumer for the cached message metadata. +func (c *CachedMessageMetadata) Retain() *CachedMessageMetadata { + return &CachedMessageMetadata{c.CachedObject.Retain()} +} + +// Unwrap returns the underlying stored message metadata wrapped by the CachedMessageMetadata. +// If the stored object cannot be cast to MessageMetadata or is deleted, it returns nil. +func (c *CachedMessageMetadata) Unwrap() *MessageMetadata { + untypedObject := c.Get() + if untypedObject == nil { + return nil + } + typedObject := untypedObject.(*MessageMetadata) + if typedObject == nil || typedObject.IsDeleted() { + return nil + } + return typedObject +} diff --git a/packages/tangle/messageparser.go b/packages/tangle/messageparser.go new file mode 100644 index 0000000000000000000000000000000000000000..711afaea57aacf15472fac2d2409874d1f4f6c5c --- /dev/null +++ b/packages/tangle/messageparser.go @@ -0,0 +1,126 @@ +package tangle + +import ( + "sync" + + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/hive.go/typeutils" +) + +// MessageParser parses messages and bytes and emits corresponding events for parsed and rejected messages. +type MessageParser struct { + bytesFilters []BytesFilter + messageFilters []MessageFilter + Events *MessageParserEvents + + byteFiltersModified typeutils.AtomicBool + messageFiltersModified typeutils.AtomicBool + bytesFiltersMutex sync.Mutex + messageFiltersMutex sync.Mutex +} + +// NewMessageParser creates a new message parser. +func NewMessageParser() (result *MessageParser) { + result = &MessageParser{ + bytesFilters: make([]BytesFilter, 0), + messageFilters: make([]MessageFilter, 0), + Events: newMessageParserEvents(), + } + + // add builtin filters + result.AddBytesFilter(NewRecentlySeenBytesFilter()) + result.AddMessageFilter(NewMessageSignatureFilter()) + return +} + +// Parse parses the given message bytes. +func (p *MessageParser) Parse(messageBytes []byte, peer *peer.Peer) { + p.setupBytesFilterDataFlow() + p.setupMessageFilterDataFlow() + p.bytesFilters[0].Filter(messageBytes, peer) +} + +// AddBytesFilter adds the given bytes filter to the parser. +func (p *MessageParser) AddBytesFilter(filter BytesFilter) { + p.bytesFiltersMutex.Lock() + p.bytesFilters = append(p.bytesFilters, filter) + p.bytesFiltersMutex.Unlock() + p.byteFiltersModified.Set() +} + +// AddMessageFilter adds a new message filter to the parser. +func (p *MessageParser) AddMessageFilter(filter MessageFilter) { + p.messageFiltersMutex.Lock() + p.messageFilters = append(p.messageFilters, filter) + p.messageFiltersMutex.Unlock() + p.messageFiltersModified.Set() +} + +// sets up the byte filter data flow chain. +func (p *MessageParser) setupBytesFilterDataFlow() { + if !p.byteFiltersModified.IsSet() { + return + } + + p.bytesFiltersMutex.Lock() + if p.byteFiltersModified.IsSet() { + p.byteFiltersModified.SetTo(false) + + numberOfBytesFilters := len(p.bytesFilters) + for i := 0; i < numberOfBytesFilters; i++ { + if i == numberOfBytesFilters-1 { + p.bytesFilters[i].OnAccept(p.parseMessage) + } else { + p.bytesFilters[i].OnAccept(p.bytesFilters[i+1].Filter) + } + p.bytesFilters[i].OnReject(func(bytes []byte, err error, peer *peer.Peer) { + p.Events.BytesRejected.Trigger(&BytesRejectedEvent{ + Bytes: bytes, + Peer: peer}, err) + }) + } + } + p.bytesFiltersMutex.Unlock() +} + +// sets up the message filter data flow chain. +func (p *MessageParser) setupMessageFilterDataFlow() { + if !p.messageFiltersModified.IsSet() { + return + } + + p.messageFiltersMutex.Lock() + if p.messageFiltersModified.IsSet() { + p.messageFiltersModified.SetTo(false) + + numberOfMessageFilters := len(p.messageFilters) + for i := 0; i < numberOfMessageFilters; i++ { + if i == numberOfMessageFilters-1 { + p.messageFilters[i].OnAccept(func(msg *Message, peer *peer.Peer) { + p.Events.MessageParsed.Trigger(&MessageParsedEvent{ + Message: msg, + Peer: peer}) + }) + } else { + p.messageFilters[i].OnAccept(p.messageFilters[i+1].Filter) + } + p.messageFilters[i].OnReject(func(msg *Message, err error, peer *peer.Peer) { + p.Events.MessageRejected.Trigger(&MessageRejectedEvent{ + Message: msg, + Peer: peer}, err) + }) + } + } + p.messageFiltersMutex.Unlock() +} + +// parses the given message and emits +func (p *MessageParser) parseMessage(bytes []byte, peer *peer.Peer) { + if parsedMessage, _, err := MessageFromBytes(bytes); err != nil { + p.Events.BytesRejected.Trigger(&BytesRejectedEvent{ + Bytes: bytes, + Peer: peer}, err) + } else { + p.messageFilters[0].Filter(parsedMessage, peer) + } +} diff --git a/packages/tangle/messageparser_test.go b/packages/tangle/messageparser_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8c52c2344a56e6a0e32ab8b3773c7e8c3d76814e --- /dev/null +++ b/packages/tangle/messageparser_test.go @@ -0,0 +1,46 @@ +package tangle + +import ( + "strconv" + "testing" + + "github.com/iotaledger/hive.go/events" + "github.com/labstack/gommon/log" +) + +func BenchmarkMessageParser_ParseBytesSame(b *testing.B) { + msgBytes := newTestDataMessage("Test").Bytes() + msgParser := NewMessageParser() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + msgParser.Parse(msgBytes, nil) + } +} + +func BenchmarkMessageParser_ParseBytesDifferent(b *testing.B) { + messageBytes := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + messageBytes[i] = newTestDataMessage("Test" + strconv.Itoa(i)).Bytes() + } + + msgParser := NewMessageParser() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + msgParser.Parse(messageBytes[i], nil) + } +} + +func TestMessageParser_ParseMessage(t *testing.T) { + msg := newTestDataMessage("Test") + + msgParser := NewMessageParser() + msgParser.Parse(msg.Bytes(), nil) + + msgParser.Events.MessageParsed.Attach(events.NewClosure(func(msgParsedEvent *MessageParsedEvent) { + log.Infof("parsed message") + })) +} diff --git a/packages/tangle/messagerequester.go b/packages/tangle/messagerequester.go new file mode 100644 index 0000000000000000000000000000000000000000..0e0299ffc94f496a20c6a2d56e30b79b0b37cd6e --- /dev/null +++ b/packages/tangle/messagerequester.go @@ -0,0 +1,133 @@ +package tangle + +import ( + "sync" + "time" +) + +const ( + // DefaultRetryInterval defines the Default Retry Interval of the message requester. + DefaultRetryInterval = 10 * time.Second + // the maximum amount of requests before we abort + maxRequestThreshold = 500 +) + +// Options holds options for a message requester. +type Options struct { + retryInterval time.Duration +} + +func newOptions(optionalOptions []Option) *Options { + result := &Options{ + retryInterval: 10 * time.Second, + } + + for _, optionalOption := range optionalOptions { + optionalOption(result) + } + + return result +} + +// Option is a function which inits an option. +type Option func(*Options) + +// RetryInterval creates an option which sets the retry interval to the given value. +func RetryInterval(interval time.Duration) Option { + return func(args *Options) { + args.retryInterval = interval + } +} + +// MessageRequester takes care of requesting messages. +type MessageRequester struct { + scheduledRequests map[MessageID]*time.Timer + options *Options + Events *MessageRequesterEvents + + scheduledRequestsMutex sync.RWMutex +} + +// MessageExistsFunc is a function that tells if a message exists. +type MessageExistsFunc func(messageId MessageID) bool + +// NewMessageRequester creates a new message requester. +func NewMessageRequester(missingMessages []MessageID, optionalOptions ...Option) *MessageRequester { + requester := &MessageRequester{ + scheduledRequests: make(map[MessageID]*time.Timer), + options: newOptions(optionalOptions), + Events: newMessageRequesterEvents(), + } + + // add requests for all missing messages + requester.scheduledRequestsMutex.Lock() + defer requester.scheduledRequestsMutex.Unlock() + + for _, id := range missingMessages { + requester.scheduledRequests[id] = time.AfterFunc(requester.options.retryInterval, requester.createReRequest(id, 0)) + } + + return requester +} + +// StartRequest initiates a regular triggering of the StartRequest event until it has been stopped using StopRequest. +func (r *MessageRequester) StartRequest(id MessageID) { + r.scheduledRequestsMutex.Lock() + + // ignore already scheduled requests + if _, exists := r.scheduledRequests[id]; exists { + r.scheduledRequestsMutex.Unlock() + return + } + + // schedule the next request and trigger the event + r.scheduledRequests[id] = time.AfterFunc(r.options.retryInterval, r.createReRequest(id, 0)) + r.scheduledRequestsMutex.Unlock() + r.Events.SendRequest.Trigger(&SendRequestEvent{ID: id}) +} + +// StopRequest stops requests for the given message to further happen. +func (r *MessageRequester) StopRequest(id MessageID) { + r.scheduledRequestsMutex.Lock() + defer r.scheduledRequestsMutex.Unlock() + + if timer, ok := r.scheduledRequests[id]; ok { + timer.Stop() + delete(r.scheduledRequests, id) + } +} + +func (r *MessageRequester) reRequest(id MessageID, count int) { + r.Events.SendRequest.Trigger(&SendRequestEvent{ID: id}) + + // as we schedule a request at most once per id we do not need to make the trigger and the re-schedule atomic + r.scheduledRequestsMutex.Lock() + defer r.scheduledRequestsMutex.Unlock() + + // reschedule, if the request has not been stopped in the meantime + if _, exists := r.scheduledRequests[id]; exists { + // increase the request counter + count++ + + // if we have requested too often => stop the requests + if count > maxRequestThreshold { + delete(r.scheduledRequests, id) + + return + } + + r.scheduledRequests[id] = time.AfterFunc(r.options.retryInterval, r.createReRequest(id, count)) + return + } +} + +// RequestQueueSize returns the number of scheduled message requests. +func (r *MessageRequester) RequestQueueSize() int { + r.scheduledRequestsMutex.RLock() + defer r.scheduledRequestsMutex.RUnlock() + return len(r.scheduledRequests) +} + +func (r *MessageRequester) createReRequest(msgID MessageID, count int) func() { + return func() { r.reRequest(msgID, count) } +} diff --git a/packages/tangle/missingmessage.go b/packages/tangle/missingmessage.go new file mode 100644 index 0000000000000000000000000000000000000000..23245b6d3de497c921a9e644d25e5ad99a7eb434 --- /dev/null +++ b/packages/tangle/missingmessage.go @@ -0,0 +1,99 @@ +package tangle + +import ( + "fmt" + "time" + + "github.com/iotaledger/hive.go/byteutils" + "github.com/iotaledger/hive.go/marshalutil" + "github.com/iotaledger/hive.go/objectstorage" +) + +// MissingMessage represents a missing message. +type MissingMessage struct { + objectstorage.StorableObjectFlags + + messageID MessageID + missingSince time.Time +} + +// NewMissingMessage creates new missing message with the specified messageID. +func NewMissingMessage(messageID MessageID) *MissingMessage { + return &MissingMessage{ + messageID: messageID, + missingSince: time.Now(), + } +} + +// MissingMessageFromBytes parses the given bytes into a MissingMessage. +func MissingMessageFromBytes(bytes []byte) (result *MissingMessage, consumedBytes int, err error) { + marshalUtil := marshalutil.New(bytes) + result, err = MissingMessageFromMarshalUtil(marshalUtil) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +// MissingMessageFromMarshalUtil parses a MissingMessage from the given MarshalUtil. +func MissingMessageFromMarshalUtil(marshalUtil *marshalutil.MarshalUtil) (result *MissingMessage, err error) { + result = &MissingMessage{} + + if result.messageID, err = MessageIDFromMarshalUtil(marshalUtil); err != nil { + err = fmt.Errorf("failed to parse message ID of missing message: %w", err) + return + } + if result.missingSince, err = marshalUtil.ReadTime(); err != nil { + err = fmt.Errorf("failed to parse missingSince of missing message: %w", err) + return + } + + return +} + +// MissingMessageFromObjectStorage restores a MissingMessage from the ObjectStorage. +func MissingMessageFromObjectStorage(key []byte, data []byte) (result objectstorage.StorableObject, err error) { + result, _, err = MissingMessageFromBytes(byteutils.ConcatBytes(key, data)) + if err != nil { + err = fmt.Errorf("failed to parse missing message from object storage: %w", err) + return + } + + return +} + +// MessageID returns the id of the message. +func (m *MissingMessage) MessageID() MessageID { + return m.messageID +} + +// MissingSince returns the time since when this message is missing. +func (m *MissingMessage) MissingSince() time.Time { + return m.missingSince +} + +// Bytes returns a marshaled version of this MissingMessage. +func (m *MissingMessage) Bytes() []byte { + return byteutils.ConcatBytes(m.ObjectStorageKey(), m.ObjectStorageValue()) +} + +// Update update the missing message. +// It should never happen and will panic if called. +func (m *MissingMessage) Update(other objectstorage.StorableObject) { + panic("missing messages should never be overwritten and only stored once to optimize IO") +} + +// ObjectStorageKey returns the key of the stored missing message. +// This returns the bytes of the messageID of the missing message. +func (m *MissingMessage) ObjectStorageKey() []byte { + return m.messageID[:] +} + +// ObjectStorageValue returns the value of the stored missing message. +func (m *MissingMessage) ObjectStorageValue() (result []byte) { + result, err := m.missingSince.MarshalBinary() + if err != nil { + panic(err) + } + + return +} diff --git a/packages/tangle/payload.go b/packages/tangle/payload.go new file mode 100644 index 0000000000000000000000000000000000000000..beecf71a5ac94dc529cee6b0ed9513184feac2a1 --- /dev/null +++ b/packages/tangle/payload.go @@ -0,0 +1,259 @@ +package tangle + +import ( + "fmt" + "sync" + + "github.com/iotaledger/hive.go/marshalutil" + "github.com/iotaledger/hive.go/stringify" + "github.com/mr-tron/base58" +) + +const ( + // MaxPayloadSize defines the maximum size of a payload. + // parent1ID + parent2ID + issuerPublicKey + issuingTime + sequenceNumber + nonce + signature + MaxPayloadSize = MaxMessageSize - 64 - 64 - 32 - 8 - 8 - 8 - 64 + + // PayloadIDLength is the length of a data payload id. + PayloadIDLength = 64 + + // ObjectName defines the name of the data object. + ObjectName = "data" +) + +var ( + // ErrMaxPayloadSizeExceeded is returned if the maximum payload size is exceeded. + ErrMaxPayloadSizeExceeded = fmt.Errorf("maximum payload size of %d bytes exceeded", MaxPayloadSize) + + typeRegister = make(map[PayloadType]Definition) + typeRegisterMutex sync.RWMutex + genericUnmarshaler Unmarshaler +) + +// PayloadID represents the id of a data payload. +type PayloadID [PayloadIDLength]byte + +// Bytes returns the id as a byte slice backed by the original array, +// therefore it should not be modified. +func (id PayloadID) Bytes() []byte { + return id[:] +} + +func (id PayloadID) String() string { + return base58.Encode(id[:]) +} + +// PayloadType represents the type id of a payload. +type PayloadType = uint32 + +// Unmarshaler takes some data and unmarshals it into a payload. +type Unmarshaler func(data []byte) (Payload, error) + +// Definition defines the properties of a payload type. +type Definition struct { + Name string + Unmarshaler +} + +func init() { + // register the generic unmarshaler + SetGenericUnmarshaler(GenericPayloadUnmarshaler) +} + +// RegisterPayloadType registers a payload type with the given unmarshaler. +func RegisterPayloadType(payloadType PayloadType, payloadName string, unmarshaler Unmarshaler) { + typeRegisterMutex.Lock() + typeRegister[payloadType] = Definition{ + Name: payloadName, + Unmarshaler: unmarshaler, + } + typeRegisterMutex.Unlock() +} + +// GetUnmarshaler returns the unmarshaler for the given type if known or +// the generic unmarshaler if the given payload type has no associated unmarshaler. +func GetUnmarshaler(payloadType PayloadType) Unmarshaler { + typeRegisterMutex.RLock() + defer typeRegisterMutex.RUnlock() + + if definition, exists := typeRegister[payloadType]; exists { + return definition.Unmarshaler + } + + return genericUnmarshaler +} + +// SetGenericUnmarshaler sets the generic unmarshaler. +func SetGenericUnmarshaler(unmarshaler Unmarshaler) { + genericUnmarshaler = unmarshaler +} + +// Name returns the name of a given payload type. +func Name(payloadType PayloadType) string { + typeRegisterMutex.RLock() + defer typeRegisterMutex.RUnlock() + if definition, exists := typeRegister[payloadType]; exists { + return definition.Name + } + return ObjectName +} + +// Payload represents some kind of payload of data which only gains meaning by having +// corresponding node logic processing payloads of a given type. +type Payload interface { + // PayloadType returns the type of the payload. + Type() PayloadType + // Bytes returns the payload bytes. + Bytes() []byte + // String returns a human-friendly representation of the payload. + String() string +} + +// PayloadFromBytes unmarshals bytes into a payload. +func PayloadFromBytes(bytes []byte) (result Payload, consumedBytes int, err error) { + // initialize helper + marshalUtil := marshalutil.New(bytes) + + payloadSize, err := marshalUtil.ReadUint32() + if err != nil { + err = fmt.Errorf("failed to unmarshal payload size from bytes: %w", err) + return + } + + if payloadSize > MaxPayloadSize { + err = fmt.Errorf("%w: %d", ErrMaxPayloadSizeExceeded, payloadSize) + return + } + + // calculate result + payloadType, err := marshalUtil.ReadUint32() + if err != nil { + err = fmt.Errorf("failed to unmarshal payload type from bytes: %w", err) + return + } + + marshalUtil.ReadSeek(marshalUtil.ReadOffset() - marshalutil.UINT32_SIZE*2) + payloadBytes, err := marshalUtil.ReadBytes(int(payloadSize) + 8) + if err != nil { + err = fmt.Errorf("failed to unmarshal payload bytes from bytes: %w", err) + return + } + + readOffset := marshalUtil.ReadOffset() + result, err = GetUnmarshaler(payloadType)(payloadBytes) + if err != nil { + // fallback to the generic unmarshaler if registered type fails to unmarshal + marshalUtil.ReadSeek(readOffset) + result, err = GenericPayloadUnmarshaler(payloadBytes) + if err != nil { + err = fmt.Errorf("failed to unmarshal payload from bytes with generic unmarshaler: %w", err) + return + } + } + + // return the number of bytes we processed + consumedBytes = marshalUtil.ReadOffset() + return +} + +// PayloadFromMarshalUtil parses a payload by using the given marshal util. +func PayloadFromMarshalUtil(marshalUtil *marshalutil.MarshalUtil) (Payload, error) { + payload, err := marshalUtil.Parse(func(data []byte) (interface{}, int, error) { return PayloadFromBytes(data) }) + if err != nil { + err = fmt.Errorf("failed to parse payload: %w", err) + return nil, err + } + return payload.(Payload), nil +} + +// DataType is the message type of a data payload. +var DataType = PayloadType(0) + +func init() { + // register the generic data payload type + RegisterPayloadType(DataType, ObjectName, GenericPayloadUnmarshaler) +} + +// DataPayload represents a payload which just contains a blob of data. +type DataPayload struct { + payloadType PayloadType + data []byte +} + +// NewDataPayload creates new data payload. +func NewDataPayload(data []byte) *DataPayload { + return &DataPayload{ + payloadType: DataType, + data: data, + } +} + +// DataPayloadFromBytes creates a new data payload from the given bytes. +func DataPayloadFromBytes(bytes []byte) (result *DataPayload, consumedBytes int, err error) { + marshalUtil := marshalutil.New(bytes) + result, err = DataPayloadFromMarshalUtil(marshalUtil) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +// DataPayloadFromMarshalUtil parses a new data payload out of the given marshal util. +func DataPayloadFromMarshalUtil(marshalUtil *marshalutil.MarshalUtil) (result *DataPayload, err error) { + // parse information + result = &DataPayload{} + payloadBytes, err := marshalUtil.ReadUint32() + if err != nil { + err = fmt.Errorf("failed to parse data payload size: %w", err) + return + } + result.payloadType, err = marshalUtil.ReadUint32() + if err != nil { + err = fmt.Errorf("failed to parse data payload type: %w", err) + return + } + result.data, err = marshalUtil.ReadBytes(int(payloadBytes)) + if err != nil { + err = fmt.Errorf("failed to parse data payload contest: %w", err) + return + } + + return +} + +// Type returns the payload type. +func (d *DataPayload) Type() PayloadType { + return d.payloadType +} + +// Data returns the data of the data payload. +func (d *DataPayload) Data() []byte { + return d.data +} + +// Bytes marshals the data payload into a sequence of bytes. +func (d *DataPayload) Bytes() []byte { + // initialize helper + marshalUtil := marshalutil.New() + + // marshal the payload specific information + marshalUtil.WriteUint32(uint32(len(d.data))) + marshalUtil.WriteUint32(d.Type()) + marshalUtil.WriteBytes(d.data[:]) + + // return result + return marshalUtil.Bytes() +} + +func (d *DataPayload) String() string { + return stringify.Struct("Data", + stringify.StructField("type", int(d.Type())), + stringify.StructField("data", string(d.Data())), + ) +} + +// GenericPayloadUnmarshaler is an unmarshaler for the generic data payload type. +func GenericPayloadUnmarshaler(data []byte) (payload Payload, err error) { + payload, _, err = DataPayloadFromBytes(data) + + return +} diff --git a/packages/tangle/payload_test.go b/packages/tangle/payload_test.go new file mode 100644 index 0000000000000000000000000000000000000000..67c5fa5598380572da183acdae8721eb283135b4 --- /dev/null +++ b/packages/tangle/payload_test.go @@ -0,0 +1,75 @@ +package tangle + +import ( + "runtime" + "sync" + "testing" + + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/hive.go/async" + "github.com/iotaledger/hive.go/identity" + "github.com/iotaledger/hive.go/kvstore/mapdb" + "github.com/stretchr/testify/require" + + "github.com/panjf2000/ants/v2" +) + +func BenchmarkVerifyDataMessages(b *testing.B) { + var pool async.WorkerPool + pool.Tune(runtime.GOMAXPROCS(0)) + + factory := NewMessageFactory(mapdb.NewMapDB(), []byte(messagelayer.DBSequenceNumber), identity.GenerateLocalIdentity(), TipSelectorFunc(func() (MessageID, MessageID) { return EmptyMessageID, EmptyMessageID })) + + messages := make([][]byte, b.N) + for i := 0; i < b.N; i++ { + msg, err := factory.IssuePayload(NewDataPayload([]byte("some data"))) + require.NoError(b, err) + messages[i] = msg.Bytes() + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + currentIndex := i + pool.Submit(func() { + if msg, _, err := MessageFromBytes(messages[currentIndex]); err != nil { + b.Error(err) + } else { + msg.VerifySignature() + } + }) + } + + pool.Shutdown() +} + +func BenchmarkVerifySignature(b *testing.B) { + pool, _ := ants.NewPool(80, ants.WithNonblocking(false)) + + factory := NewMessageFactory(mapdb.NewMapDB(), []byte(messagelayer.DBSequenceNumber), identity.GenerateLocalIdentity(), TipSelectorFunc(func() (MessageID, MessageID) { return EmptyMessageID, EmptyMessageID })) + + messages := make([]*Message, b.N) + for i := 0; i < b.N; i++ { + msg, err := factory.IssuePayload(NewDataPayload([]byte("some data"))) + require.NoError(b, err) + messages[i] = msg + messages[i].Bytes() + } + b.ResetTimer() + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + + currentIndex := i + if err := pool.Submit(func() { + messages[currentIndex].VerifySignature() + wg.Done() + }); err != nil { + b.Error(err) + return + } + } + + wg.Wait() +} diff --git a/packages/tangle/tangle.go b/packages/tangle/tangle.go new file mode 100644 index 0000000000000000000000000000000000000000..3d8d33296fdaceadf949b9f282ce2cf4f6759b09 --- /dev/null +++ b/packages/tangle/tangle.go @@ -0,0 +1,385 @@ +package tangle + +import ( + "container/list" + "fmt" + "time" + + "github.com/iotaledger/goshimmer/packages/binary/storageprefix" + "github.com/iotaledger/hive.go/async" + "github.com/iotaledger/hive.go/kvstore" + "github.com/iotaledger/hive.go/objectstorage" + "github.com/iotaledger/hive.go/types" +) + +const ( + // PrefixMessage defines the storage prefix for message. + PrefixMessage byte = iota + // PrefixMessageMetadata defines the storage prefix for message metadata. + PrefixMessageMetadata + // PrefixApprovers defines the storage prefix for approvers. + PrefixApprovers + // PrefixMissingMessage defines the storage prefix for missing message. + PrefixMissingMessage + + cacheTime = 20 * time.Second +) + +// Tangle represents the base layer of messages. +type Tangle struct { + messageStorage *objectstorage.ObjectStorage + messageMetadataStorage *objectstorage.ObjectStorage + approverStorage *objectstorage.ObjectStorage + missingMessageStorage *objectstorage.ObjectStorage + + Events *Events + + storeMessageWorkerPool async.WorkerPool + solidifierWorkerPool async.WorkerPool + shutdown chan struct{} +} + +// New creates a new Tangle. +func New(store kvstore.KVStore) (result *Tangle) { + osFactory := objectstorage.NewFactory(store, storageprefix.MessageLayer) + + result = &Tangle{ + shutdown: make(chan struct{}), + messageStorage: osFactory.New(PrefixMessage, MessageFromObjectStorage, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)), + messageMetadataStorage: osFactory.New(PrefixMessageMetadata, MessageMetadataFromObjectStorage, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)), + approverStorage: osFactory.New(PrefixApprovers, ApproverFromObjectStorage, objectstorage.CacheTime(cacheTime), objectstorage.PartitionKey(MessageIDLength, MessageIDLength), objectstorage.LeakDetectionEnabled(false)), + missingMessageStorage: osFactory.New(PrefixMissingMessage, MissingMessageFromObjectStorage, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)), + + Events: newEvents(), + } + + result.solidifierWorkerPool.Tune(1024) + result.storeMessageWorkerPool.Tune(1024) + return +} + +// AttachMessage attaches a new message to the tangle. +func (t *Tangle) AttachMessage(msg *Message) { + t.storeMessageWorkerPool.Submit(func() { t.storeMessageWorker(msg) }) +} + +// Message retrieves a message from the tangle. +func (t *Tangle) Message(messageID MessageID) *CachedMessage { + return &CachedMessage{CachedObject: t.messageStorage.Load(messageID[:])} +} + +// MessageMetadata retrieves the metadata of a message from the tangle. +func (t *Tangle) MessageMetadata(messageID MessageID) *CachedMessageMetadata { + return &CachedMessageMetadata{CachedObject: t.messageMetadataStorage.Load(messageID[:])} +} + +// Approvers retrieves the approvers of a message from the tangle. +func (t *Tangle) Approvers(messageID MessageID) CachedApprovers { + approvers := make(CachedApprovers, 0) + t.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + approvers = append(approvers, &CachedApprover{CachedObject: cachedObject}) + return true + }, messageID[:]) + return approvers +} + +// DeleteMessage deletes a message and its association to approvees by un-marking the given +// message as an approver. +func (t *Tangle) DeleteMessage(messageID MessageID) { + t.Message(messageID).Consume(func(currentMsg *Message) { + parent1MsgID := currentMsg.Parent1ID() + t.deleteApprover(parent1MsgID, messageID) + + parent2MsgID := currentMsg.Parent2ID() + if parent2MsgID != parent1MsgID { + t.deleteApprover(parent2MsgID, messageID) + } + + t.messageMetadataStorage.Delete(messageID[:]) + t.messageStorage.Delete(messageID[:]) + + t.Events.MessageRemoved.Trigger(messageID) + }) +} + +// DeleteMissingMessage deletes a message from the missingMessageStorage. +func (t *Tangle) DeleteMissingMessage(messageID MessageID) { + t.missingMessageStorage.Delete(messageID[:]) +} + +// Shutdown marks the tangle as stopped, so it will not accept any new messages (waits for all backgroundTasks to finish). +func (t *Tangle) Shutdown() *Tangle { + t.storeMessageWorkerPool.ShutdownGracefully() + t.solidifierWorkerPool.ShutdownGracefully() + + t.messageStorage.Shutdown() + t.messageMetadataStorage.Shutdown() + t.approverStorage.Shutdown() + t.missingMessageStorage.Shutdown() + close(t.shutdown) + + return t +} + +// Prune resets the database and deletes all objects (good for testing or "node resets"). +func (t *Tangle) Prune() error { + for _, storage := range []*objectstorage.ObjectStorage{ + t.messageStorage, + t.messageMetadataStorage, + t.approverStorage, + t.missingMessageStorage, + } { + if err := storage.Prune(); err != nil { + err = fmt.Errorf("failed to prune storage: %w", err) + return err + } + } + + return nil +} + +// DBStats returns the number of solid messages and total number of messages in the database (messageMetadataStorage, +// that should contain the messages as messageStorage), the number of messages in missingMessageStorage, furthermore +// the average time it takes to solidify messages. +func (t *Tangle) DBStats() (solidCount int, messageCount int, avgSolidificationTime float64, missingMessageCount int) { + var sumSolidificationTime time.Duration + t.messageMetadataStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + cachedObject.Consume(func(object objectstorage.StorableObject) { + msgMetaData := object.(*MessageMetadata) + messageCount++ + received := msgMetaData.ReceivedTime() + if msgMetaData.IsSolid() { + solidCount++ + sumSolidificationTime += msgMetaData.solidificationTime.Sub(received) + } + }) + return true + }) + if solidCount > 0 { + avgSolidificationTime = float64(sumSolidificationTime.Milliseconds()) / float64(solidCount) + } + t.missingMessageStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + cachedObject.Consume(func(object objectstorage.StorableObject) { + missingMessageCount++ + }) + return true + }) + return +} + +// MissingMessages return the ids of messages in missingMessageStorage +func (t *Tangle) MissingMessages() (ids []MessageID) { + t.missingMessageStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { + cachedObject.Consume(func(object objectstorage.StorableObject) { + ids = append(ids, object.(*MissingMessage).messageID) + }) + + return true + }) + return +} + +// worker that stores the message and calls the corresponding storage events. +func (t *Tangle) storeMessageWorker(msg *Message) { + // store message + var cachedMessage *CachedMessage + _tmp, msgIsNew := t.messageStorage.StoreIfAbsent(msg) + if !msgIsNew { + return + } + cachedMessage = &CachedMessage{CachedObject: _tmp} + + // store message metadata + messageID := msg.ID() + cachedMsgMetadata := &CachedMessageMetadata{CachedObject: t.messageMetadataStorage.Store(NewMessageMetadata(messageID))} + + // store parent1 approver + parent1MsgID := msg.Parent1ID() + t.approverStorage.Store(NewApprover(parent1MsgID, messageID)).Release() + + // store parent2 approver + if parent2MsgID := msg.Parent2ID(); parent2MsgID != parent1MsgID { + t.approverStorage.Store(NewApprover(parent2MsgID, messageID)).Release() + } + + // trigger events + if t.missingMessageStorage.DeleteIfPresent(messageID[:]) { + t.Events.MissingMessageReceived.Trigger(&CachedMessageEvent{ + Message: cachedMessage, + MessageMetadata: cachedMsgMetadata}) + } + + t.Events.MessageAttached.Trigger(&CachedMessageEvent{ + Message: cachedMessage, + MessageMetadata: cachedMsgMetadata}) + + // check message solidity + t.solidifierWorkerPool.Submit(func() { + t.checkMessageSolidityAndPropagate(cachedMessage, cachedMsgMetadata) + }) +} + +// checks whether the given message is solid and marks it as missing if it isn't known. +func (t *Tangle) isMessageMarkedAsSolid(messageID MessageID) bool { + // return true if the message is the Genesis + if messageID == EmptyMessageID { + return true + } + + // retrieve the CachedMessageMetadata and mark it as missing if it doesn't exist + msgMetadataCached := &CachedMessageMetadata{t.messageMetadataStorage.ComputeIfAbsent(messageID.Bytes(), func(key []byte) objectstorage.StorableObject { + // store the missing message and trigger events + if cachedMissingMessage, stored := t.missingMessageStorage.StoreIfAbsent(NewMissingMessage(messageID)); stored { + cachedMissingMessage.Consume(func(object objectstorage.StorableObject) { + t.Events.MessageMissing.Trigger(messageID) + }) + } + + // do not initialize the metadata here, we execute this in ComputeIfAbsent to be secure from race conditions + return nil + })} + defer msgMetadataCached.Release() + + // return false if the metadata does not exist + msgMetadata := msgMetadataCached.Unwrap() + if msgMetadata == nil { + return false + } + + // return the solid flag of the metadata object + return msgMetadata.IsSolid() +} + +// checks whether the given message is solid by examining whether its parent1 and +// parent2 messages are solid. +func (t *Tangle) isMessageSolid(msg *Message, msgMetadata *MessageMetadata) bool { + if msg == nil || msg.IsDeleted() { + return false + } + + if msgMetadata == nil || msgMetadata.IsDeleted() { + return false + } + + if msgMetadata.IsSolid() { + return true + } + + // as missing messages are requested in isMessageMarkedAsSolid, we want to prevent short-circuit evaluation + parent1Solid := t.isMessageMarkedAsSolid(msg.Parent1ID()) + parent2Solid := t.isMessageMarkedAsSolid(msg.Parent2ID()) + return parent1Solid && parent2Solid +} + +// builds up a stack from the given message and tries to solidify into the present. +// missing messages which are needed for a message to become solid are marked as missing. +func (t *Tangle) checkMessageSolidityAndPropagate(cachedMessage *CachedMessage, cachedMsgMetadata *CachedMessageMetadata) { + + popElementsFromStack := func(stack *list.List) (*CachedMessage, *CachedMessageMetadata) { + currentSolidificationEntry := stack.Front() + currentCachedMsg := currentSolidificationEntry.Value.([2]interface{})[0] + currentCachedMsgMetadata := currentSolidificationEntry.Value.([2]interface{})[1] + stack.Remove(currentSolidificationEntry) + return currentCachedMsg.(*CachedMessage), currentCachedMsgMetadata.(*CachedMessageMetadata) + } + + // initialize the stack + solidificationStack := list.New() + solidificationStack.PushBack([2]interface{}{cachedMessage, cachedMsgMetadata}) + + // processed messages that are supposed to be checked for solidity recursively + for solidificationStack.Len() > 0 { + currentCachedMessage, currentCachedMsgMetadata := popElementsFromStack(solidificationStack) + + currentMessage := currentCachedMessage.Unwrap() + currentMsgMetadata := currentCachedMsgMetadata.Unwrap() + if currentMessage == nil || currentMsgMetadata == nil { + currentCachedMessage.Release() + currentCachedMsgMetadata.Release() + continue + } + + // mark the message as solid if it has become solid + if t.isMessageSolid(currentMessage, currentMsgMetadata) && currentMsgMetadata.SetSolid(true) { + t.Events.MessageSolid.Trigger(&CachedMessageEvent{ + Message: currentCachedMessage, + MessageMetadata: currentCachedMsgMetadata}) + + // auto. push approvers of the newly solid message to propagate solidification + t.Approvers(currentMessage.ID()).Consume(func(approver *Approver) { + approverMessageID := approver.ApproverMessageID() + solidificationStack.PushBack([2]interface{}{ + t.Message(approverMessageID), + t.MessageMetadata(approverMessageID), + }) + }) + } + + currentCachedMessage.Release() + currentCachedMsgMetadata.Release() + } +} + +// deletes the given approver association for the given approvee to its approver. +func (t *Tangle) deleteApprover(approvedMessageID MessageID, approvingMessage MessageID) { + idToDelete := make([]byte, MessageIDLength+MessageIDLength) + copy(idToDelete[:MessageIDLength], approvedMessageID[:]) + copy(idToDelete[MessageIDLength:], approvingMessage[:]) + t.approverStorage.Delete(idToDelete) +} + +// deletes a message and its future cone of messages/approvers. +// nolint +func (t *Tangle) deleteFutureCone(messageID MessageID) { + cleanupStack := list.New() + cleanupStack.PushBack(messageID) + + processedMessages := make(map[MessageID]types.Empty) + processedMessages[messageID] = types.Void + + for cleanupStack.Len() >= 1 { + currentStackEntry := cleanupStack.Front() + currentMessageID := currentStackEntry.Value.(MessageID) + cleanupStack.Remove(currentStackEntry) + + t.DeleteMessage(currentMessageID) + + t.Approvers(currentMessageID).Consume(func(approver *Approver) { + approverID := approver.ApproverMessageID() + if _, messageProcessed := processedMessages[approverID]; !messageProcessed { + cleanupStack.PushBack(approverID) + processedMessages[approverID] = types.Void + } + }) + } +} + +// SolidifierWorkerPoolStatus returns the name and the load of the workerpool. +func (t *Tangle) SolidifierWorkerPoolStatus() (name string, load int) { + return "Solidifier", t.solidifierWorkerPool.RunningWorkers() +} + +// StoreMessageWorkerPoolStatus returns the name and the load of the workerpool. +func (t *Tangle) StoreMessageWorkerPoolStatus() (name string, load int) { + return "StoreMessage", t.storeMessageWorkerPool.RunningWorkers() +} + +// RetrieveAllTips returns the tips (i.e., solid messages that are not part of the approvers list). +// It iterates over the messageMetadataStorage, thus only use this method if necessary. +// TODO: improve this function. +func (t *Tangle) RetrieveAllTips() (tips []MessageID) { + t.messageMetadataStorage.ForEach(func(key []byte, cachedMessage objectstorage.CachedObject) bool { + cachedMessage.Consume(func(object objectstorage.StorableObject) { + messageMetadata := object.(*MessageMetadata) + if messageMetadata != nil && messageMetadata.IsSolid() { + cachedApprovers := t.Approvers(messageMetadata.messageID) + if len(cachedApprovers) == 0 { + tips = append(tips, messageMetadata.messageID) + } + cachedApprovers.Consume(func(approver *Approver) {}) + } + }) + return true + }) + return tips +} diff --git a/packages/tangle/tangle_test.go b/packages/tangle/tangle_test.go new file mode 100644 index 0000000000000000000000000000000000000000..2407814aa18083e5b1f7c006ba4c83ac46c7e107 --- /dev/null +++ b/packages/tangle/tangle_test.go @@ -0,0 +1,261 @@ +package tangle + +import ( + "fmt" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/iotaledger/hive.go/datastructure/randommap" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/identity" + "github.com/iotaledger/hive.go/kvstore/mapdb" + "github.com/iotaledger/hive.go/testutil" + "github.com/magiconair/properties/assert" + "github.com/stretchr/testify/require" +) + +func BenchmarkTangle_AttachMessage(b *testing.B) { + tangle := New(mapdb.NewMapDB()) + if err := tangle.Prune(); err != nil { + b.Error(err) + + return + } + + messageBytes := make([]*Message, b.N) + for i := 0; i < b.N; i++ { + messageBytes[i] = newTestDataMessage("some data") + messageBytes[i].Bytes() + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + tangle.AttachMessage(messageBytes[i]) + } + + tangle.Shutdown() +} + +func TestTangle_AttachMessage(t *testing.T) { + messageTangle := New(mapdb.NewMapDB()) + if err := messageTangle.Prune(); err != nil { + t.Error(err) + + return + } + + messageTangle.Events.MessageAttached.Attach(events.NewClosure(func(cachedMsgEvent *CachedMessageEvent) { + cachedMsgEvent.MessageMetadata.Release() + + cachedMsgEvent.Message.Consume(func(msg *Message) { + fmt.Println("ATTACHED:", msg.ID()) + }) + })) + + messageTangle.Events.MessageSolid.Attach(events.NewClosure(func(cachedMsgEvent *CachedMessageEvent) { + cachedMsgEvent.MessageMetadata.Release() + + cachedMsgEvent.Message.Consume(func(msg *Message) { + fmt.Println("SOLID:", msg.ID()) + }) + })) + + messageTangle.Events.MessageUnsolidifiable.Attach(events.NewClosure(func(messageId MessageID) { + fmt.Println("UNSOLIDIFIABLE:", messageId) + })) + + messageTangle.Events.MessageMissing.Attach(events.NewClosure(func(messageId MessageID) { + fmt.Println("MISSING:", messageId) + })) + + messageTangle.Events.MessageRemoved.Attach(events.NewClosure(func(messageId MessageID) { + fmt.Println("REMOVED:", messageId) + })) + + newMessageOne := newTestDataMessage("some data") + newMessageTwo := newTestDataMessage("some other data") + + messageTangle.AttachMessage(newMessageTwo) + + time.Sleep(7 * time.Second) + + messageTangle.AttachMessage(newMessageOne) + + messageTangle.Shutdown() +} + +func TestTangle_MissingMessages(t *testing.T) { + // test parameters + messageCount := 200000 + widthOfTheTangle := 2500 + + // variables required for the test + missingMessagesMap := make(map[MessageID]bool) + var missingMessagesMapMutex sync.Mutex + wg := sync.WaitGroup{} + + // create badger store + badgerDB, err := testutil.BadgerDB(t) + require.NoError(t, err) + + // map to keep track of the tips + tips := randommap.New() + tips.Set(EmptyMessageID, EmptyMessageID) + + // setup the message factory + msgFactory := NewMessageFactory( + badgerDB, + []byte("sequenceKey"), + identity.GenerateLocalIdentity(), + TipSelectorFunc(func() (MessageID, MessageID) { + return tips.RandomEntry().(MessageID), tips.RandomEntry().(MessageID) + }), + ) + defer msgFactory.Shutdown() + + // create a helper function that creates the messages + createNewMessage := func() *Message { + // issue the payload + msg, err := msgFactory.IssuePayload(NewDataPayload([]byte("0"))) + require.NoError(t, err) + + // remove a tip if the width of the tangle is reached + if tips.Size() >= widthOfTheTangle { + if rand.Intn(1000) < 500 { + tips.Delete(msg.Parent2ID()) + } else { + tips.Delete(msg.Parent1ID()) + } + } + + // add current message as a tip + tips.Set(msg.ID(), msg.ID()) + + // return the constructed message + return msg + } + + // create the tangle + tangle := New(badgerDB) + if err := tangle.Prune(); err != nil { + t.Error(err) + + return + } + + // generate the messages we want to solidify + preGeneratedMessages := make(map[MessageID]*Message) + for i := 0; i < messageCount; i++ { + msg := createNewMessage() + + preGeneratedMessages[msg.ID()] = msg + } + + fmt.Println("PRE-GENERATING MESSAGES: DONE") + + var receivedTransactionsCounter int32 + tangle.Events.MessageAttached.Attach(events.NewClosure(func(cachedMsgEvent *CachedMessageEvent) { + defer cachedMsgEvent.Message.Release() + defer cachedMsgEvent.MessageMetadata.Release() + + newReceivedTransactionsCounterValue := atomic.AddInt32(&receivedTransactionsCounter, 1) + if newReceivedTransactionsCounterValue%1000 == 0 { + fmt.Println("RECEIVED MESSAGES: ", newReceivedTransactionsCounterValue) + go fmt.Println("MISSING MESSAGES:", len(tangle.MissingMessages())) + } + })) + + // increase the counter when a missing message was detected + tangle.Events.MessageMissing.Attach(events.NewClosure(func(messageId MessageID) { + // attach the message after it has been requested + go func() { + time.Sleep(50 * time.Millisecond) + + tangle.AttachMessage(preGeneratedMessages[messageId]) + }() + + missingMessagesMapMutex.Lock() + missingMessagesMap[messageId] = true + missingMessagesMapMutex.Unlock() + })) + + // decrease the counter when a missing message was received + tangle.Events.MissingMessageReceived.Attach(events.NewClosure(func(cachedMsgEvent *CachedMessageEvent) { + cachedMsgEvent.MessageMetadata.Release() + cachedMsgEvent.Message.Consume(func(msg *Message) { + missingMessagesMapMutex.Lock() + delete(missingMessagesMap, msg.ID()) + missingMessagesMapMutex.Unlock() + }) + })) + + // mark the WaitGroup as done if all messages are solid + solidMessageCounter := int32(0) + tangle.Events.MessageSolid.Attach(events.NewClosure(func(cachedMsgEvent *CachedMessageEvent) { + defer cachedMsgEvent.MessageMetadata.Release() + defer cachedMsgEvent.Message.Release() + + // print progress status message + newSolidCounterValue := atomic.AddInt32(&solidMessageCounter, 1) + if newSolidCounterValue%1000 == 0 { + fmt.Println("SOLID MESSAGES: ", newSolidCounterValue) + go fmt.Println("MISSING MESSAGES:", len(tangle.MissingMessages())) + } + + // mark WaitGroup as done when we are done solidifying everything + if newSolidCounterValue == int32(messageCount) { + fmt.Println("ALL MESSAGES SOLID") + + wg.Done() + } + })) + + // issue tips to start solidification + wg.Add(1) + tips.ForEach(func(key interface{}, value interface{}) { + tangle.AttachMessage(preGeneratedMessages[key.(MessageID)]) + }) + + // wait for all transactions to become solid + wg.Wait() + + // make sure that all MessageMissing events also had a corresponding MissingMessageReceived event + assert.Equal(t, len(missingMessagesMap), 0) + assert.Equal(t, len(tangle.MissingMessages()), 0) + + // shutdown the tangle + tangle.Shutdown() +} + +func TestRetrieveAllTips(t *testing.T) { + messageTangle := New(mapdb.NewMapDB()) + + messageA := newTestParentsDataMessage("A", EmptyMessageID, EmptyMessageID) + messageB := newTestParentsDataMessage("B", messageA.ID(), EmptyMessageID) + messageC := newTestParentsDataMessage("C", messageA.ID(), EmptyMessageID) + + var wg sync.WaitGroup + + messageTangle.Events.MessageSolid.Attach(events.NewClosure(func(cachedMsgEvent *CachedMessageEvent) { + cachedMsgEvent.Message.Release() + cachedMsgEvent.MessageMetadata.Release() + wg.Done() + })) + + wg.Add(3) + messageTangle.AttachMessage(messageA) + messageTangle.AttachMessage(messageB) + messageTangle.AttachMessage(messageC) + + wg.Wait() + + allTips := messageTangle.RetrieveAllTips() + + assert.Equal(t, 2, len(allTips)) + + messageTangle.Shutdown() +} diff --git a/packages/tangle/test_utils.go b/packages/tangle/test_utils.go new file mode 100644 index 0000000000000000000000000000000000000000..df737ae8873adbdc1a7bb10fbac4342e99a14e90 --- /dev/null +++ b/packages/tangle/test_utils.go @@ -0,0 +1,19 @@ +package tangle + +import ( + "time" + + "github.com/iotaledger/hive.go/crypto/ed25519" +) + +func newTestNonceMessage(nonce uint64) *Message { + return NewMessage(EmptyMessageID, EmptyMessageID, time.Time{}, ed25519.PublicKey{}, 0, NewDataPayload([]byte("test")), nonce, ed25519.Signature{}) +} + +func newTestDataMessage(payloadString string) *Message { + return NewMessage(EmptyMessageID, EmptyMessageID, time.Now(), ed25519.PublicKey{}, 0, NewDataPayload([]byte(payloadString)), 0, ed25519.Signature{}) +} + +func newTestParentsDataMessage(payloadString string, parent1, parent2 MessageID) *Message { + return NewMessage(parent1, parent2, time.Now(), ed25519.PublicKey{}, 0, NewDataPayload([]byte(payloadString)), 0, ed25519.Signature{}) +} diff --git a/packages/tangle/tipselector.go b/packages/tangle/tipselector.go new file mode 100644 index 0000000000000000000000000000000000000000..ded0de28ac5f130c762795877cb0103bec5ef68f --- /dev/null +++ b/packages/tangle/tipselector.go @@ -0,0 +1,80 @@ +package tangle + +import ( + "github.com/iotaledger/hive.go/datastructure/randommap" +) + +// MessageTipSelector manages a map of tips and emits events for their removal and addition. +type MessageTipSelector struct { + tips *randommap.RandomMap + Events *MessageTipSelectorEvents +} + +// NewMessageTipSelector creates a new tip-selector. +func NewMessageTipSelector(tips ...MessageID) *MessageTipSelector { + tipSelector := &MessageTipSelector{ + tips: randommap.New(), + Events: newMessageTipSelectorEvents(), + } + + if tips != nil { + tipSelector.Set(tips...) + } + + return tipSelector +} + +// Set adds the given messageIDs as tips. +func (t *MessageTipSelector) Set(tips ...MessageID) { + for _, messageID := range tips { + t.tips.Set(messageID, messageID) + } +} + +// AddTip adds the given message as a tip. +func (t *MessageTipSelector) AddTip(msg *Message) { + messageID := msg.ID() + if t.tips.Set(messageID, messageID) { + t.Events.TipAdded.Trigger(messageID) + } + + parent1MessageID := msg.Parent1ID() + if _, deleted := t.tips.Delete(parent1MessageID); deleted { + t.Events.TipRemoved.Trigger(parent1MessageID) + } + + parent2MessageID := msg.Parent2ID() + if _, deleted := t.tips.Delete(parent2MessageID); deleted { + t.Events.TipRemoved.Trigger(parent2MessageID) + } +} + +// Tips returns two tips. +func (t *MessageTipSelector) Tips() (parent1MessageID, parent2MessageID MessageID) { + tip := t.tips.RandomEntry() + if tip == nil { + parent1MessageID = EmptyMessageID + parent2MessageID = EmptyMessageID + + return + } + + parent2MessageID = tip.(MessageID) + + if t.tips.Size() == 1 { + parent1MessageID = parent2MessageID + return + } + + parent1MessageID = t.tips.RandomEntry().(MessageID) + for parent1MessageID == parent2MessageID && t.tips.Size() > 1 { + parent1MessageID = t.tips.RandomEntry().(MessageID) + } + + return +} + +// TipCount the amount of current tips. +func (t *MessageTipSelector) TipCount() int { + return t.tips.Size() +} diff --git a/packages/tangle/tipselector_test.go b/packages/tangle/tipselector_test.go new file mode 100644 index 0000000000000000000000000000000000000000..3c139271de95d5c97dfa4e2c5ef880c447ee8cb6 --- /dev/null +++ b/packages/tangle/tipselector_test.go @@ -0,0 +1,47 @@ +package tangle + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMessageTipSelector(t *testing.T) { + // create tip selector + tipSelector := NewMessageTipSelector() + + // check if first tips point to genesis + parent11, parent21 := tipSelector.Tips() + assert.Equal(t, EmptyMessageID, parent11) + assert.Equal(t, EmptyMessageID, parent21) + + // create a message and attach it + message1 := newTestParentsDataMessage("testmessage1", parent11, parent21) + tipSelector.AddTip(message1) + + // check if the tip shows up in the tip count + assert.Equal(t, 1, tipSelector.TipCount()) + + // check if next tips point to our first message + parent12, parent22 := tipSelector.Tips() + assert.Equal(t, message1.ID(), parent12) + assert.Equal(t, message1.ID(), parent22) + + // create a 2nd message and attach it + message2 := newTestParentsDataMessage("testmessage2", EmptyMessageID, EmptyMessageID) + tipSelector.AddTip(message2) + + // check if the tip shows up in the tip count + assert.Equal(t, 2, tipSelector.TipCount()) + + // attach a message to our two tips + parent13, parent23 := tipSelector.Tips() + message3 := newTestParentsDataMessage("testmessage3", parent13, parent23) + tipSelector.AddTip(message3) + + // check if the tip shows replaces the current tips + parent14, parent24 := tipSelector.Tips() + assert.Equal(t, 1, tipSelector.TipCount()) + assert.Equal(t, message3.ID(), parent14) + assert.Equal(t, message3.ID(), parent24) +} diff --git a/plugins/syncbeacon/payload/payload.go b/plugins/syncbeacon/payload/payload.go index f4319d61456ccfee93326dfc651aff8984357b3e..aaa422dd27c0b9d5053c530b74b0c529adfa9af1 100644 --- a/plugins/syncbeacon/payload/payload.go +++ b/plugins/syncbeacon/payload/payload.go @@ -1,6 +1,8 @@ package payload import ( + "fmt" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" "github.com/iotaledger/hive.go/marshalutil" "github.com/iotaledger/hive.go/stringify" @@ -38,14 +40,17 @@ func FromBytes(bytes []byte) (result *Payload, consumedBytes int, err error) { result = &Payload{} _, err = marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to parse payload size of syncbeacon payload: %w", err) return } result.payloadType, err = marshalUtil.ReadUint32() if err != nil { + err = fmt.Errorf("failed to parse payload type of syncbeacon payload: %w", err) return } result.sentTime, err = marshalUtil.ReadInt64() if err != nil { + err = fmt.Errorf("failed to parse sent time of syncbeacon payload: %w", err) return }