From 0018f4a637a35d63b6c2977229f09b2a888586b2 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 17 Jun 2020 11:26:29 +0200 Subject: [PATCH] Feat: Added Signature checks for value transfers to MessageParser (#471) * Feat: Added Signature checks for value transfers to MessageParser * Refactor: renamed to SignatureFilter * Update dapps/valuetransfers/packages/tangle/signature_filter.go Co-authored-by: Angelo Capossele <angelocapossele@gmail.com> * Update dapps/valuetransfers/packages/tangle/signature_filter.go Co-authored-by: Angelo Capossele <angelocapossele@gmail.com> * Refactor: refactored code according to guard programming style * Refactor: refactored some code * Refactor: refactored code + adjusted go import settings * Refactor: changed goimport of value dapp * Refactor: refactored some more code * Feat: more refactor :D * Refactor: simplified constructor * Refactor: reordered properties of struct * Refactor: test * Fix: fixed bug in payload parser (fallback to generic) * Refactor: adjusted comment * Fix: fixed conditions for third test case * Refactor: refactored comment Co-authored-by: Hans Moog <hm@mkjc.net> Co-authored-by: Angelo Capossele <angelocapossele@gmail.com> --- dapps/valuetransfers/dapp.go | 3 + .../packages/tangle/signature_filter.go | 98 ++++++++++ .../packages/tangle/signature_filter_test.go | 175 ++++++++++++++++++ packages/binary/messagelayer/payload/data.go | 1 + .../binary/messagelayer/payload/payload.go | 8 +- 5 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 dapps/valuetransfers/packages/tangle/signature_filter.go create mode 100644 dapps/valuetransfers/packages/tangle/signature_filter_test.go diff --git a/dapps/valuetransfers/dapp.go b/dapps/valuetransfers/dapp.go index 7964e471..fd63cfe7 100644 --- a/dapps/valuetransfers/dapp.go +++ b/dapps/valuetransfers/dapp.go @@ -93,6 +93,9 @@ func configure(_ *node.Plugin) { log.Errorf("FPC failed for transaction with id '%s' - last opinion: '%s'", id, lastOpinion) })) + // register SignatureFilter in Parser + messagelayer.MessageParser.AddMessageFilter(tangle.NewSignatureFilter()) + // subscribe to message-layer messagelayer.Tangle.Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer)) } diff --git a/dapps/valuetransfers/packages/tangle/signature_filter.go b/dapps/valuetransfers/packages/tangle/signature_filter.go new file mode 100644 index 00000000..3720dab3 --- /dev/null +++ b/dapps/valuetransfers/packages/tangle/signature_filter.go @@ -0,0 +1,98 @@ +package tangle + +import ( + "errors" + "sync" + + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser" + "github.com/iotaledger/hive.go/async" + "github.com/iotaledger/hive.go/autopeering/peer" +) + +// SignatureFilter represents a filter for the MessageParser that filters out transactions with an invalid signature. +type SignatureFilter struct { + onAcceptCallback func(message *message.Message, peer *peer.Peer) + onRejectCallback func(message *message.Message, err error, peer *peer.Peer) + onAcceptCallbackMutex sync.RWMutex + onRejectCallbackMutex sync.RWMutex + workerPool async.WorkerPool +} + +// NewSignatureFilter is the constructor of the MessageFilter. +func NewSignatureFilter() *SignatureFilter { + return &SignatureFilter{} +} + +// Filter get's called whenever a new message is received. It rejects the message, if the message is not a valid value +// message. +func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) { + filter.workerPool.Submit(func() { + // accept message if the message is not a value message (it will be checked by other filters) + valuePayload := message.Payload() + if valuePayload.Type() != payload.Type { + filter.getAcceptCallback()(message, peer) + + return + } + + // reject if the payload can not be casted to a ValuePayload (invalid payload) + typeCastedValuePayload, ok := valuePayload.(*payload.Payload) + if !ok { + filter.getRejectCallback()(message, errors.New("invalid value message"), peer) + + return + } + + // reject message if it contains a transaction with invalid signatures + if !typeCastedValuePayload.Transaction().SignaturesValid() { + filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer) + + return + } + + // if all previous checks passed: accept message + filter.getAcceptCallback()(message, peer) + }) +} + +// OnAccept registers the given callback as the acceptance function of the filter. +func (filter *SignatureFilter) OnAccept(callback func(message *message.Message, peer *peer.Peer)) { + filter.onAcceptCallbackMutex.Lock() + defer filter.onAcceptCallbackMutex.Unlock() + + filter.onAcceptCallback = callback +} + +// OnReject registers the given callback as the rejection function of the filter. +func (filter *SignatureFilter) OnReject(callback func(message *message.Message, err error, peer *peer.Peer)) { + filter.onRejectCallbackMutex.Lock() + defer filter.onRejectCallbackMutex.Unlock() + + filter.onRejectCallback = callback +} + +// Shutdown shuts down the filter. +func (filter *SignatureFilter) Shutdown() { + filter.workerPool.ShutdownGracefully() +} + +// getAcceptCallback returns the callback that is executed when a message passes the filter. +func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) { + filter.onAcceptCallbackMutex.RLock() + defer filter.onAcceptCallbackMutex.RUnlock() + + return filter.onAcceptCallback +} + +// getRejectCallback returns the callback that is executed when a message is blocked by the filter. +func (filter *SignatureFilter) getRejectCallback() func(message *message.Message, err error, peer *peer.Peer) { + filter.onRejectCallbackMutex.RLock() + defer filter.onRejectCallbackMutex.RUnlock() + + return filter.onRejectCallback +} + +// interface contract (allow the compiler to check if the implementation has all of the required methods). +var _ messageparser.MessageFilter = &SignatureFilter{} diff --git a/dapps/valuetransfers/packages/tangle/signature_filter_test.go b/dapps/valuetransfers/packages/tangle/signature_filter_test.go new file mode 100644 index 00000000..0dabaa30 --- /dev/null +++ b/dapps/valuetransfers/packages/tangle/signature_filter_test.go @@ -0,0 +1,175 @@ +package tangle + +import ( + "sync" + "testing" + + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address/signaturescheme" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/balance" + valuePayload "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/transaction" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/wallet" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagefactory" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser" + messagePayload "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector" + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/identity" + "github.com/iotaledger/hive.go/kvstore/mapdb" + "github.com/iotaledger/hive.go/marshalutil" + "github.com/stretchr/testify/require" +) + +func TestSignatureFilter(t *testing.T) { + // create parser + messageParser := newSyncMessageParser(NewSignatureFilter()) + + // create helper instances + seed := wallet.NewSeed() + messageFactory := messagefactory.New(mapdb.NewMapDB(), identity.GenerateLocalIdentity(), tipselector.New(), []byte("sequenceKey")) + + // 1. test value message without signatures + { + // create unsigned transaction + tx := transaction.New( + transaction.NewInputs( + transaction.NewOutputID(seed.Address(0), transaction.GenesisID), + ), + transaction.NewOutputs(map[address.Address][]*balance.Balance{ + seed.Address(1): { + balance.New(balance.ColorIOTA, 1337), + }, + }), + ) + + // parse message bytes + accepted, _, _, err := messageParser.Parse(messageFactory.IssuePayload(valuePayload.New(valuePayload.GenesisID, valuePayload.GenesisID, tx)).Bytes(), &peer.Peer{}) + + // check results (should be rejected) + require.Equal(t, false, accepted) + require.NotNil(t, err) + require.Equal(t, "invalid transaction signatures", err.Error()) + } + + // 2. test value message with signatures + { + // create signed transaction + tx := transaction.New( + transaction.NewInputs( + transaction.NewOutputID(seed.Address(0), transaction.GenesisID), + ), + transaction.NewOutputs(map[address.Address][]*balance.Balance{ + seed.Address(1): { + balance.New(balance.ColorIOTA, 1337), + }, + }), + ) + tx.Sign(signaturescheme.ED25519(*seed.KeyPair(0))) + + // parse message bytes + accepted, _, _, err := messageParser.Parse(messageFactory.IssuePayload(valuePayload.New(valuePayload.GenesisID, valuePayload.GenesisID, tx)).Bytes(), &peer.Peer{}) + + // check results (should be accepted) + require.Equal(t, true, accepted) + require.Nil(t, err) + } + + // 3. test message with an invalid value payload + { + // create a data payload + marshalUtil := marshalutil.New(messagePayload.NewData([]byte("test")).Bytes()) + + // set the type to be a value payload + marshalUtil.WriteSeek(0) + marshalUtil.WriteUint32(valuePayload.Type) + + // parse modified bytes back into a payload object + dataPayload, err, _ := messagePayload.DataFromBytes(marshalUtil.Bytes()) + require.NoError(t, err) + + // parse message bytes + accepted, _, _, err := messageParser.Parse(messageFactory.IssuePayload(dataPayload).Bytes(), &peer.Peer{}) + + // check results (should be rejected) + require.Equal(t, false, accepted) + require.NotNil(t, err) + require.Equal(t, "invalid value message", err.Error()) + } +} + +// newSyncMessageParser creates a wrapped MessageParser that works synchronously by using a WaitGroup to wait for the +// parse result. +func newSyncMessageParser(messageFilters ...messageparser.MessageFilter) (tester *syncMessageParser) { + // initialize MessageParser + messageParser := messageparser.New() + for _, messageFilter := range messageFilters { + messageParser.AddMessageFilter(messageFilter) + } + + // create wrapped result + tester = &syncMessageParser{ + messageParser: messageParser, + } + + // setup async behavior (store result + mark WaitGroup done) + messageParser.Events.BytesRejected.Attach(events.NewClosure(func(bytes []byte, err error, peer *peer.Peer) { + tester.result = &messageParserResult{ + accepted: false, + message: nil, + peer: peer, + err: err, + } + + tester.wg.Done() + })) + messageParser.Events.MessageRejected.Attach(events.NewClosure(func(message *message.Message, err error, peer *peer.Peer) { + tester.result = &messageParserResult{ + accepted: false, + message: message, + peer: peer, + err: err, + } + + tester.wg.Done() + })) + messageParser.Events.MessageParsed.Attach(events.NewClosure(func(message *message.Message, peer *peer.Peer) { + tester.result = &messageParserResult{ + accepted: true, + message: message, + peer: peer, + err: nil, + } + + tester.wg.Done() + })) + + return +} + +// syncMessageParser is a wrapper for the MessageParser that allows to parse Messages synchronously. +type syncMessageParser struct { + messageParser *messageparser.MessageParser + result *messageParserResult + wg sync.WaitGroup +} + +// Parse parses the message bytes into a message. It either gets accepted or rejected. +func (tester *syncMessageParser) Parse(messageBytes []byte, peer *peer.Peer) (bool, *message.Message, *peer.Peer, error) { + tester.wg.Add(1) + tester.messageParser.Parse(messageBytes, peer) + tester.wg.Wait() + + return tester.result.accepted, tester.result.message, tester.result.peer, tester.result.err +} + +// messageParserResult is a struct that stores the results of a parsing operation, so we can return them after the +// WaitGroup is done waiting. +type messageParserResult struct { + accepted bool + message *message.Message + peer *peer.Peer + err error +} diff --git a/packages/binary/messagelayer/payload/data.go b/packages/binary/messagelayer/payload/data.go index 6103a264..9db9cc64 100644 --- a/packages/binary/messagelayer/payload/data.go +++ b/packages/binary/messagelayer/payload/data.go @@ -91,6 +91,7 @@ func (dataPayload *Data) Unmarshal(data []byte) (err error) { func (dataPayload *Data) String() string { return stringify.Struct("Data", + stringify.StructField("type", int(dataPayload.Type())), stringify.StructField("data", string(dataPayload.Data())), ) } diff --git a/packages/binary/messagelayer/payload/payload.go b/packages/binary/messagelayer/payload/payload.go index a918ed1b..f0f013e6 100644 --- a/packages/binary/messagelayer/payload/payload.go +++ b/packages/binary/messagelayer/payload/payload.go @@ -46,9 +46,15 @@ func FromBytes(bytes []byte) (result Payload, consumedBytes int, err error) { return } + readOffset := marshalUtil.ReadOffset() result, err = GetUnmarshaler(payloadType)(payloadBytes) if err != nil { - return + // fallback to the generic unmarshaler if registered type fails to unmarshal + marshalUtil.ReadSeek(readOffset) + result, err = GenericPayloadUnmarshalerFactory(payloadType)(payloadBytes) + if err != nil { + return + } } // return the number of bytes we processed -- GitLab