Skip to content
Snippets Groups Projects
Unverified Commit 0018f4a6 authored by Hans Moog's avatar Hans Moog Committed by GitHub
Browse files

Feat: Added Signature checks for value transfers to MessageParser (#471)


* Feat: Added Signature checks for value transfers to MessageParser

* Refactor: renamed to SignatureFilter

* Update dapps/valuetransfers/packages/tangle/signature_filter.go

Co-authored-by: default avatarAngelo Capossele <angelocapossele@gmail.com>

* Update dapps/valuetransfers/packages/tangle/signature_filter.go

Co-authored-by: default avatarAngelo Capossele <angelocapossele@gmail.com>

* Refactor: refactored code according to guard programming style

* Refactor: refactored some code

* Refactor: refactored code + adjusted go import settings

* Refactor: changed goimport of value dapp

* Refactor: refactored some more code

* Feat: more refactor :D

* Refactor: simplified constructor

* Refactor: reordered properties of struct

* Refactor: test

* Fix: fixed bug in payload parser (fallback to generic)

* Refactor: adjusted comment

* Fix: fixed conditions for third test case

* Refactor: refactored comment

Co-authored-by: default avatarHans Moog <hm@mkjc.net>
Co-authored-by: default avatarAngelo Capossele <angelocapossele@gmail.com>
parent f4acc2bd
No related branches found
No related tags found
No related merge requests found
......@@ -93,6 +93,9 @@ func configure(_ *node.Plugin) {
log.Errorf("FPC failed for transaction with id '%s' - last opinion: '%s'", id, lastOpinion)
}))
// register SignatureFilter in Parser
messagelayer.MessageParser.AddMessageFilter(tangle.NewSignatureFilter())
// subscribe to message-layer
messagelayer.Tangle.Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer))
}
......
package tangle
import (
"errors"
"sync"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser"
"github.com/iotaledger/hive.go/async"
"github.com/iotaledger/hive.go/autopeering/peer"
)
// SignatureFilter represents a filter for the MessageParser that filters out transactions with an invalid signature.
type SignatureFilter struct {
onAcceptCallback func(message *message.Message, peer *peer.Peer)
onRejectCallback func(message *message.Message, err error, peer *peer.Peer)
onAcceptCallbackMutex sync.RWMutex
onRejectCallbackMutex sync.RWMutex
workerPool async.WorkerPool
}
// NewSignatureFilter is the constructor of the MessageFilter.
func NewSignatureFilter() *SignatureFilter {
return &SignatureFilter{}
}
// Filter get's called whenever a new message is received. It rejects the message, if the message is not a valid value
// message.
func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) {
filter.workerPool.Submit(func() {
// accept message if the message is not a value message (it will be checked by other filters)
valuePayload := message.Payload()
if valuePayload.Type() != payload.Type {
filter.getAcceptCallback()(message, peer)
return
}
// reject if the payload can not be casted to a ValuePayload (invalid payload)
typeCastedValuePayload, ok := valuePayload.(*payload.Payload)
if !ok {
filter.getRejectCallback()(message, errors.New("invalid value message"), peer)
return
}
// reject message if it contains a transaction with invalid signatures
if !typeCastedValuePayload.Transaction().SignaturesValid() {
filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer)
return
}
// if all previous checks passed: accept message
filter.getAcceptCallback()(message, peer)
})
}
// OnAccept registers the given callback as the acceptance function of the filter.
func (filter *SignatureFilter) OnAccept(callback func(message *message.Message, peer *peer.Peer)) {
filter.onAcceptCallbackMutex.Lock()
defer filter.onAcceptCallbackMutex.Unlock()
filter.onAcceptCallback = callback
}
// OnReject registers the given callback as the rejection function of the filter.
func (filter *SignatureFilter) OnReject(callback func(message *message.Message, err error, peer *peer.Peer)) {
filter.onRejectCallbackMutex.Lock()
defer filter.onRejectCallbackMutex.Unlock()
filter.onRejectCallback = callback
}
// Shutdown shuts down the filter.
func (filter *SignatureFilter) Shutdown() {
filter.workerPool.ShutdownGracefully()
}
// getAcceptCallback returns the callback that is executed when a message passes the filter.
func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) {
filter.onAcceptCallbackMutex.RLock()
defer filter.onAcceptCallbackMutex.RUnlock()
return filter.onAcceptCallback
}
// getRejectCallback returns the callback that is executed when a message is blocked by the filter.
func (filter *SignatureFilter) getRejectCallback() func(message *message.Message, err error, peer *peer.Peer) {
filter.onRejectCallbackMutex.RLock()
defer filter.onRejectCallbackMutex.RUnlock()
return filter.onRejectCallback
}
// interface contract (allow the compiler to check if the implementation has all of the required methods).
var _ messageparser.MessageFilter = &SignatureFilter{}
package tangle
import (
"sync"
"testing"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address/signaturescheme"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/balance"
valuePayload "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/transaction"
"github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/wallet"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagefactory"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser"
messagePayload "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/identity"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/marshalutil"
"github.com/stretchr/testify/require"
)
func TestSignatureFilter(t *testing.T) {
// create parser
messageParser := newSyncMessageParser(NewSignatureFilter())
// create helper instances
seed := wallet.NewSeed()
messageFactory := messagefactory.New(mapdb.NewMapDB(), identity.GenerateLocalIdentity(), tipselector.New(), []byte("sequenceKey"))
// 1. test value message without signatures
{
// create unsigned transaction
tx := transaction.New(
transaction.NewInputs(
transaction.NewOutputID(seed.Address(0), transaction.GenesisID),
),
transaction.NewOutputs(map[address.Address][]*balance.Balance{
seed.Address(1): {
balance.New(balance.ColorIOTA, 1337),
},
}),
)
// parse message bytes
accepted, _, _, err := messageParser.Parse(messageFactory.IssuePayload(valuePayload.New(valuePayload.GenesisID, valuePayload.GenesisID, tx)).Bytes(), &peer.Peer{})
// check results (should be rejected)
require.Equal(t, false, accepted)
require.NotNil(t, err)
require.Equal(t, "invalid transaction signatures", err.Error())
}
// 2. test value message with signatures
{
// create signed transaction
tx := transaction.New(
transaction.NewInputs(
transaction.NewOutputID(seed.Address(0), transaction.GenesisID),
),
transaction.NewOutputs(map[address.Address][]*balance.Balance{
seed.Address(1): {
balance.New(balance.ColorIOTA, 1337),
},
}),
)
tx.Sign(signaturescheme.ED25519(*seed.KeyPair(0)))
// parse message bytes
accepted, _, _, err := messageParser.Parse(messageFactory.IssuePayload(valuePayload.New(valuePayload.GenesisID, valuePayload.GenesisID, tx)).Bytes(), &peer.Peer{})
// check results (should be accepted)
require.Equal(t, true, accepted)
require.Nil(t, err)
}
// 3. test message with an invalid value payload
{
// create a data payload
marshalUtil := marshalutil.New(messagePayload.NewData([]byte("test")).Bytes())
// set the type to be a value payload
marshalUtil.WriteSeek(0)
marshalUtil.WriteUint32(valuePayload.Type)
// parse modified bytes back into a payload object
dataPayload, err, _ := messagePayload.DataFromBytes(marshalUtil.Bytes())
require.NoError(t, err)
// parse message bytes
accepted, _, _, err := messageParser.Parse(messageFactory.IssuePayload(dataPayload).Bytes(), &peer.Peer{})
// check results (should be rejected)
require.Equal(t, false, accepted)
require.NotNil(t, err)
require.Equal(t, "invalid value message", err.Error())
}
}
// newSyncMessageParser creates a wrapped MessageParser that works synchronously by using a WaitGroup to wait for the
// parse result.
func newSyncMessageParser(messageFilters ...messageparser.MessageFilter) (tester *syncMessageParser) {
// initialize MessageParser
messageParser := messageparser.New()
for _, messageFilter := range messageFilters {
messageParser.AddMessageFilter(messageFilter)
}
// create wrapped result
tester = &syncMessageParser{
messageParser: messageParser,
}
// setup async behavior (store result + mark WaitGroup done)
messageParser.Events.BytesRejected.Attach(events.NewClosure(func(bytes []byte, err error, peer *peer.Peer) {
tester.result = &messageParserResult{
accepted: false,
message: nil,
peer: peer,
err: err,
}
tester.wg.Done()
}))
messageParser.Events.MessageRejected.Attach(events.NewClosure(func(message *message.Message, err error, peer *peer.Peer) {
tester.result = &messageParserResult{
accepted: false,
message: message,
peer: peer,
err: err,
}
tester.wg.Done()
}))
messageParser.Events.MessageParsed.Attach(events.NewClosure(func(message *message.Message, peer *peer.Peer) {
tester.result = &messageParserResult{
accepted: true,
message: message,
peer: peer,
err: nil,
}
tester.wg.Done()
}))
return
}
// syncMessageParser is a wrapper for the MessageParser that allows to parse Messages synchronously.
type syncMessageParser struct {
messageParser *messageparser.MessageParser
result *messageParserResult
wg sync.WaitGroup
}
// Parse parses the message bytes into a message. It either gets accepted or rejected.
func (tester *syncMessageParser) Parse(messageBytes []byte, peer *peer.Peer) (bool, *message.Message, *peer.Peer, error) {
tester.wg.Add(1)
tester.messageParser.Parse(messageBytes, peer)
tester.wg.Wait()
return tester.result.accepted, tester.result.message, tester.result.peer, tester.result.err
}
// messageParserResult is a struct that stores the results of a parsing operation, so we can return them after the
// WaitGroup is done waiting.
type messageParserResult struct {
accepted bool
message *message.Message
peer *peer.Peer
err error
}
......@@ -91,6 +91,7 @@ func (dataPayload *Data) Unmarshal(data []byte) (err error) {
func (dataPayload *Data) String() string {
return stringify.Struct("Data",
stringify.StructField("type", int(dataPayload.Type())),
stringify.StructField("data", string(dataPayload.Data())),
)
}
......
......@@ -46,9 +46,15 @@ func FromBytes(bytes []byte) (result Payload, consumedBytes int, err error) {
return
}
readOffset := marshalUtil.ReadOffset()
result, err = GetUnmarshaler(payloadType)(payloadBytes)
if err != nil {
return
// fallback to the generic unmarshaler if registered type fails to unmarshal
marshalUtil.ReadSeek(readOffset)
result, err = GenericPayloadUnmarshalerFactory(payloadType)(payloadBytes)
if err != nil {
return
}
}
// return the number of bytes we processed
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment