diff --git a/main.go b/main.go index c51e880060dd135c8f4aa88b159bff8bf0595a96..fddd23288bdbf73e296e1934a4050aeb87cc4c0b 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,8 @@ package main import ( - "fmt" - "net/http" _ "net/http/pprof" - "github.com/mr-tron/base58" - - "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction" "github.com/iotaledger/goshimmer/plugins/analysis" "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/banner" @@ -31,14 +26,7 @@ import ( ) func main() { - go http.ListenAndServe("localhost:6061", nil) // pprof Server for Debbuging Mutexes - - testTxId := transaction.NewId([]byte("Test")) - - fmt.Println(len(base58.Encode(transaction.EmptyId[:]))) - fmt.Println(base58.Encode(transaction.EmptyId[:])) - fmt.Println(len(base58.Encode(testTxId[:]))) - fmt.Println(base58.Encode(testTxId[:])) + //go http.ListenAndServe("localhost:6061", nil) // pprof Server for Debbuging Mutexes node.Run( node.Plugins( @@ -47,12 +35,12 @@ func main() { logger.PLUGIN, cli.PLUGIN, remotelog.PLUGIN, + gracefulshutdown.PLUGIN, autopeering.PLUGIN, tangle.PLUGIN, gossip.PLUGIN, portcheck.PLUGIN, - gracefulshutdown.PLUGIN, analysis.PLUGIN, metrics.PLUGIN, diff --git a/packages/binary/marshalutil/marshalutil.bytes.go b/packages/binary/marshalutil/marshalutil.bytes.go new file mode 100644 index 0000000000000000000000000000000000000000..5f2c298fce9440247db38a98e8d1326b8e56a921 --- /dev/null +++ b/packages/binary/marshalutil/marshalutil.bytes.go @@ -0,0 +1,26 @@ +package marshalutil + +func (util *MarshalUtil) WriteBytes(bytes []byte) { + writeEndOffset := util.expandWriteCapacity(len(bytes)) + + copy(util.bytes[util.writeOffset:writeEndOffset], bytes) + + util.WriteSeek(writeEndOffset) +} + +func (util *MarshalUtil) ReadBytes(length int) ([]byte, error) { + readEndOffset, err := util.checkReadCapacity(length) + if err != nil { + return nil, err + } + + defer util.ReadSeek(readEndOffset) + + return util.bytes[util.readOffset:readEndOffset], nil +} + +func (util *MarshalUtil) ReadRemainingBytes() []byte { + defer util.ReadSeek(util.size) + + return util.bytes[util.readOffset:] +} diff --git a/packages/binary/marshalutil/marshalutil.go b/packages/binary/marshalutil/marshalutil.go new file mode 100644 index 0000000000000000000000000000000000000000..7218e54db052aec58d59e42e85c74503551e5e94 --- /dev/null +++ b/packages/binary/marshalutil/marshalutil.go @@ -0,0 +1,73 @@ +package marshalutil + +import ( + "fmt" +) + +type MarshalUtil struct { + bytes []byte + readOffset int + writeOffset int + size int +} + +func New(args ...interface{}) *MarshalUtil { + switch argsCount := len(args); argsCount { + case 0: + return &MarshalUtil{ + bytes: make([]byte, 1024), + size: 0, + } + case 1: + switch param := args[0].(type) { + case int: + return &MarshalUtil{ + bytes: make([]byte, param), + size: param, + } + case []byte: + return &MarshalUtil{ + bytes: param, + size: len(param), + } + default: + panic(fmt.Errorf("illegal argument type %T in marshalutil.New(...)", param)) + } + default: + panic(fmt.Errorf("illegal argument count %d in marshalutil.New(...)", argsCount)) + } +} + +func (util *MarshalUtil) WriteSeek(offset int) { + util.writeOffset = offset +} + +func (util *MarshalUtil) ReadSeek(offset int) { + util.readOffset = offset +} + +func (util *MarshalUtil) Bytes() []byte { + return util.bytes[:util.size] +} + +func (util *MarshalUtil) checkReadCapacity(length int) (readEndOffset int, err error) { + readEndOffset = util.readOffset + length + + if readEndOffset > util.size { + err = fmt.Errorf("tried to read %d bytes from %d bytes input", readEndOffset, util.size) + } + + return +} + +func (util *MarshalUtil) expandWriteCapacity(length int) (writeEndOffset int) { + writeEndOffset = util.writeOffset + length + + if writeEndOffset > util.size { + extendedBytes := make([]byte, writeEndOffset-util.size) + util.bytes = append(util.bytes, extendedBytes...) + util.size = writeEndOffset + } + + return +} diff --git a/packages/binary/marshalutil/marshalutil.int64.go b/packages/binary/marshalutil/marshalutil.int64.go new file mode 100644 index 0000000000000000000000000000000000000000..5f1e55754abe159ab0df532234f5b8535eb77eb6 --- /dev/null +++ b/packages/binary/marshalutil/marshalutil.int64.go @@ -0,0 +1,26 @@ +package marshalutil + +import ( + "encoding/binary" +) + +const INT64_SIZE = 8 + +func (util *MarshalUtil) WriteInt64(value int64) { + writeEndOffset := util.expandWriteCapacity(INT64_SIZE) + + binary.LittleEndian.PutUint64(util.bytes[util.writeOffset:writeEndOffset], uint64(value)) + + util.WriteSeek(writeEndOffset) +} + +func (util *MarshalUtil) ReadInt64() (int64, error) { + readEndOffset, err := util.checkReadCapacity(INT64_SIZE) + if err != nil { + return 0, err + } + + defer util.ReadSeek(readEndOffset) + + return int64(binary.LittleEndian.Uint64(util.bytes[util.readOffset:readEndOffset])), nil +} diff --git a/packages/binary/marshalutil/marshalutil.uint64.go b/packages/binary/marshalutil/marshalutil.uint64.go new file mode 100644 index 0000000000000000000000000000000000000000..0bb33119de4cf8b337998ea119f2bd138ef977e3 --- /dev/null +++ b/packages/binary/marshalutil/marshalutil.uint64.go @@ -0,0 +1,24 @@ +package marshalutil + +import "encoding/binary" + +const UINT64_SIZE = 8 + +func (util *MarshalUtil) WriteUint64(value uint64) { + writeEndOffset := util.expandWriteCapacity(UINT64_SIZE) + + binary.LittleEndian.PutUint64(util.bytes[util.writeOffset:writeEndOffset], value) + + util.WriteSeek(writeEndOffset) +} + +func (util *MarshalUtil) ReadUint64() (uint64, error) { + readEndOffset, err := util.checkReadCapacity(UINT64_SIZE) + if err != nil { + return 0, err + } + + defer util.ReadSeek(readEndOffset) + + return binary.LittleEndian.Uint64(util.bytes[util.readOffset:readEndOffset]), nil +} diff --git a/packages/binary/marshalutil/marshalutil_test.go b/packages/binary/marshalutil/marshalutil_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4b38f69ceb12747fff314784bde35a9683f120e1 --- /dev/null +++ b/packages/binary/marshalutil/marshalutil_test.go @@ -0,0 +1,21 @@ +package marshalutil + +import ( + "fmt" + "testing" +) + +func Test(t *testing.T) { + util := New(1) + + util.WriteBytes(make([]byte, UINT64_SIZE)) + util.WriteInt64(-12) + util.WriteUint64(38) + + fmt.Println(util.ReadBytes(UINT64_SIZE)) + fmt.Println(util.ReadInt64()) + fmt.Println(util.ReadUint64()) + fmt.Println(util.ReadUint64()) + + fmt.Println(util) +} diff --git a/packages/binary/spammer/spammer.go b/packages/binary/spammer/spammer.go index 12d92d4c22a0f9a762f0d69a1cb4255d3169cc1d..945393339b7f04c57810cb224fa7d23660a81ee2 100644 --- a/packages/binary/spammer/spammer.go +++ b/packages/binary/spammer/spammer.go @@ -1,7 +1,8 @@ package spammer import ( - "sync" + "fmt" + "sync/atomic" "time" "github.com/iotaledger/hive.go/types" @@ -17,8 +18,7 @@ type Spammer struct { transactionParser *transactionparser.TransactionParser tipSelector *tipselector.TipSelector - running bool - startStopMutex sync.Mutex + processId int64 shutdownSignal chan types.Empty } @@ -31,96 +31,69 @@ func New(transactionParser *transactionparser.TransactionParser, tipSelector *ti } func (spammer *Spammer) Start(tps int) { - spammer.startStopMutex.Lock() - defer spammer.startStopMutex.Unlock() - - if !spammer.running { - spammer.running = true - - go spammer.run(tps) - } + go spammer.run(tps, atomic.AddInt64(&spammer.processId, 1)) } func (spammer *Spammer) Burst(transactions int) { - spammer.startStopMutex.Lock() - defer spammer.startStopMutex.Unlock() - - if !spammer.running { - spammer.running = true - - go spammer.sendBurst(transactions) - } + go spammer.sendBurst(transactions, atomic.AddInt64(&spammer.processId, 1)) } func (spammer *Spammer) Shutdown() { - spammer.startStopMutex.Lock() - defer spammer.startStopMutex.Unlock() - - if spammer.running { - spammer.running = false - - spammer.shutdownSignal <- types.Void - } + atomic.AddInt64(&spammer.processId, 1) } -func (spammer *Spammer) run(tps int) { +func (spammer *Spammer) run(tps int, processId int64) { + fmt.Println(processId) currentSentCounter := 0 start := time.Now() for { - - select { - case <-spammer.shutdownSignal: + if atomic.LoadInt64(&spammer.processId) != processId { return + } - default: - trunkTransactionId, branchTransactionId := spammer.tipSelector.GetTips() - spammer.transactionParser.Parse( - transaction.New(trunkTransactionId, branchTransactionId, identity.Generate(), data.New([]byte("SPAM"))).GetBytes(), - nil, - ) - - currentSentCounter++ + trunkTransactionId, branchTransactionId := spammer.tipSelector.GetTips() + spammer.transactionParser.Parse( + transaction.New(trunkTransactionId, branchTransactionId, identity.Generate(), data.New([]byte("SPAM"))).GetBytes(), + nil, + ) - // rate limit to the specified TPS - if currentSentCounter >= tps { - duration := time.Since(start) - if duration < time.Second { - time.Sleep(time.Second - duration) - } + currentSentCounter++ - start = time.Now() - currentSentCounter = 0 + // rate limit to the specified TPS + if currentSentCounter >= tps { + duration := time.Since(start) + if duration < time.Second { + time.Sleep(time.Second - duration) } + + start = time.Now() + currentSentCounter = 0 } } } -func (spammer *Spammer) sendBurst(transactions int) { +func (spammer *Spammer) sendBurst(transactions int, processId int64) { spammingIdentity := identity.Generate() previousTransactionId := transaction.EmptyId burstBuffer := make([][]byte, transactions) for i := 0; i < transactions; i++ { - select { - case <-spammer.shutdownSignal: + if atomic.LoadInt64(&spammer.processId) != processId { return - - default: - spamTransaction := transaction.New(previousTransactionId, previousTransactionId, spammingIdentity, data.New([]byte("SPAM"))) - previousTransactionId = spamTransaction.GetId() - burstBuffer[i] = spamTransaction.GetBytes() } + + spamTransaction := transaction.New(previousTransactionId, previousTransactionId, spammingIdentity, data.New([]byte("SPAM"))) + previousTransactionId = spamTransaction.GetId() + burstBuffer[i] = spamTransaction.GetBytes() } for i := 0; i < transactions; i++ { - select { - case <-spammer.shutdownSignal: + if atomic.LoadInt64(&spammer.processId) != processId { return - - default: - spammer.transactionParser.Parse(burstBuffer[i], nil) } + + spammer.transactionParser.Parse(burstBuffer[i], nil) } } diff --git a/packages/binary/valuetangle/payload.go b/packages/binary/valuetangle/payload.go new file mode 100644 index 0000000000000000000000000000000000000000..659bbd09554e77bc3c7132510a6f2d4738b2ae28 --- /dev/null +++ b/packages/binary/valuetangle/payload.go @@ -0,0 +1,186 @@ +package valuetangle + +import ( + "sync" + + "github.com/iotaledger/hive.go/objectstorage" + "golang.org/x/crypto/blake2b" + + "github.com/iotaledger/goshimmer/packages/binary/marshalutil" + "github.com/iotaledger/goshimmer/packages/binary/tangle/model/transaction/payload" +) + +type Payload struct { + objectstorage.StorableObjectFlags + + id *PayloadId + trunkPayloadId PayloadId + branchPayloadId PayloadId + transfer *Transfer + bytes []byte + + idMutex sync.RWMutex + bytesMutex sync.RWMutex +} + +func NewPayload(trunkPayloadId, branchPayloadId PayloadId, valueTransfer *Transfer) *Payload { + return &Payload{ + trunkPayloadId: trunkPayloadId, + branchPayloadId: branchPayloadId, + transfer: valueTransfer, + } +} + +func (payload *Payload) GetId() PayloadId { + // acquire lock for reading id + payload.idMutex.RLock() + + // return if id has been calculated already + if payload.id != nil { + defer payload.idMutex.RUnlock() + + return *payload.id + } + + // switch to write lock + payload.idMutex.RUnlock() + payload.idMutex.Lock() + defer payload.idMutex.Unlock() + + // return if id has been calculated in the mean time + if payload.id != nil { + return *payload.id + } + + // otherwise calculate the id + transferId := payload.GetTransfer().GetId() + marshalUtil := marshalutil.New(PayloadIdLength + PayloadIdLength + TransferIdLength) + marshalUtil.WriteBytes(payload.trunkPayloadId[:]) + marshalUtil.WriteBytes(payload.branchPayloadId[:]) + marshalUtil.WriteBytes(transferId[:]) + var id PayloadId = blake2b.Sum256(marshalUtil.Bytes()) + payload.id = &id + + return id +} + +func (payload *Payload) GetTrunkPayloadId() PayloadId { + return payload.trunkPayloadId +} + +func (payload *Payload) GetBranchPayloadId() PayloadId { + return payload.branchPayloadId +} + +func (payload *Payload) GetTransfer() *Transfer { + return payload.transfer +} + +// region Payload implementation /////////////////////////////////////////////////////////////////////////////////////// + +var Type = payload.Type(1) + +func (payload *Payload) GetType() payload.Type { + return Type +} + +func (payload *Payload) MarshalBinary() (bytes []byte, err error) { + // acquire lock for reading bytes + payload.bytesMutex.RLock() + + // return if bytes have been determined already + if bytes = payload.bytes; bytes != nil { + defer payload.bytesMutex.RUnlock() + + return + } + + // switch to write lock + payload.bytesMutex.RUnlock() + payload.bytesMutex.Lock() + defer payload.bytesMutex.Unlock() + + // return if bytes have been determined in the mean time + if bytes = payload.bytes; bytes != nil { + return + } + + // retrieve bytes of transfer + transferBytes, err := payload.GetTransfer().MarshalBinary() + if err != nil { + return + } + + // marshal fields + marshalUtil := marshalutil.New(PayloadIdLength + PayloadIdLength + TransferIdLength) + marshalUtil.WriteBytes(payload.trunkPayloadId[:]) + marshalUtil.WriteBytes(payload.branchPayloadId[:]) + marshalUtil.WriteBytes(transferBytes) + bytes = marshalUtil.Bytes() + + // store result + payload.bytes = bytes + + return +} + +func (payload *Payload) UnmarshalBinary(data []byte) (err error) { + marshalUtil := marshalutil.New(data) + + trunkTransactionIdBytes, err := marshalUtil.ReadBytes(PayloadIdLength) + if err != nil { + return + } + + branchTransactionIdBytes, err := marshalUtil.ReadBytes(PayloadIdLength) + if err != nil { + return + } + + valueTransfer := &Transfer{} + if err = valueTransfer.UnmarshalBinary(marshalUtil.ReadRemainingBytes()); err != nil { + return + } + + payload.trunkPayloadId = NewPayloadId(trunkTransactionIdBytes) + payload.branchPayloadId = NewPayloadId(branchTransactionIdBytes) + payload.transfer = valueTransfer + payload.bytes = data + + return +} + +func init() { + payload.RegisterType(Type, func(data []byte) (payload payload.Payload, err error) { + payload = &Payload{} + err = payload.UnmarshalBinary(data) + + return + }) +} + +// define contract (ensure that the struct fulfills the corresponding interface) +var _ payload.Payload = &Payload{} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region StorableObject implementation //////////////////////////////////////////////////////////////////////////////// + +// MarshalBinary() (bytes []byte, err error) already implemented by Payload + +// UnmarshalBinary(data []byte) (err error) already implemented by Payload + +func (payload *Payload) GetStorageKey() []byte { + id := payload.GetId() + + return id[:] +} + +func (payload *Payload) Update(other objectstorage.StorableObject) { + panic("a Payload should never be updated") +} + +// define contract (ensure that the struct fulfills the corresponding interface) +var _ objectstorage.StorableObject = &Payload{} + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/packages/binary/valuetangle/payload_id.go b/packages/binary/valuetangle/payload_id.go new file mode 100644 index 0000000000000000000000000000000000000000..70f5b7c5b3c713d5cb331b276487f2a632bf9341 --- /dev/null +++ b/packages/binary/valuetangle/payload_id.go @@ -0,0 +1,21 @@ +package valuetangle + +import ( + "github.com/mr-tron/base58" +) + +type PayloadId [PayloadIdLength]byte + +func NewPayloadId(idBytes []byte) (result PayloadId) { + copy(result[:], idBytes) + + return +} + +func (id PayloadId) String() string { + return base58.Encode(id[:]) +} + +var EmptyPayloadId PayloadId + +const PayloadIdLength = 32 diff --git a/packages/binary/valuetangle/test/payload_test.go b/packages/binary/valuetangle/test/payload_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1666070110a4cd37b82b9a3ae49e3349027c3ce9 --- /dev/null +++ b/packages/binary/valuetangle/test/payload_test.go @@ -0,0 +1,16 @@ +package test + +import ( + "fmt" + "testing" + + "github.com/iotaledger/goshimmer/packages/binary/valuetangle" +) + +func TestPayload(t *testing.T) { + transfer := valuetangle.NewTransfer() + fmt.Println(transfer.GetId()) + + payload := valuetangle.NewPayload(valuetangle.EmptyPayloadId, valuetangle.EmptyPayloadId, transfer) + fmt.Println(payload.GetId()) +} diff --git a/packages/binary/valuetangle/transfer.go b/packages/binary/valuetangle/transfer.go new file mode 100644 index 0000000000000000000000000000000000000000..e9faad1e4a5ffe845de5491793e886d224e5be7c --- /dev/null +++ b/packages/binary/valuetangle/transfer.go @@ -0,0 +1,86 @@ +package valuetangle + +import ( + "sync" + + "github.com/iotaledger/hive.go/objectstorage" + "golang.org/x/crypto/blake2b" +) + +type Transfer struct { + objectstorage.StorableObjectFlags + + id *TransferId + bytes []byte + + idMutex sync.RWMutex + bytesMutex sync.RWMutex +} + +func NewTransfer() *Transfer { + return &Transfer{} +} + +func FromStorage(key []byte) *Transfer { + id := NewTransferId(key) + + return &Transfer{ + id: &id, + } +} + +func (transfer *Transfer) GetId() TransferId { + // acquire lock for reading id + transfer.idMutex.RLock() + + // return if id has been calculated already + if transfer.id != nil { + defer transfer.idMutex.RUnlock() + + return *transfer.id + } + + // switch to write lock + transfer.idMutex.RUnlock() + transfer.idMutex.Lock() + defer transfer.idMutex.Unlock() + + // return if id has been calculated in the mean time + if transfer.id != nil { + return *transfer.id + } + + // otherwise calculate the PayloadId + transfer.id = transfer.calculateId() + + return *transfer.id +} + +func (transfer *Transfer) calculateId() *TransferId { + bytes, _ := transfer.MarshalBinary() + + id := blake2b.Sum256(bytes) + + result := NewTransferId(id[:]) + + return &result +} + +func (transfer *Transfer) Update(other objectstorage.StorableObject) { + panic("implement me") +} + +func (transfer *Transfer) GetStorageKey() []byte { + panic("implement me") +} + +func (transfer *Transfer) MarshalBinary() ([]byte, error) { + return nil, nil +} + +func (transfer *Transfer) UnmarshalBinary(data []byte) error { + panic("implement me") +} + +// define contracts (ensure that the struct fulfills the corresponding interfaces +var _ objectstorage.StorableObject = &Transfer{} diff --git a/packages/binary/valuetangle/transfer_id.go b/packages/binary/valuetangle/transfer_id.go new file mode 100644 index 0000000000000000000000000000000000000000..147ac73ffed7865743b33ec95d6f64513c87880b --- /dev/null +++ b/packages/binary/valuetangle/transfer_id.go @@ -0,0 +1,19 @@ +package valuetangle + +import ( + "github.com/mr-tron/base58" +) + +type TransferId [TransferIdLength]byte + +func NewTransferId(idBytes []byte) (result TransferId) { + copy(result[:], idBytes) + + return +} + +func (id TransferId) String() string { + return base58.Encode(id[:]) +} + +const TransferIdLength = 32 diff --git a/plugins/tangle/plugin.go b/plugins/tangle/plugin.go index 4afcbb6c40d8b2c2dacc9e1fdae0308a3419c01d..acc998fe3a0e6801f6337f0a120a91683fb0bab7 100644 --- a/plugins/tangle/plugin.go +++ b/plugins/tangle/plugin.go @@ -42,7 +42,6 @@ func configure(*node.Plugin) { // setup TransactionParser TransactionParser.Events.TransactionParsed.Attach(events.NewClosure(func(transaction *transaction.Transaction, peer *peer.Peer) { - peer.PublicKey() // TODO: ADD PEER Instance.AttachTransaction(transaction) diff --git a/plugins/webapi/spammer/plugin.go b/plugins/webapi/spammer/plugin.go index dc06b9f23dadf851976514106e39b6cc57830c79..0b827a85c9b82b59053b043948dff098bb1d8300 100644 --- a/plugins/webapi/spammer/plugin.go +++ b/plugins/webapi/spammer/plugin.go @@ -17,7 +17,6 @@ var PLUGIN = node.NewPlugin("Spammer", node.Disabled, configure, run) func configure(plugin *node.Plugin) { transactionSpammer = spammer.New(tangle.TransactionParser, tangle.TipSelector) - webapi.Server.GET("spammer", handleRequest) }