diff --git a/.gitignore b/.gitignore index 87725e6da2f14d20435f3bf9504e84bd09dd6903..ed554be4bbe94dca5030e830ec94f18fa5ef4953 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,5 @@ objectsdb/ shimmer goshimmer -config.json \ No newline at end of file +config.json +.env \ No newline at end of file diff --git a/config.default.json b/config.default.json index 3161b48f6ff436ba5113347f95791fef5bc0cf8d..059be5f0ed5343b805a56f32c75fb3d558b5c8f4 100644 --- a/config.default.json +++ b/config.default.json @@ -67,5 +67,8 @@ "username": "goshimmer" }, "bindAddress": "127.0.0.1:8080" + }, + "networkdelay": { + "originPublicKey": "9DB3j9cWYSuEEtkvanrzqkzCQMdH1FGv3TawJdVbDxkd" } } diff --git a/dapps/networkdelay/dapp.go b/dapps/networkdelay/dapp.go new file mode 100644 index 0000000000000000000000000000000000000000..36611d18db9398465168fe0d19358021db196b9f --- /dev/null +++ b/dapps/networkdelay/dapp.go @@ -0,0 +1,132 @@ +package networkdelay + +import ( + "fmt" + "time" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + messageTangle "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" + "github.com/iotaledger/goshimmer/plugins/autopeering/local" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/remotelog" + "github.com/iotaledger/hive.go/crypto/ed25519" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" + "github.com/mr-tron/base58" +) + +const ( + // PluginName contains the human readable name of the plugin. + PluginName = "NetworkDelay" + + // CfgNetworkDelayOriginPublicKey defines the config flag of the issuer node public key. + CfgNetworkDelayOriginPublicKey = "networkdelay.originPublicKey" + + remoteLogType = "networkdelay" +) + +var ( + // App is the "plugin" instance of the network delay application. + App = node.NewPlugin(PluginName, node.Disabled, configure) + + // log holds a reference to the logger used by this app. + log *logger.Logger + + remoteLogger *remotelog.RemoteLoggerConn + + myID string + myPublicKey ed25519.PublicKey + originPublicKey ed25519.PublicKey +) + +func configure(_ *node.Plugin) { + // configure logger + log = logger.NewLogger(PluginName) + + remoteLogger = remotelog.RemoteLogger() + + if local.GetInstance() != nil { + myID = local.GetInstance().ID().String() + myPublicKey = local.GetInstance().PublicKey() + } + + // get origin public key from config + bytes, err := base58.Decode(config.Node.GetString(CfgNetworkDelayOriginPublicKey)) + if err != nil { + log.Fatalf("could not parse %s config entry as base58. %v", CfgNetworkDelayOriginPublicKey, err) + } + originPublicKey, _, err = ed25519.PublicKeyFromBytes(bytes) + if err != nil { + log.Fatalf("could not parse %s config entry as public key. %v", CfgNetworkDelayOriginPublicKey, err) + } + + configureWebAPI() + + // subscribe to message-layer + messagelayer.Tangle.Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer)) +} + +func onReceiveMessageFromMessageLayer(cachedMessage *message.CachedMessage, cachedMessageMetadata *messageTangle.CachedMessageMetadata) { + defer cachedMessage.Release() + defer cachedMessageMetadata.Release() + + solidMessage := cachedMessage.Unwrap() + if solidMessage == nil { + log.Debug("failed to unpack solid message from message layer") + + return + } + + messagePayload := solidMessage.Payload() + if messagePayload.Type() != Type { + return + } + + // check for node identity + issuerPubKey := solidMessage.IssuerPublicKey() + if issuerPubKey != originPublicKey || issuerPubKey == myPublicKey { + return + } + + networkDelayObject, ok := messagePayload.(*Object) + if !ok { + log.Info("could not cast payload to network delay object") + + return + } + + now := time.Now().UnixNano() + + // abort if message was sent more than 1min ago + // this should only happen due to a node resyncing + fmt.Println(time.Duration(now-networkDelayObject.sentTime), time.Minute) + if time.Duration(now-networkDelayObject.sentTime) > time.Minute { + log.Debugf("Received network delay message with >1min delay\n%s", networkDelayObject) + return + } + + sendToRemoteLog(networkDelayObject, now) +} + +func sendToRemoteLog(networkDelayObject *Object, receiveTime int64) { + m := networkDelay{ + NodeID: myID, + ID: networkDelayObject.id.String(), + SentTime: networkDelayObject.sentTime, + ReceiveTime: receiveTime, + Delta: receiveTime - networkDelayObject.sentTime, + Type: remoteLogType, + } + _ = remoteLogger.Send(m) +} + +type networkDelay struct { + NodeID string `json:"nodeId"` + ID string `json:"id"` + SentTime int64 `json:"sentTime"` + ReceiveTime int64 `json:"receiveTime"` + Delta int64 `json:"delta"` + Type string `json:"type"` +} diff --git a/dapps/networkdelay/object.go b/dapps/networkdelay/object.go new file mode 100644 index 0000000000000000000000000000000000000000..5b41369019c68354bdaaa1fd9a3f5db26c8a3b6a --- /dev/null +++ b/dapps/networkdelay/object.go @@ -0,0 +1,156 @@ +package networkdelay + +import ( + "sync" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" + "github.com/iotaledger/hive.go/marshalutil" + "github.com/iotaledger/hive.go/stringify" + "github.com/mr-tron/base58" +) + +// ID represents a 32 byte ID of a network delay object. +type ID [32]byte + +// String returns a human-friendly representation of the ID. +func (id ID) String() string { + return base58.Encode(id[:]) +} + +// Object represents the network delay object type. +type Object struct { + id ID + sentTime int64 + + bytes []byte + bytesMutex sync.RWMutex +} + +// NewObject creates a new network delay object. +func NewObject(id ID, sentTime int64) *Object { + return &Object{ + id: id, + sentTime: sentTime, + } +} + +// FromBytes parses the marshaled version of an Object into a Go object. +// It either returns a new Object or fills an optionally provided Object with the parsed information. +func FromBytes(bytes []byte, optionalTargetObject ...*Object) (result *Object, consumedBytes int, err error) { + marshalUtil := marshalutil.New(bytes) + result, err = Parse(marshalUtil, optionalTargetObject...) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +// Parse unmarshals an Object using the given marshalUtil (for easier marshaling/unmarshaling). +func Parse(marshalUtil *marshalutil.MarshalUtil, optionalTarget ...*Object) (result *Object, err error) { + // determine the target that will hold the unmarshaled information + switch len(optionalTarget) { + case 0: + result = &Object{} + case 1: + result = optionalTarget[0] + default: + panic("too many arguments in call to FromBytes") + } + + // read information that are required to identify the object from the outside + if _, err = marshalUtil.ReadUint32(); err != nil { + return + } + if _, err = marshalUtil.ReadUint32(); err != nil { + return + } + + // parse id + id, err := marshalUtil.ReadBytes(32) + if err != nil { + return + } + copy(result.id[:], id) + + // parse sent time + if result.sentTime, err = marshalUtil.ReadInt64(); err != nil { + return + } + + // store bytes, so we don't have to marshal manually + consumedBytes := marshalUtil.ReadOffset() + copy(result.bytes, marshalUtil.Bytes()[:consumedBytes]) + + return +} + +// Bytes returns a marshaled version of this Object. +func (o *Object) Bytes() (bytes []byte) { + // acquire lock for reading bytes + o.bytesMutex.RLock() + + // return if bytes have been determined already + if bytes = o.bytes; bytes != nil { + o.bytesMutex.RUnlock() + return + } + + // switch to write lock + o.bytesMutex.RUnlock() + o.bytesMutex.Lock() + defer o.bytesMutex.Unlock() + + // return if bytes have been determined in the mean time + if bytes = o.bytes; bytes != nil { + return + } + + objectLength := len(o.id) + marshalutil.INT64_SIZE + // initialize helper + marshalUtil := marshalutil.New(marshalutil.UINT32_SIZE + marshalutil.UINT32_SIZE + objectLength) + + // marshal the payload specific information + marshalUtil.WriteUint32(Type) + marshalUtil.WriteUint32(uint32(objectLength)) + marshalUtil.WriteBytes(o.id[:]) + marshalUtil.WriteInt64(o.sentTime) + + bytes = marshalUtil.Bytes() + + return +} + +// String returns a human-friendly representation of the Object. +func (o *Object) String() string { + return stringify.Struct("NetworkDelayObject", + stringify.StructField("id", o.id), + stringify.StructField("sentTime", uint64(o.sentTime)), + ) +} + +// region Payload implementation /////////////////////////////////////////////////////////////////////////////////////// + +// Type represents the identifier which addresses the network delay Object type. +const Type = payload.Type(189) + +// Type returns the type of the Object. +func (o *Object) Type() payload.Type { + return Type +} + +// Unmarshal unmarshals the payload from the given bytes. +func (o *Object) Unmarshal(data []byte) (err error) { + _, _, err = FromBytes(data, o) + + return +} + +func init() { + payload.RegisterType(Type, func(data []byte) (payload payload.Payload, err error) { + payload = &Object{} + err = payload.Unmarshal(data) + + return + }) +} + +// // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/dapps/networkdelay/webapi.go b/dapps/networkdelay/webapi.go new file mode 100644 index 0000000000000000000000000000000000000000..b7ecf54f5fbf20c0e81e33c3e7dd0a5473f00727 --- /dev/null +++ b/dapps/networkdelay/webapi.go @@ -0,0 +1,38 @@ +package networkdelay + +import ( + "math/rand" + "net/http" + "time" + + "github.com/iotaledger/goshimmer/plugins/issuer" + "github.com/iotaledger/goshimmer/plugins/webapi" + "github.com/labstack/echo" +) + +func configureWebAPI() { + webapi.Server.POST("networkdelay", broadcastNetworkDelayObject) +} + +// broadcastNetworkDelayObject creates a message with a network delay object and +// broadcasts it to the node's neighbors. It returns the message ID if successful. +func broadcastNetworkDelayObject(c echo.Context) error { + // generate random id + rand.Seed(time.Now().UnixNano()) + var id [32]byte + if _, err := rand.Read(id[:]); err != nil { + return c.JSON(http.StatusInternalServerError, Response{Error: err.Error()}) + } + + msg, err := issuer.IssuePayload(NewObject(id, time.Now().UnixNano())) + if err != nil { + return c.JSON(http.StatusBadRequest, Response{Error: err.Error()}) + } + return c.JSON(http.StatusOK, Response{ID: msg.Id().String()}) +} + +// Response contains the ID of the message sent. +type Response struct { + ID string `json:"id,omitempty"` + Error string `json:"error,omitempty"` +} diff --git a/dapps/valuetransfers/dapp.go b/dapps/valuetransfers/dapp.go index 45d8ad1a7acfbd57dc4814cdd4126ccdb565cef9..2347b2e8f1c47f0cca3bae3ad68aabcf299a0113 100644 --- a/dapps/valuetransfers/dapp.go +++ b/dapps/valuetransfers/dapp.go @@ -125,6 +125,9 @@ func configure(_ *node.Plugin) { log.Errorf("FPC failed for transaction with id '%s' - last opinion: '%s'", id, lastOpinion) })) + // register SignatureFilter in Parser + messagelayer.MessageParser.AddMessageFilter(tangle.NewSignatureFilter()) + // subscribe to message-layer messagelayer.Tangle.Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer)) } diff --git a/dapps/valuetransfers/packages/branchmanager/objectstorage.go b/dapps/valuetransfers/packages/branchmanager/objectstorage.go index fede032fbef8ed4df734b530299f7752fe865974..c9cbfe4abe13858e9f33906a27d86cbe63afc10e 100644 --- a/dapps/valuetransfers/packages/branchmanager/objectstorage.go +++ b/dapps/valuetransfers/packages/branchmanager/objectstorage.go @@ -15,6 +15,8 @@ const ( osChildBranch osConflict osConflictMember + + cacheTime = 30 * time.Second ) var ( @@ -24,23 +26,23 @@ var ( }) osBranchOptions = []objectstorage.Option{ - objectstorage.CacheTime(60 * time.Second), + objectstorage.CacheTime(cacheTime), osLeakDetectionOption, } osChildBranchOptions = []objectstorage.Option{ - objectstorage.CacheTime(60 * time.Second), + objectstorage.CacheTime(cacheTime), objectstorage.PartitionKey(BranchIDLength, BranchIDLength), osLeakDetectionOption, } osConflictOptions = []objectstorage.Option{ - objectstorage.CacheTime(60 * time.Second), + objectstorage.CacheTime(cacheTime), osLeakDetectionOption, } osConflictMemberOptions = []objectstorage.Option{ - objectstorage.CacheTime(60 * time.Second), + objectstorage.CacheTime(cacheTime), objectstorage.PartitionKey(ConflictIDLength, BranchIDLength), osLeakDetectionOption, } diff --git a/dapps/valuetransfers/packages/tangle/objectstorage.go b/dapps/valuetransfers/packages/tangle/objectstorage.go index 29fee08f3ce8419119ce562dbe67707adc1cd3c3..e7ed013043d5f781219f0dc7030275975d249bc3 100644 --- a/dapps/valuetransfers/packages/tangle/objectstorage.go +++ b/dapps/valuetransfers/packages/tangle/objectstorage.go @@ -22,6 +22,8 @@ const ( osAttachment osOutput osConsumer + + cacheTime = 20 * time.Second ) var ( diff --git a/dapps/valuetransfers/packages/tangle/output.go b/dapps/valuetransfers/packages/tangle/output.go index 3771a853ca8a912057c5a5e0afe6dfc5c25c3bcd..5d19ec51f12cd50a29abc7de98e279a8b8d5cb8b 100644 --- a/dapps/valuetransfers/packages/tangle/output.go +++ b/dapps/valuetransfers/packages/tangle/output.go @@ -144,8 +144,8 @@ func (output *Output) BranchID() branchmanager.BranchID { return output.branchID } -// SetBranchID is the setter for the property that indicates in which ledger state branch the output is booked. -func (output *Output) SetBranchID(branchID branchmanager.BranchID) (modified bool) { +// setBranchID is the setter for the property that indicates in which ledger state branch the output is booked. +func (output *Output) setBranchID(branchID branchmanager.BranchID) (modified bool) { output.branchIDMutex.RLock() if output.branchID == branchID { output.branchIDMutex.RUnlock() diff --git a/dapps/valuetransfers/packages/tangle/payloadmetadata.go b/dapps/valuetransfers/packages/tangle/payloadmetadata.go index 11ce6e61e28a5c4010da0d102367f57707b796aa..6c65c2ac5e1fa48f66dc64380021bef63525cd89 100644 --- a/dapps/valuetransfers/packages/tangle/payloadmetadata.go +++ b/dapps/valuetransfers/packages/tangle/payloadmetadata.go @@ -249,8 +249,8 @@ func (payloadMetadata *PayloadMetadata) BranchID() branchmanager.BranchID { return payloadMetadata.branchID } -// SetBranchID is the setter for the BranchID that the corresponding Payload is booked into. -func (payloadMetadata *PayloadMetadata) SetBranchID(branchID branchmanager.BranchID) (modified bool) { +// setBranchID is the setter for the BranchID that the corresponding Payload is booked into. +func (payloadMetadata *PayloadMetadata) setBranchID(branchID branchmanager.BranchID) (modified bool) { payloadMetadata.branchIDMutex.RLock() if branchID == payloadMetadata.branchID { payloadMetadata.branchIDMutex.RUnlock() diff --git a/dapps/valuetransfers/packages/tangle/signature_filter.go b/dapps/valuetransfers/packages/tangle/signature_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..3720dab3a1923799579df560a72378d9e609541a --- /dev/null +++ b/dapps/valuetransfers/packages/tangle/signature_filter.go @@ -0,0 +1,98 @@ +package tangle + +import ( + "errors" + "sync" + + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser" + "github.com/iotaledger/hive.go/async" + "github.com/iotaledger/hive.go/autopeering/peer" +) + +// SignatureFilter represents a filter for the MessageParser that filters out transactions with an invalid signature. +type SignatureFilter struct { + onAcceptCallback func(message *message.Message, peer *peer.Peer) + onRejectCallback func(message *message.Message, err error, peer *peer.Peer) + onAcceptCallbackMutex sync.RWMutex + onRejectCallbackMutex sync.RWMutex + workerPool async.WorkerPool +} + +// NewSignatureFilter is the constructor of the MessageFilter. +func NewSignatureFilter() *SignatureFilter { + return &SignatureFilter{} +} + +// Filter get's called whenever a new message is received. It rejects the message, if the message is not a valid value +// message. +func (filter *SignatureFilter) Filter(message *message.Message, peer *peer.Peer) { + filter.workerPool.Submit(func() { + // accept message if the message is not a value message (it will be checked by other filters) + valuePayload := message.Payload() + if valuePayload.Type() != payload.Type { + filter.getAcceptCallback()(message, peer) + + return + } + + // reject if the payload can not be casted to a ValuePayload (invalid payload) + typeCastedValuePayload, ok := valuePayload.(*payload.Payload) + if !ok { + filter.getRejectCallback()(message, errors.New("invalid value message"), peer) + + return + } + + // reject message if it contains a transaction with invalid signatures + if !typeCastedValuePayload.Transaction().SignaturesValid() { + filter.getRejectCallback()(message, errors.New("invalid transaction signatures"), peer) + + return + } + + // if all previous checks passed: accept message + filter.getAcceptCallback()(message, peer) + }) +} + +// OnAccept registers the given callback as the acceptance function of the filter. +func (filter *SignatureFilter) OnAccept(callback func(message *message.Message, peer *peer.Peer)) { + filter.onAcceptCallbackMutex.Lock() + defer filter.onAcceptCallbackMutex.Unlock() + + filter.onAcceptCallback = callback +} + +// OnReject registers the given callback as the rejection function of the filter. +func (filter *SignatureFilter) OnReject(callback func(message *message.Message, err error, peer *peer.Peer)) { + filter.onRejectCallbackMutex.Lock() + defer filter.onRejectCallbackMutex.Unlock() + + filter.onRejectCallback = callback +} + +// Shutdown shuts down the filter. +func (filter *SignatureFilter) Shutdown() { + filter.workerPool.ShutdownGracefully() +} + +// getAcceptCallback returns the callback that is executed when a message passes the filter. +func (filter *SignatureFilter) getAcceptCallback() func(message *message.Message, peer *peer.Peer) { + filter.onAcceptCallbackMutex.RLock() + defer filter.onAcceptCallbackMutex.RUnlock() + + return filter.onAcceptCallback +} + +// getRejectCallback returns the callback that is executed when a message is blocked by the filter. +func (filter *SignatureFilter) getRejectCallback() func(message *message.Message, err error, peer *peer.Peer) { + filter.onRejectCallbackMutex.RLock() + defer filter.onRejectCallbackMutex.RUnlock() + + return filter.onRejectCallback +} + +// interface contract (allow the compiler to check if the implementation has all of the required methods). +var _ messageparser.MessageFilter = &SignatureFilter{} diff --git a/dapps/valuetransfers/packages/tangle/signature_filter_test.go b/dapps/valuetransfers/packages/tangle/signature_filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..0dabaa30650947abc298cb069d28b0adfd89909b --- /dev/null +++ b/dapps/valuetransfers/packages/tangle/signature_filter_test.go @@ -0,0 +1,175 @@ +package tangle + +import ( + "sync" + "testing" + + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/address/signaturescheme" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/balance" + valuePayload "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/payload" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/transaction" + "github.com/iotaledger/goshimmer/dapps/valuetransfers/packages/wallet" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagefactory" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser" + messagePayload "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector" + "github.com/iotaledger/hive.go/autopeering/peer" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/identity" + "github.com/iotaledger/hive.go/kvstore/mapdb" + "github.com/iotaledger/hive.go/marshalutil" + "github.com/stretchr/testify/require" +) + +func TestSignatureFilter(t *testing.T) { + // create parser + messageParser := newSyncMessageParser(NewSignatureFilter()) + + // create helper instances + seed := wallet.NewSeed() + messageFactory := messagefactory.New(mapdb.NewMapDB(), identity.GenerateLocalIdentity(), tipselector.New(), []byte("sequenceKey")) + + // 1. test value message without signatures + { + // create unsigned transaction + tx := transaction.New( + transaction.NewInputs( + transaction.NewOutputID(seed.Address(0), transaction.GenesisID), + ), + transaction.NewOutputs(map[address.Address][]*balance.Balance{ + seed.Address(1): { + balance.New(balance.ColorIOTA, 1337), + }, + }), + ) + + // parse message bytes + accepted, _, _, err := messageParser.Parse(messageFactory.IssuePayload(valuePayload.New(valuePayload.GenesisID, valuePayload.GenesisID, tx)).Bytes(), &peer.Peer{}) + + // check results (should be rejected) + require.Equal(t, false, accepted) + require.NotNil(t, err) + require.Equal(t, "invalid transaction signatures", err.Error()) + } + + // 2. test value message with signatures + { + // create signed transaction + tx := transaction.New( + transaction.NewInputs( + transaction.NewOutputID(seed.Address(0), transaction.GenesisID), + ), + transaction.NewOutputs(map[address.Address][]*balance.Balance{ + seed.Address(1): { + balance.New(balance.ColorIOTA, 1337), + }, + }), + ) + tx.Sign(signaturescheme.ED25519(*seed.KeyPair(0))) + + // parse message bytes + accepted, _, _, err := messageParser.Parse(messageFactory.IssuePayload(valuePayload.New(valuePayload.GenesisID, valuePayload.GenesisID, tx)).Bytes(), &peer.Peer{}) + + // check results (should be accepted) + require.Equal(t, true, accepted) + require.Nil(t, err) + } + + // 3. test message with an invalid value payload + { + // create a data payload + marshalUtil := marshalutil.New(messagePayload.NewData([]byte("test")).Bytes()) + + // set the type to be a value payload + marshalUtil.WriteSeek(0) + marshalUtil.WriteUint32(valuePayload.Type) + + // parse modified bytes back into a payload object + dataPayload, err, _ := messagePayload.DataFromBytes(marshalUtil.Bytes()) + require.NoError(t, err) + + // parse message bytes + accepted, _, _, err := messageParser.Parse(messageFactory.IssuePayload(dataPayload).Bytes(), &peer.Peer{}) + + // check results (should be rejected) + require.Equal(t, false, accepted) + require.NotNil(t, err) + require.Equal(t, "invalid value message", err.Error()) + } +} + +// newSyncMessageParser creates a wrapped MessageParser that works synchronously by using a WaitGroup to wait for the +// parse result. +func newSyncMessageParser(messageFilters ...messageparser.MessageFilter) (tester *syncMessageParser) { + // initialize MessageParser + messageParser := messageparser.New() + for _, messageFilter := range messageFilters { + messageParser.AddMessageFilter(messageFilter) + } + + // create wrapped result + tester = &syncMessageParser{ + messageParser: messageParser, + } + + // setup async behavior (store result + mark WaitGroup done) + messageParser.Events.BytesRejected.Attach(events.NewClosure(func(bytes []byte, err error, peer *peer.Peer) { + tester.result = &messageParserResult{ + accepted: false, + message: nil, + peer: peer, + err: err, + } + + tester.wg.Done() + })) + messageParser.Events.MessageRejected.Attach(events.NewClosure(func(message *message.Message, err error, peer *peer.Peer) { + tester.result = &messageParserResult{ + accepted: false, + message: message, + peer: peer, + err: err, + } + + tester.wg.Done() + })) + messageParser.Events.MessageParsed.Attach(events.NewClosure(func(message *message.Message, peer *peer.Peer) { + tester.result = &messageParserResult{ + accepted: true, + message: message, + peer: peer, + err: nil, + } + + tester.wg.Done() + })) + + return +} + +// syncMessageParser is a wrapper for the MessageParser that allows to parse Messages synchronously. +type syncMessageParser struct { + messageParser *messageparser.MessageParser + result *messageParserResult + wg sync.WaitGroup +} + +// Parse parses the message bytes into a message. It either gets accepted or rejected. +func (tester *syncMessageParser) Parse(messageBytes []byte, peer *peer.Peer) (bool, *message.Message, *peer.Peer, error) { + tester.wg.Add(1) + tester.messageParser.Parse(messageBytes, peer) + tester.wg.Wait() + + return tester.result.accepted, tester.result.message, tester.result.peer, tester.result.err +} + +// messageParserResult is a struct that stores the results of a parsing operation, so we can return them after the +// WaitGroup is done waiting. +type messageParserResult struct { + accepted bool + message *message.Message + peer *peer.Peer + err error +} diff --git a/dapps/valuetransfers/packages/tangle/tangle.go b/dapps/valuetransfers/packages/tangle/tangle.go index 33772a148e94c7c492bc7f7efee450acf452d0b6..4b18130df03a1554604381b652c1ee2c49b2e2bc 100644 --- a/dapps/valuetransfers/packages/tangle/tangle.go +++ b/dapps/valuetransfers/packages/tangle/tangle.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math" - "time" "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/events" @@ -50,15 +49,15 @@ func New(store kvstore.KVStore) (tangle *Tangle) { tangle = &Tangle{ branchManager: branchmanager.New(store), - payloadStorage: osFactory.New(osPayload, osPayloadFactory, objectstorage.CacheTime(1*time.Second)), - payloadMetadataStorage: osFactory.New(osPayloadMetadata, osPayloadMetadataFactory, objectstorage.CacheTime(1*time.Second)), - missingPayloadStorage: osFactory.New(osMissingPayload, osMissingPayloadFactory, objectstorage.CacheTime(1*time.Second)), - approverStorage: osFactory.New(osApprover, osPayloadApproverFactory, objectstorage.CacheTime(1*time.Second), objectstorage.PartitionKey(payload.IDLength, payload.IDLength), objectstorage.KeysOnly(true)), - transactionStorage: osFactory.New(osTransaction, osTransactionFactory, objectstorage.CacheTime(1*time.Second), osLeakDetectionOption), - transactionMetadataStorage: osFactory.New(osTransactionMetadata, osTransactionMetadataFactory, objectstorage.CacheTime(1*time.Second), osLeakDetectionOption), - attachmentStorage: osFactory.New(osAttachment, osAttachmentFactory, objectstorage.CacheTime(1*time.Second), objectstorage.PartitionKey(transaction.IDLength, payload.IDLength), osLeakDetectionOption), - outputStorage: osFactory.New(osOutput, osOutputFactory, OutputKeyPartitions, objectstorage.CacheTime(1*time.Second), osLeakDetectionOption), - consumerStorage: osFactory.New(osConsumer, osConsumerFactory, ConsumerPartitionKeys, objectstorage.CacheTime(1*time.Second), osLeakDetectionOption), + payloadStorage: osFactory.New(osPayload, osPayloadFactory, objectstorage.CacheTime(cacheTime)), + payloadMetadataStorage: osFactory.New(osPayloadMetadata, osPayloadMetadataFactory, objectstorage.CacheTime(cacheTime)), + missingPayloadStorage: osFactory.New(osMissingPayload, osMissingPayloadFactory, objectstorage.CacheTime(cacheTime)), + approverStorage: osFactory.New(osApprover, osPayloadApproverFactory, objectstorage.CacheTime(cacheTime), objectstorage.PartitionKey(payload.IDLength, payload.IDLength), objectstorage.KeysOnly(true)), + transactionStorage: osFactory.New(osTransaction, osTransactionFactory, objectstorage.CacheTime(cacheTime), osLeakDetectionOption), + transactionMetadataStorage: osFactory.New(osTransactionMetadata, osTransactionMetadataFactory, objectstorage.CacheTime(cacheTime), osLeakDetectionOption), + attachmentStorage: osFactory.New(osAttachment, osAttachmentFactory, objectstorage.CacheTime(cacheTime), objectstorage.PartitionKey(transaction.IDLength, payload.IDLength), osLeakDetectionOption), + outputStorage: osFactory.New(osOutput, osOutputFactory, OutputKeyPartitions, objectstorage.CacheTime(cacheTime), osLeakDetectionOption), + consumerStorage: osFactory.New(osConsumer, osConsumerFactory, ConsumerPartitionKeys, objectstorage.CacheTime(cacheTime), osLeakDetectionOption), Events: *newEvents(), } @@ -171,7 +170,7 @@ func (tangle *Tangle) LoadSnapshot(snapshot map[transaction.ID]map[address.Addre for outputAddress, balances := range addressBalances { input := NewOutput(outputAddress, transactionID, branchmanager.MasterBranchID, balances) input.setSolid(true) - input.SetBranchID(branchmanager.MasterBranchID) + input.setBranchID(branchmanager.MasterBranchID) // store output and abort if the snapshot has already been loaded earlier (output exists in the database) cachedOutput, stored := tangle.outputStorage.StoreIfAbsent(input) @@ -544,7 +543,7 @@ func (tangle *Tangle) setTransactionFinalized(transactionID transaction.ID, even cachedTransactionMetadata := tangle.TransactionMetadata(transactionID) cachedTransactionMetadata.Consume(func(metadata *TransactionMetadata) { // update the finalized flag of the transaction - modified = metadata.SetFinalized(true) + modified = metadata.setFinalized(true) // only propagate the changes if the flag was modified if modified { @@ -620,33 +619,36 @@ func (tangle *Tangle) propagateRejectedToTransactions(transactionID transaction. cachedTransactionMetadata := tangle.TransactionMetadata(currentTransactionID) cachedTransactionMetadata.Consume(func(metadata *TransactionMetadata) { - if !metadata.setRejected(true) { - return - } - if metadata.setPreferred(false) { - // set outputs to be not preferred as well - tangle.Transaction(currentTransactionID).Consume(func(tx *transaction.Transaction) { - tx.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) bool { - tangle.TransactionOutput(transaction.NewOutputID(address, currentTransactionID)).Consume(func(output *Output) { - output.setPreferred(false) - }) + cachedTransaction := tangle.Transaction(currentTransactionID) + cachedTransaction.Consume(func(tx *transaction.Transaction) { + if !metadata.setRejected(true) { + return + } - return true + if metadata.setPreferred(false) { + // set outputs to be not preferred as well + tangle.Transaction(currentTransactionID).Consume(func(tx *transaction.Transaction) { + tx.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) bool { + tangle.TransactionOutput(transaction.NewOutputID(address, currentTransactionID)).Consume(func(output *Output) { + output.setPreferred(false) + }) + + return true + }) }) - }) - } - // if the transaction is not finalized, yet then we set it to finalized - if !metadata.Finalized() { - if _, err := tangle.setTransactionFinalized(metadata.ID(), EventSourceTangle); err != nil { - tangle.Events.Error.Trigger(err) + tangle.Events.TransactionUnpreferred.Trigger(cachedTransaction, cachedTransactionMetadata) + } + + // if the transaction is not finalized, yet then we set it to finalized + if !metadata.Finalized() { + if _, err := tangle.setTransactionFinalized(metadata.ID(), EventSourceTangle); err != nil { + tangle.Events.Error.Trigger(err) - return + return + } } - } - cachedTransaction := tangle.Transaction(currentTransactionID) - cachedTransaction.Consume(func(tx *transaction.Transaction) { // process all outputs tx.Outputs().ForEach(func(address address.Address, balances []*balance.Balance) bool { outputID := transaction.NewOutputID(address, currentTransactionID) @@ -1253,7 +1255,7 @@ func (tangle *Tangle) bookTransaction(cachedTransaction *transaction.CachedTrans } // abort if transaction was marked as solid before - if !transactionMetadata.SetSolid(true) { + if !transactionMetadata.setSolid(true) { return } @@ -1341,7 +1343,7 @@ func (tangle *Tangle) bookTransaction(cachedTransaction *transaction.CachedTrans } // book transaction into target branch - transactionMetadata.SetBranchID(targetBranch.ID()) + transactionMetadata.setBranchID(targetBranch.ID()) // create color for newly minted coins mintedColor, _, err := balance.ColorFromBytes(transactionToBook.ID().Bytes()) @@ -1370,7 +1372,8 @@ func (tangle *Tangle) bookTransaction(cachedTransaction *transaction.CachedTrans }) // fork the conflicting transactions into their own branch if a decision is still pending - if decisionPending = !finalizedConflictingSpenderFound; decisionPending { + decisionPending = !finalizedConflictingSpenderFound + if decisionPending { for consumerID, conflictingInputs := range conflictingInputsOfFirstConsumers { _, decisionFinalized, forkedErr := tangle.Fork(consumerID, conflictingInputs) if forkedErr != nil { @@ -1427,7 +1430,7 @@ func (tangle *Tangle) bookPayload(cachedPayload *payload.CachedPayload, cachedPa return } - payloadBooked = valueObjectMetadata.SetBranchID(aggregatedBranch.ID()) + payloadBooked = valueObjectMetadata.setBranchID(aggregatedBranch.ID()) return } @@ -1760,7 +1763,7 @@ func (tangle *Tangle) moveTransactionToBranch(cachedTransaction *transaction.Cac } // abort if we did not modify the branch of the transaction - if !currentTransactionMetadata.SetBranchID(targetBranch.ID()) { + if !currentTransactionMetadata.setBranchID(targetBranch.ID()) { return nil } @@ -1785,7 +1788,7 @@ func (tangle *Tangle) moveTransactionToBranch(cachedTransaction *transaction.Cac } // abort if the output was moved already - if !output.SetBranchID(targetBranch.ID()) { + if !output.setBranchID(targetBranch.ID()) { return true } @@ -1861,7 +1864,7 @@ func (tangle *Tangle) updateBranchOfValuePayloadsAttachingTransaction(transactio // try to update the metadata of the payload and queue its approvers cachedAggregatedBranch.Consume(func(branch *branchmanager.Branch) { tangle.PayloadMetadata(currentPayload.ID()).Consume(func(payloadMetadata *PayloadMetadata) { - if !payloadMetadata.SetBranchID(branch.ID()) { + if !payloadMetadata.setBranchID(branch.ID()) { return } diff --git a/dapps/valuetransfers/packages/tangle/tangle_test.go b/dapps/valuetransfers/packages/tangle/tangle_test.go index 5ee63e0fa7cef7b2c2dd9044c71588da49d77aef..ea157aac8073b1365f3365b8ba4f05ca77783254 100644 --- a/dapps/valuetransfers/packages/tangle/tangle_test.go +++ b/dapps/valuetransfers/packages/tangle/tangle_test.go @@ -59,7 +59,7 @@ func TestBookTransaction(t *testing.T) { cachedTransaction, cachedTransactionMetadata, _, _ := tangle.storeTransactionModels(valueObject) transactionMetadata := cachedTransactionMetadata.Unwrap() - transactionMetadata.SetSolid(true) + transactionMetadata.setSolid(true) transactionBooked, decisionPending, err := tangle.bookTransaction(cachedTransaction, cachedTransactionMetadata) require.NoError(t, err) @@ -257,7 +257,7 @@ func TestFork(t *testing.T) { _, cachedTransactionMetadata, _, _ := tangle.storeTransactionModels(valueObject) txMetadata := cachedTransactionMetadata.Unwrap() - txMetadata.SetFinalized(true) + txMetadata.setFinalized(true) forked, finalized, err := tangle.Fork(tx.ID(), []transaction.OutputID{}) require.NoError(t, err) @@ -334,12 +334,12 @@ func TestBookPayload(t *testing.T) { cachedPayload, cachedMetadata, _ := tangle.storePayload(valueObject) metadata := cachedMetadata.Unwrap() - metadata.SetBranchID(branchmanager.BranchID{1}) - metadata.SetBranchID(branchmanager.BranchID{2}) + metadata.setBranchID(branchmanager.BranchID{1}) + metadata.setBranchID(branchmanager.BranchID{2}) _, cachedTransactionMetadata, _, _ := tangle.storeTransactionModels(valueObject) txMetadata := cachedTransactionMetadata.Unwrap() - txMetadata.SetBranchID(branchmanager.BranchID{1}) + txMetadata.setBranchID(branchmanager.BranchID{1}) payloadBooked, err := tangle.bookPayload(cachedPayload.Retain(), cachedMetadata.Retain(), cachedTransactionMetadata.Retain()) defer func() { @@ -359,12 +359,12 @@ func TestBookPayload(t *testing.T) { cachedPayload, cachedMetadata, _ := tangle.storePayload(valueObject) metadata := cachedMetadata.Unwrap() - metadata.SetBranchID(branchmanager.BranchID{1}) - metadata.SetBranchID(branchmanager.BranchID{1}) + metadata.setBranchID(branchmanager.BranchID{1}) + metadata.setBranchID(branchmanager.BranchID{1}) _, cachedTransactionMetadata, _, _ := tangle.storeTransactionModels(valueObject) txMetadata := cachedTransactionMetadata.Unwrap() - txMetadata.SetBranchID(branchmanager.BranchID{1}) + txMetadata.setBranchID(branchmanager.BranchID{1}) payloadBooked, err := tangle.bookPayload(cachedPayload.Retain(), cachedMetadata.Retain(), cachedTransactionMetadata.Retain()) defer func() { @@ -1016,8 +1016,8 @@ func TestCheckTransactionSolidity(t *testing.T) { tangle := New(mapdb.NewMapDB()) tx := createDummyTransaction() txMetadata := NewTransactionMetadata(tx.ID()) - txMetadata.SetSolid(true) - txMetadata.SetBranchID(branchmanager.MasterBranchID) + txMetadata.setSolid(true) + txMetadata.setBranchID(branchmanager.MasterBranchID) solid, consumedBranches, err := tangle.checkTransactionSolidity(tx, txMetadata) assert.True(t, solid) @@ -1187,7 +1187,7 @@ func TestPayloadBranchID(t *testing.T) { expectedBranchID := branchmanager.BranchID{1} cachedMetadata.Consume(func(metadata *PayloadMetadata) { metadata.setSolid(true) - metadata.SetBranchID(expectedBranchID) + metadata.setBranchID(expectedBranchID) }) branchID := tangle.payloadBranchID(valueObject.ID()) @@ -1216,7 +1216,7 @@ func TestCheckPayloadSolidity(t *testing.T) { valueObject := payload.New(payload.GenesisID, payload.GenesisID, createDummyTransaction()) metadata := NewPayloadMetadata(valueObject.ID()) metadata.setSolid(true) - metadata.SetBranchID(branchmanager.MasterBranchID) + metadata.setBranchID(branchmanager.MasterBranchID) transactionBranches := []branchmanager.BranchID{branchmanager.MasterBranchID} solid, err := tangle.payloadBecameNewlySolid(valueObject, metadata, transactionBranches) @@ -1239,7 +1239,7 @@ func TestCheckPayloadSolidity(t *testing.T) { { setParent := func(payloadMetadata *PayloadMetadata) { payloadMetadata.setSolid(true) - payloadMetadata.SetBranchID(branchmanager.MasterBranchID) + payloadMetadata.setBranchID(branchmanager.MasterBranchID) } valueObject := payload.New(storeParentPayloadWithMetadataFunc(t, tangle, setParent), storeParentPayloadWithMetadataFunc(t, tangle, setParent), createDummyTransaction()) @@ -1275,11 +1275,11 @@ func TestCheckPayloadSolidity(t *testing.T) { defer cachedBranch3.Release() setParent1 := func(payloadMetadata *PayloadMetadata) { payloadMetadata.setSolid(true) - payloadMetadata.SetBranchID(branchmanager.BranchID{2}) + payloadMetadata.setBranchID(branchmanager.BranchID{2}) } setParent2 := func(payloadMetadata *PayloadMetadata) { payloadMetadata.setSolid(true) - payloadMetadata.SetBranchID(branchmanager.BranchID{3}) + payloadMetadata.setBranchID(branchmanager.BranchID{3}) } valueObject := payload.New(storeParentPayloadWithMetadataFunc(t, tangle, setParent1), storeParentPayloadWithMetadataFunc(t, tangle, setParent2), createDummyTransaction()) @@ -1300,11 +1300,11 @@ func TestCheckPayloadSolidity(t *testing.T) { defer cachedBranch3.Release() setParent1 := func(payloadMetadata *PayloadMetadata) { payloadMetadata.setSolid(true) - payloadMetadata.SetBranchID(branchmanager.MasterBranchID) + payloadMetadata.setBranchID(branchmanager.MasterBranchID) } setParent2 := func(payloadMetadata *PayloadMetadata) { payloadMetadata.setSolid(true) - payloadMetadata.SetBranchID(branchmanager.BranchID{3}) + payloadMetadata.setBranchID(branchmanager.BranchID{3}) } valueObject := payload.New(storeParentPayloadWithMetadataFunc(t, tangle, setParent1), storeParentPayloadWithMetadataFunc(t, tangle, setParent2), createDummyTransaction()) diff --git a/dapps/valuetransfers/packages/tangle/transactionmetadata.go b/dapps/valuetransfers/packages/tangle/transactionmetadata.go index 01b161613ae30fcb8d4a98e2e337c3a9723f370e..106075bafe490af586673c960e379a8b8430a4dc 100644 --- a/dapps/valuetransfers/packages/tangle/transactionmetadata.go +++ b/dapps/valuetransfers/packages/tangle/transactionmetadata.go @@ -113,8 +113,8 @@ func (transactionMetadata *TransactionMetadata) BranchID() branchmanager.BranchI return transactionMetadata.branchID } -// SetBranchID is the setter for the branch id. It returns true if the value of the flag has been updated. -func (transactionMetadata *TransactionMetadata) SetBranchID(branchID branchmanager.BranchID) (modified bool) { +// setBranchID is the setter for the branch id. It returns true if the value of the flag has been updated. +func (transactionMetadata *TransactionMetadata) setBranchID(branchID branchmanager.BranchID) (modified bool) { transactionMetadata.branchIDMutex.RLock() if transactionMetadata.branchID == branchID { transactionMetadata.branchIDMutex.RUnlock() @@ -150,9 +150,9 @@ func (transactionMetadata *TransactionMetadata) Solid() (result bool) { return } -// SetSolid marks a Transaction as either solid or not solid. +// setSolid marks a Transaction as either solid or not solid. // It returns true if the solid flag was changes and automatically updates the solidificationTime as well. -func (transactionMetadata *TransactionMetadata) SetSolid(solid bool) (modified bool) { +func (transactionMetadata *TransactionMetadata) setSolid(solid bool) (modified bool) { transactionMetadata.solidMutex.RLock() if transactionMetadata.solid != solid { transactionMetadata.solidMutex.RUnlock() @@ -213,9 +213,9 @@ func (transactionMetadata *TransactionMetadata) setPreferred(preferred bool) (mo return } -// SetFinalized allows us to set the finalized flag on the transactions. Finalized transactions will not be forked when +// setFinalized allows us to set the finalized flag on the transactions. Finalized transactions will not be forked when // a conflict arrives later. -func (transactionMetadata *TransactionMetadata) SetFinalized(finalized bool) (modified bool) { +func (transactionMetadata *TransactionMetadata) setFinalized(finalized bool) (modified bool) { transactionMetadata.finalizedMutex.RLock() if transactionMetadata.finalized == finalized { transactionMetadata.finalizedMutex.RUnlock() diff --git a/packages/binary/messagelayer/payload/data.go b/packages/binary/messagelayer/payload/data.go index 6103a26426185ffd97283d354c26aa1b2aa82a23..9db9cc640b6e9f5114f94a5de8d042e640cc866a 100644 --- a/packages/binary/messagelayer/payload/data.go +++ b/packages/binary/messagelayer/payload/data.go @@ -91,6 +91,7 @@ func (dataPayload *Data) Unmarshal(data []byte) (err error) { func (dataPayload *Data) String() string { return stringify.Struct("Data", + stringify.StructField("type", int(dataPayload.Type())), stringify.StructField("data", string(dataPayload.Data())), ) } diff --git a/packages/binary/messagelayer/payload/payload.go b/packages/binary/messagelayer/payload/payload.go index a918ed1b7e6e6b5aa1fa7876ec836ecf1feb0825..f0f013e6c45c35ca70b0b082da9b774c1e62946d 100644 --- a/packages/binary/messagelayer/payload/payload.go +++ b/packages/binary/messagelayer/payload/payload.go @@ -46,9 +46,15 @@ func FromBytes(bytes []byte) (result Payload, consumedBytes int, err error) { return } + readOffset := marshalUtil.ReadOffset() result, err = GetUnmarshaler(payloadType)(payloadBytes) if err != nil { - return + // fallback to the generic unmarshaler if registered type fails to unmarshal + marshalUtil.ReadSeek(readOffset) + result, err = GenericPayloadUnmarshalerFactory(payloadType)(payloadBytes) + if err != nil { + return + } } // return the number of bytes we processed diff --git a/packages/binary/messagelayer/tangle/tangle.go b/packages/binary/messagelayer/tangle/tangle.go index 26158982caca8548f07f2ec2b69d35856150a053..1133c98aa6f36e9c5cc705b0806ec59c98c2964b 100644 --- a/packages/binary/messagelayer/tangle/tangle.go +++ b/packages/binary/messagelayer/tangle/tangle.go @@ -21,6 +21,8 @@ const ( // MissingCheckInterval is the interval on which it is checked whether a missing // message is still missing. MissingCheckInterval = 5 * time.Second + + cacheTime = 20 * time.Second ) // Tangle represents the base layer of messages. @@ -55,10 +57,10 @@ func New(store kvstore.KVStore) (result *Tangle) { result = &Tangle{ shutdown: make(chan struct{}), - messageStorage: osFactory.New(PrefixMessage, messageFactory, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), - messageMetadataStorage: osFactory.New(PrefixMessageMetadata, MessageMetadataFromStorageKey, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), - approverStorage: osFactory.New(PrefixApprovers, approverFactory, objectstorage.CacheTime(10*time.Second), objectstorage.PartitionKey(message.IdLength, message.IdLength), objectstorage.LeakDetectionEnabled(false)), - missingMessageStorage: osFactory.New(PrefixMissingMessage, missingMessageFactory, objectstorage.CacheTime(10*time.Second), objectstorage.LeakDetectionEnabled(false)), + messageStorage: osFactory.New(PrefixMessage, messageFactory, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)), + messageMetadataStorage: osFactory.New(PrefixMessageMetadata, MessageMetadataFromStorageKey, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)), + approverStorage: osFactory.New(PrefixApprovers, approverFactory, objectstorage.CacheTime(cacheTime), objectstorage.PartitionKey(message.IdLength, message.IdLength), objectstorage.LeakDetectionEnabled(false)), + missingMessageStorage: osFactory.New(PrefixMissingMessage, missingMessageFactory, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)), Events: *newEvents(), } diff --git a/packages/pow/pow.go b/packages/pow/pow.go new file mode 100644 index 0000000000000000000000000000000000000000..035086ecef88b65a712687d648fafae1c046a54d --- /dev/null +++ b/packages/pow/pow.go @@ -0,0 +1,151 @@ +package pow + +import ( + "context" + "encoding/binary" + "errors" + "hash" + "math" + "math/big" + "sync" + "sync/atomic" +) + +// errors returned by the PoW +var ( + ErrCancelled = errors.New("canceled") + ErrDone = errors.New("done") +) + +// Hash identifies a cryptographic hash function that is implemented in another package. +type Hash interface { + // Size returns the length, in bytes, of a digest resulting from the given hash function. + Size() int + // New returns a new hash.Hash calculating the given hash function. + New() hash.Hash +} + +// The Worker provides PoW functionality using an arbitrary hash function. +type Worker struct { + hash Hash + numWorkers int +} + +// New creates a new PoW based on the provided hash. +// The optional numWorkers specifies how many go routines are used to mine. +func New(hash Hash, numWorkers ...int) *Worker { + w := &Worker{ + hash: hash, + numWorkers: 1, + } + if len(numWorkers) > 0 { + w.numWorkers = numWorkers[0] + } + return w +} + +// Mine performs the PoW. +// It appends the 8-byte nonce to the provided msg and tries to find a nonce +// until the target number of leading zeroes is reached. +// The computation can be be canceled using the provided ctx. +func (w *Worker) Mine(ctx context.Context, msg []byte, target int) (uint64, error) { + var ( + done uint32 + counter uint64 + wg sync.WaitGroup + results = make(chan uint64, w.numWorkers) + closing = make(chan struct{}) + ) + + // stop when the context has been canceled + go func() { + select { + case <-ctx.Done(): + atomic.StoreUint32(&done, 1) + case <-closing: + return + } + }() + + workerWidth := math.MaxUint64 / uint64(w.numWorkers) + for i := 0; i < w.numWorkers; i++ { + startNonce := uint64(i) * workerWidth + wg.Add(1) + go func() { + defer wg.Done() + + nonce, workerErr := w.worker(msg, startNonce, target, &done, &counter) + if workerErr != nil { + return + } + atomic.StoreUint32(&done, 1) + results <- nonce + }() + } + wg.Wait() + close(results) + close(closing) + + nonce, ok := <-results + if !ok { + return 0, ErrCancelled + } + return nonce, nil +} + +// LeadingZeros returns the number of leading zeros in the digest of the given data. +func (w *Worker) LeadingZeros(data []byte) (int, error) { + digest, err := w.sum(data) + if err != nil { + return 0, err + } + asAnInt := new(big.Int).SetBytes(digest) + return 8*w.hash.Size() - asAnInt.BitLen(), nil +} + +// LeadingZerosWithNonce returns the number of leading zeros in the digest +// after the provided 8-byte nonce is appended to msg. +func (w *Worker) LeadingZerosWithNonce(msg []byte, nonce uint64) (int, error) { + buf := make([]byte, len(msg)+8) + copy(buf, msg) + binary.BigEndian.PutUint64(buf[len(msg):], nonce) + + return w.LeadingZeros(buf) +} + +func (w *Worker) worker(msg []byte, startNonce uint64, target int, done *uint32, counter *uint64) (uint64, error) { + buf := make([]byte, len(msg)+8) + copy(buf, msg) + asAnInt := new(big.Int) + + for nonce := startNonce; ; { + if atomic.LoadUint32(done) != 0 { + break + } + atomic.AddUint64(counter, 1) + + // write nonce in the buffer + binary.BigEndian.PutUint64(buf[len(msg):], nonce) + + digest, err := w.sum(buf) + if err != nil { + return 0, err + } + asAnInt.SetBytes(digest) + leadingZeros := 8*w.hash.Size() - asAnInt.BitLen() + if leadingZeros >= target { + return nonce, nil + } + + nonce++ + } + return 0, ErrDone +} + +func (w *Worker) sum(data []byte) ([]byte, error) { + h := w.hash.New() + if _, err := h.Write(data); err != nil { + return nil, err + } + return h.Sum(nil), nil +} diff --git a/packages/pow/pow_test.go b/packages/pow/pow_test.go new file mode 100644 index 0000000000000000000000000000000000000000..33e360161da68b9d68f214bfb1131e4be3937601 --- /dev/null +++ b/packages/pow/pow_test.go @@ -0,0 +1,78 @@ +package pow + +import ( + "context" + "crypto" + "math" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + _ "golang.org/x/crypto/sha3" // required by crypto.SHA3_256 +) + +const ( + workers = 2 + target = 10 +) + +var testWorker = New(crypto.SHA3_256, workers) + +func TestWorker_Work(t *testing.T) { + nonce, err := testWorker.Mine(context.Background(), nil, target) + require.NoError(t, err) + difficulty, err := testWorker.LeadingZerosWithNonce(nil, nonce) + assert.GreaterOrEqual(t, difficulty, target) + assert.NoError(t, err) +} + +func TestWorker_Validate(t *testing.T) { + tests := []*struct { + msg []byte + nonce uint64 + expLeadingZeros int + expErr error + }{ + {msg: nil, nonce: 0, expLeadingZeros: 1, expErr: nil}, + {msg: nil, nonce: 13176245766944605079, expLeadingZeros: 29, expErr: nil}, + {msg: make([]byte, 1024), nonce: 0, expLeadingZeros: 4, expErr: nil}, + } + + w := &Worker{hash: crypto.SHA3_256} + for _, tt := range tests { + zeros, err := w.LeadingZerosWithNonce(tt.msg, tt.nonce) + assert.Equal(t, tt.expLeadingZeros, zeros) + assert.Equal(t, tt.expErr, err) + } +} + +func TestWorker_Cancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var err error + go func() { + _, err = testWorker.Mine(ctx, nil, math.MaxInt32) + }() + time.Sleep(10 * time.Millisecond) + cancel() + + assert.Eventually(t, func() bool { return err == ErrCancelled }, time.Second, 10*time.Millisecond) +} + +func BenchmarkWorker(b *testing.B) { + var ( + buf = make([]byte, 1024) + done uint32 + counter uint64 + ) + go func() { + _, _ = testWorker.worker(buf, 0, math.MaxInt32, &done, &counter) + }() + b.ResetTimer() + for atomic.LoadUint64(&counter) < uint64(b.N) { + } + atomic.StoreUint32(&done, 1) +} diff --git a/pluginmgr/research/plugins.go b/pluginmgr/research/plugins.go index 3102284d0d3dda315afd9f70b30802d2e8b36552..1275898c7d35401f7db62d3c804f88b50e8bd019 100644 --- a/pluginmgr/research/plugins.go +++ b/pluginmgr/research/plugins.go @@ -1,6 +1,7 @@ package research import ( + "github.com/iotaledger/goshimmer/dapps/networkdelay" analysisclient "github.com/iotaledger/goshimmer/plugins/analysis/client" analysisdashboard "github.com/iotaledger/goshimmer/plugins/analysis/dashboard" analysisserver "github.com/iotaledger/goshimmer/plugins/analysis/server" @@ -15,4 +16,5 @@ var PLUGINS = node.Plugins( analysisclient.Plugin, analysisdashboard.Plugin, prometheus.Plugin, + networkdelay.App, ) diff --git a/plugins/remotelog/plugin.go b/plugins/remotelog/plugin.go index 32a507786ca41e984517e8f103e64030e86b0045..298038d337d303007ce73879fe92d942e3006791 100644 --- a/plugins/remotelog/plugin.go +++ b/plugins/remotelog/plugin.go @@ -5,12 +5,10 @@ package remotelog import ( - "encoding/json" - "fmt" - "net" "os" "path/filepath" "runtime" + "sync" "time" "github.com/iotaledger/goshimmer/packages/shutdown" @@ -26,17 +24,6 @@ import ( "gopkg.in/src-d/go-git.v4" ) -type logMessage struct { - Version string `json:"version"` - GitHead string `json:"gitHead,omitempty"` - GitBranch string `json:"gitBranch,omitempty"` - NodeID string `json:"nodeId"` - Level string `json:"level"` - Name string `json:"name"` - Msg string `json:"msg"` - Timestamp time.Time `json:"timestamp"` -} - const ( // CfgLoggerRemotelogServerAddress defines the config flag of the server address. CfgLoggerRemotelogServerAddress = "logger.remotelog.serverAddress" @@ -44,17 +31,21 @@ const ( CfgDisableEvents = "logger.disableEvents" // PluginName is the name of the remote log plugin. PluginName = "RemoteLog" + + remoteLogType = "log" ) var ( // Plugin is the plugin instance of the remote plugin instance. Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) log *logger.Logger - conn net.Conn myID string myGitHead string myGitBranch string workerPool *workerpool.WorkerPool + + remoteLogger *RemoteLoggerConn + remoteLoggerOnce sync.Once ) func init() { @@ -69,12 +60,8 @@ func configure(plugin *node.Plugin) { return } - c, err := net.Dial("udp", config.Node.GetString(CfgLoggerRemotelogServerAddress)) - if err != nil { - log.Fatalf("Could not create UDP socket to '%s'. %v", config.Node.GetString(CfgLoggerRemotelogServerAddress), err) - return - } - conn = c + // initialize remote logger connection + RemoteLogger() if local.GetInstance() != nil { myID = local.GetInstance().ID().String() @@ -117,9 +104,10 @@ func sendLogMsg(level logger.Level, name string, msg string) { name, msg, time.Now(), + remoteLogType, } - b, _ := json.Marshal(m) - fmt.Fprint(conn, string(b)) + + _ = RemoteLogger().Send(m) } func getGitInfo() { @@ -159,3 +147,30 @@ func getGitDir() string { return gitDir } + +// RemoteLogger represents a connection to our remote log server. +func RemoteLogger() *RemoteLoggerConn { + remoteLoggerOnce.Do(func() { + r, err := newRemoteLoggerConn(config.Node.GetString(CfgLoggerRemotelogServerAddress)) + if err != nil { + log.Fatal(err) + return + } + + remoteLogger = r + }) + + return remoteLogger +} + +type logMessage struct { + Version string `json:"version"` + GitHead string `json:"gitHead,omitempty"` + GitBranch string `json:"gitBranch,omitempty"` + NodeID string `json:"nodeId"` + Level string `json:"level"` + Name string `json:"name"` + Msg string `json:"msg"` + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` +} diff --git a/plugins/remotelog/remotelogger.go b/plugins/remotelog/remotelogger.go new file mode 100644 index 0000000000000000000000000000000000000000..d2fab8432ae3d28c454a4126eeff8cfc7ede8413 --- /dev/null +++ b/plugins/remotelog/remotelogger.go @@ -0,0 +1,35 @@ +package remotelog + +import ( + "encoding/json" + "fmt" + "net" +) + +// RemoteLoggerConn is a wrapper for a connection to our RemoteLog server. +type RemoteLoggerConn struct { + conn net.Conn +} + +func newRemoteLoggerConn(address string) (*RemoteLoggerConn, error) { + c, err := net.Dial("udp", address) + if err != nil { + return nil, fmt.Errorf("could not create UDP socket to '%s'. %v", address, err) + } + + return &RemoteLoggerConn{conn: c}, nil +} + +// Send sends a message on the RemoteLoggers connection. +func (r *RemoteLoggerConn) Send(msg interface{}) error { + b, err := json.Marshal(msg) + if err != nil { + return err + } + _, err = r.conn.Write(b) + if err != nil { + return err + } + + return nil +} diff --git a/plugins/remotelog/server/.env b/plugins/remotelog/server/.env deleted file mode 100644 index 68aaea9a63f638a31870bb929bbc323409a12ebc..0000000000000000000000000000000000000000 --- a/plugins/remotelog/server/.env +++ /dev/null @@ -1 +0,0 @@ -ELK_VERSION=7.5.2 \ No newline at end of file diff --git a/plugins/remotelog/server/.env.default b/plugins/remotelog/server/.env.default new file mode 100644 index 0000000000000000000000000000000000000000..89b017525eecb4a7a0f397ea8bfee33b8bf05ad8 --- /dev/null +++ b/plugins/remotelog/server/.env.default @@ -0,0 +1,3 @@ +ELK_VERSION=7.5.2 +VIRTUAL_HOST= +LETSENCRYPT_HOST= \ No newline at end of file diff --git a/plugins/remotelog/server/config/logstash/pipeline/logstash.conf b/plugins/remotelog/server/config/logstash/pipeline/logstash.conf index fcc2ccbc0a770d42433a8323a5664fa956f20c8c..c5d451334c6e63916d5aa88a49fa292cc327a40d 100644 --- a/plugins/remotelog/server/config/logstash/pipeline/logstash.conf +++ b/plugins/remotelog/server/config/logstash/pipeline/logstash.conf @@ -20,7 +20,15 @@ filter { } output { - elasticsearch { - hosts => "elasticsearch:9200" - } +# stdout {codec => rubydebug} + if [log][type] == "networkdelay" { + elasticsearch { + hosts => "elasticsearch:9200" + index => "networkdelay" + } + } else { + elasticsearch { + hosts => "elasticsearch:9200" + } + } } \ No newline at end of file diff --git a/plugins/remotelog/server/docker-compose.yml b/plugins/remotelog/server/docker-compose.yml index 69ed87d88d86cb6f112359e7de0528274730f6e7..02784c0051b36d206344b8935e06180e801881a0 100644 --- a/plugins/remotelog/server/docker-compose.yml +++ b/plugins/remotelog/server/docker-compose.yml @@ -4,7 +4,9 @@ services: elasticsearch: container_name: elasticsearch image: docker.elastic.co/elasticsearch/elasticsearch:${ELK_VERSION} + restart: always volumes: + - "/etc/localtime:/etc/localtime:ro" - type: bind source: ./config/elasticsearch.yml target: /usr/share/elasticsearch/config/elasticsearch.yml @@ -23,7 +25,9 @@ services: logstash: container_name: logstash image: docker.elastic.co/logstash/logstash:${ELK_VERSION} + restart: always volumes: + - "/etc/localtime:/etc/localtime:ro" - type: bind source: ./config/logstash/logstash.yml target: /usr/share/logstash/config/logstash.yml @@ -44,13 +48,19 @@ services: kibana: container_name: kibana image: docker.elastic.co/kibana/kibana:${ELK_VERSION} + restart: always volumes: + - "/etc/localtime:/etc/localtime:ro" - type: bind source: ./config/kibana.yml target: /usr/share/kibana/config/kibana.yml read_only: true ports: - - "5601:5601" + - "127.0.0.1:5601:5601" + environment: + - "VIRTUAL_HOST=${VIRTUAL_HOST}" + - "LETSENCRYPT_HOST=${LETSENCRYPT_HOST}" + - "VIRTUAL_PORT=5601" networks: - elk depends_on: