diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index 64e5b778906cb465eec81bc8c3b6f781470064fd..2d549ce97b63a708b2e4ddf01f9cabec1cd3b1eb 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -5,6 +5,7 @@ import ( zL "github.com/iotaledger/autopeering-sim/logger" "github.com/iotaledger/autopeering-sim/peer/service" "github.com/iotaledger/autopeering-sim/selection" + "github.com/iotaledger/goshimmer/packages/gossip" gp "github.com/iotaledger/goshimmer/packages/gossip" pb "github.com/iotaledger/goshimmer/packages/gossip/proto" "github.com/iotaledger/goshimmer/packages/gossip/transport" @@ -43,10 +44,14 @@ var ( ) func getTransaction(h []byte) ([]byte, error) { - tx := &pb.TransactionRequest{ - Hash: []byte("testTx"), + tx, err := tangle.GetTransaction(string(h)) + if err != nil { + return []byte{}, err + } + pTx := &pb.TransactionRequest{ + Hash: tx.GetBytes(), } - b, _ := proto.Marshal(tx) + b, _ := proto.Marshal(pTx) return b, nil } @@ -87,7 +92,7 @@ func configureEvents() { })) tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { - log.Info("Tx solidified") + log.Info("Tx solidified:", tx.MetaTransaction.GetBytes()) t := &pb.Transaction{ Body: tx.MetaTransaction.GetBytes(), } @@ -97,4 +102,9 @@ func configureEvents() { } go mgr.SendTransaction(b) })) + + gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) { + log.Info("Tx Requested:", ev.Hash) + go mgr.RequestTransaction(ev.Hash) + })) } diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index 696a475052de2da455201a5da381e85eefc4daac..f799848033d57030fc925c38ccb514b1bc5078cd 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -2,6 +2,7 @@ package tangle import ( "runtime" + "time" "github.com/golang/protobuf/proto" "github.com/iotaledger/goshimmer/packages/errors" @@ -20,7 +21,12 @@ import ( // region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// -var workerPool *workerpool.WorkerPool +const UnsolidInterval = 30 + +var ( + workerPool *workerpool.WorkerPool + unsolidTxs *UnsolidTxs +) func configureSolidifier(plugin *node.Plugin) { workerPool = workerpool.New(func(task workerpool.Task) { @@ -29,6 +35,8 @@ func configureSolidifier(plugin *node.Plugin) { task.Return(nil) }, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(10000)) + unsolidTxs = NewUnsolidTxs() + gossip.Events.TransactionReceived.Attach(events.NewClosure(func(ev *gossip.TransactionReceivedEvent) { //log.Info("New Transaction", ev.Body) pTx := &pb.Transaction{} @@ -77,10 +85,16 @@ func checkSolidity(transaction *value_transaction.ValueTransaction) (result bool return } else if branchTransaction == nil { + //log.Info("Missing Branch (nil)", transaction.GetValue(), "Missing ", transaction.GetBranchTransactionHash()) + + // add BranchTransactionHash to the unsolid txs and send a transaction request + unsolidTxs.Add(transaction.GetBranchTransactionHash()) + requestTransaction(transaction.GetBranchTransactionHash()) + return } else if branchTransactionMetadata, branchErr := GetTransactionMetadata(branchTransaction.GetHash(), transactionmetadata.New); branchErr != nil { err = branchErr - + //log.Info("Missing Branch", transaction.GetValue()) return } else if !branchTransactionMetadata.GetSolid() { return @@ -94,10 +108,16 @@ func checkSolidity(transaction *value_transaction.ValueTransaction) (result bool return } else if trunkTransaction == nil { + //log.Info("Missing Trunk (nil)", transaction.GetValue()) + + // add TrunkTransactionHash to the unsolid txs and send a transaction request + unsolidTxs.Add(transaction.GetTrunkTransactionHash()) + requestTransaction(transaction.GetTrunkTransactionHash()) + return } else if trunkTransactionMetadata, trunkErr := GetTransactionMetadata(trunkTransaction.GetHash(), transactionmetadata.New); trunkErr != nil { err = trunkErr - + //log.Info("Missing Trunk", transaction.GetValue()) return } else if !trunkTransactionMetadata.GetSolid() { return @@ -119,11 +139,13 @@ func IsSolid(transaction *value_transaction.ValueTransaction) (bool, errors.Iden if isSolid, err := checkSolidity(transaction); err != nil { return false, err } else if isSolid { + //log.Info("Solid ", transaction.GetValue()) if err := propagateSolidity(transaction.GetHash()); err != nil { - return false, err + return false, err //should we return true? } + return true, nil } - + //log.Info("Not solid ", transaction.GetValue()) return false, nil } @@ -158,6 +180,7 @@ func processMetaTransaction(plugin *node.Plugin, metaTransaction *meta_transacti }); err != nil { log.Errorf("Unable to load transaction %s: %s", metaTransaction.GetHash(), err.Error()) } else if newTransaction { + updateUnsolidTxs(tx) processTransaction(plugin, tx) } } @@ -184,10 +207,27 @@ func processTransaction(plugin *node.Plugin, transaction *value_transaction.Valu } // update the solidity flags of this transaction and its approvers - if _, err := IsSolid(transaction); err != nil { + _, err := IsSolid(transaction) + if err != nil { log.Errorf("Unable to check solidity: %s", err.Error()) return } } +func updateUnsolidTxs(tx *value_transaction.ValueTransaction) { + unsolidTxs.Remove(tx.GetHash()) + targetTime := time.Now().Add(time.Duration(-UnsolidInterval) * time.Second) + txs := unsolidTxs.Update(targetTime) + for _, tx := range txs { + requestTransaction(tx) + } +} + +func requestTransaction(tx string) { + log.Info("Requesting tx: ", tx) + req := &pb.TransactionRequest{Hash: []byte(tx)} + b, _ := proto.Marshal(req) + gossip.Events.RequestTransaction.Trigger(&gossip.RequestTransactionEvent{Hash: b}) +} + var WORKER_COUNT = runtime.NumCPU() diff --git a/plugins/tangle/solidifier_test.go b/plugins/tangle/solidifier_test.go index 5daf06e8a224e442fea49a5fd8c594684ac99b49..263294f59e8cc036a283e1bc1d16798c75e2757f 100644 --- a/plugins/tangle/solidifier_test.go +++ b/plugins/tangle/solidifier_test.go @@ -31,18 +31,28 @@ func TestSolidifier(t *testing.T) { transaction1 := value_transaction.New() transaction1.SetNonce(trinary.Trytes("99999999999999999999999999A")) transaction2 := value_transaction.New() + transaction2.SetValue(2) transaction2.SetBranchTransactionHash(transaction1.GetHash()) transaction3 := value_transaction.New() + transaction3.SetValue(3) transaction3.SetBranchTransactionHash(transaction2.GetHash()) transaction4 := value_transaction.New() + transaction4.SetValue(4) transaction4.SetBranchTransactionHash(transaction3.GetHash()) // setup event handlers var wg sync.WaitGroup Events.TransactionSolid.Attach(events.NewClosure(func(transaction *value_transaction.ValueTransaction) { + t.Log("Tx solidified", transaction.GetValue()) wg.Done() })) + gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) { + tx := &pb.Transaction{Body: transaction3.MetaTransaction.GetBytes()} + b, _ := proto.Marshal(tx) + gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b}) + })) + // issue transactions wg.Add(4) tx := &pb.Transaction{Body: transaction1.MetaTransaction.GetBytes()} @@ -53,9 +63,9 @@ func TestSolidifier(t *testing.T) { b, _ = proto.Marshal(tx) gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b}) - tx = &pb.Transaction{Body: transaction3.MetaTransaction.GetBytes()} - b, _ = proto.Marshal(tx) - gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b}) + // tx = &pb.Transaction{Body: transaction3.MetaTransaction.GetBytes()} + // b, _ = proto.Marshal(tx) + // gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b}) tx = &pb.Transaction{Body: transaction4.MetaTransaction.GetBytes()} b, _ = proto.Marshal(tx) @@ -66,4 +76,5 @@ func TestSolidifier(t *testing.T) { // shutdown test node node.Shutdown() + } diff --git a/plugins/tangle/unsolidTxs.go b/plugins/tangle/unsolidTxs.go new file mode 100644 index 0000000000000000000000000000000000000000..631da933afa558b0737266129e40e876fba2a7d3 --- /dev/null +++ b/plugins/tangle/unsolidTxs.go @@ -0,0 +1,56 @@ +package tangle + +import ( + "sync" + "time" +) + +type UnsolidTxs struct { + internal map[string]Info + sync.RWMutex +} + +type Info struct { + lastRequest time.Time + counter int +} + +func NewUnsolidTxs() *UnsolidTxs { + return &UnsolidTxs{ + internal: make(map[string]Info), + } +} + +func (u *UnsolidTxs) Add(hash string) { + u.Lock() + info := Info{ + lastRequest: time.Now(), + counter: 1, + } + u.internal[hash] = info + u.Unlock() +} + +func (u *UnsolidTxs) Remove(hash string) { + u.Lock() + if _, exists := u.internal[hash]; !exists { + delete(u.internal, hash) + } + u.Unlock() +} + +func (u *UnsolidTxs) Update(targetTime time.Time) (result []string) { + u.Lock() + for k, v := range u.internal { + if v.lastRequest.Before(targetTime) { + result = append(result, k) + + v.lastRequest = time.Now() + v.counter++ + + u.internal[k] = v + } + } + u.Unlock() + return result +}