Skip to content
Snippets Groups Projects
Commit 39a918a8 authored by capossele's avatar capossele
Browse files

:sparkles: adds unsolid tx request

parent adf5f02a
No related branches found
No related tags found
No related merge requests found
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
zL "github.com/iotaledger/autopeering-sim/logger" zL "github.com/iotaledger/autopeering-sim/logger"
"github.com/iotaledger/autopeering-sim/peer/service" "github.com/iotaledger/autopeering-sim/peer/service"
"github.com/iotaledger/autopeering-sim/selection" "github.com/iotaledger/autopeering-sim/selection"
"github.com/iotaledger/goshimmer/packages/gossip"
gp "github.com/iotaledger/goshimmer/packages/gossip" gp "github.com/iotaledger/goshimmer/packages/gossip"
pb "github.com/iotaledger/goshimmer/packages/gossip/proto" pb "github.com/iotaledger/goshimmer/packages/gossip/proto"
"github.com/iotaledger/goshimmer/packages/gossip/transport" "github.com/iotaledger/goshimmer/packages/gossip/transport"
...@@ -43,10 +44,14 @@ var ( ...@@ -43,10 +44,14 @@ var (
) )
func getTransaction(h []byte) ([]byte, error) { func getTransaction(h []byte) ([]byte, error) {
tx := &pb.TransactionRequest{ tx, err := tangle.GetTransaction(string(h))
Hash: []byte("testTx"), if err != nil {
return []byte{}, err
}
pTx := &pb.TransactionRequest{
Hash: tx.GetBytes(),
} }
b, _ := proto.Marshal(tx) b, _ := proto.Marshal(pTx)
return b, nil return b, nil
} }
...@@ -87,7 +92,7 @@ func configureEvents() { ...@@ -87,7 +92,7 @@ func configureEvents() {
})) }))
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) { tangle.Events.TransactionSolid.Attach(events.NewClosure(func(tx *value_transaction.ValueTransaction) {
log.Info("Tx solidified") log.Info("Tx solidified:", tx.MetaTransaction.GetBytes())
t := &pb.Transaction{ t := &pb.Transaction{
Body: tx.MetaTransaction.GetBytes(), Body: tx.MetaTransaction.GetBytes(),
} }
...@@ -97,4 +102,9 @@ func configureEvents() { ...@@ -97,4 +102,9 @@ func configureEvents() {
} }
go mgr.SendTransaction(b) go mgr.SendTransaction(b)
})) }))
gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) {
log.Info("Tx Requested:", ev.Hash)
go mgr.RequestTransaction(ev.Hash)
}))
} }
...@@ -2,6 +2,7 @@ package tangle ...@@ -2,6 +2,7 @@ package tangle
import ( import (
"runtime" "runtime"
"time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/iotaledger/goshimmer/packages/errors" "github.com/iotaledger/goshimmer/packages/errors"
...@@ -20,7 +21,12 @@ import ( ...@@ -20,7 +21,12 @@ import (
// region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// // region plugin module setup //////////////////////////////////////////////////////////////////////////////////////////
var workerPool *workerpool.WorkerPool const UnsolidInterval = 30
var (
workerPool *workerpool.WorkerPool
unsolidTxs *UnsolidTxs
)
func configureSolidifier(plugin *node.Plugin) { func configureSolidifier(plugin *node.Plugin) {
workerPool = workerpool.New(func(task workerpool.Task) { workerPool = workerpool.New(func(task workerpool.Task) {
...@@ -29,6 +35,8 @@ func configureSolidifier(plugin *node.Plugin) { ...@@ -29,6 +35,8 @@ func configureSolidifier(plugin *node.Plugin) {
task.Return(nil) task.Return(nil)
}, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(10000)) }, workerpool.WorkerCount(WORKER_COUNT), workerpool.QueueSize(10000))
unsolidTxs = NewUnsolidTxs()
gossip.Events.TransactionReceived.Attach(events.NewClosure(func(ev *gossip.TransactionReceivedEvent) { gossip.Events.TransactionReceived.Attach(events.NewClosure(func(ev *gossip.TransactionReceivedEvent) {
//log.Info("New Transaction", ev.Body) //log.Info("New Transaction", ev.Body)
pTx := &pb.Transaction{} pTx := &pb.Transaction{}
...@@ -77,10 +85,16 @@ func checkSolidity(transaction *value_transaction.ValueTransaction) (result bool ...@@ -77,10 +85,16 @@ func checkSolidity(transaction *value_transaction.ValueTransaction) (result bool
return return
} else if branchTransaction == nil { } else if branchTransaction == nil {
//log.Info("Missing Branch (nil)", transaction.GetValue(), "Missing ", transaction.GetBranchTransactionHash())
// add BranchTransactionHash to the unsolid txs and send a transaction request
unsolidTxs.Add(transaction.GetBranchTransactionHash())
requestTransaction(transaction.GetBranchTransactionHash())
return return
} else if branchTransactionMetadata, branchErr := GetTransactionMetadata(branchTransaction.GetHash(), transactionmetadata.New); branchErr != nil { } else if branchTransactionMetadata, branchErr := GetTransactionMetadata(branchTransaction.GetHash(), transactionmetadata.New); branchErr != nil {
err = branchErr err = branchErr
//log.Info("Missing Branch", transaction.GetValue())
return return
} else if !branchTransactionMetadata.GetSolid() { } else if !branchTransactionMetadata.GetSolid() {
return return
...@@ -94,10 +108,16 @@ func checkSolidity(transaction *value_transaction.ValueTransaction) (result bool ...@@ -94,10 +108,16 @@ func checkSolidity(transaction *value_transaction.ValueTransaction) (result bool
return return
} else if trunkTransaction == nil { } else if trunkTransaction == nil {
//log.Info("Missing Trunk (nil)", transaction.GetValue())
// add TrunkTransactionHash to the unsolid txs and send a transaction request
unsolidTxs.Add(transaction.GetTrunkTransactionHash())
requestTransaction(transaction.GetTrunkTransactionHash())
return return
} else if trunkTransactionMetadata, trunkErr := GetTransactionMetadata(trunkTransaction.GetHash(), transactionmetadata.New); trunkErr != nil { } else if trunkTransactionMetadata, trunkErr := GetTransactionMetadata(trunkTransaction.GetHash(), transactionmetadata.New); trunkErr != nil {
err = trunkErr err = trunkErr
//log.Info("Missing Trunk", transaction.GetValue())
return return
} else if !trunkTransactionMetadata.GetSolid() { } else if !trunkTransactionMetadata.GetSolid() {
return return
...@@ -119,11 +139,13 @@ func IsSolid(transaction *value_transaction.ValueTransaction) (bool, errors.Iden ...@@ -119,11 +139,13 @@ func IsSolid(transaction *value_transaction.ValueTransaction) (bool, errors.Iden
if isSolid, err := checkSolidity(transaction); err != nil { if isSolid, err := checkSolidity(transaction); err != nil {
return false, err return false, err
} else if isSolid { } else if isSolid {
//log.Info("Solid ", transaction.GetValue())
if err := propagateSolidity(transaction.GetHash()); err != nil { if err := propagateSolidity(transaction.GetHash()); err != nil {
return false, err return false, err //should we return true?
} }
return true, nil
} }
//log.Info("Not solid ", transaction.GetValue())
return false, nil return false, nil
} }
...@@ -158,6 +180,7 @@ func processMetaTransaction(plugin *node.Plugin, metaTransaction *meta_transacti ...@@ -158,6 +180,7 @@ func processMetaTransaction(plugin *node.Plugin, metaTransaction *meta_transacti
}); err != nil { }); err != nil {
log.Errorf("Unable to load transaction %s: %s", metaTransaction.GetHash(), err.Error()) log.Errorf("Unable to load transaction %s: %s", metaTransaction.GetHash(), err.Error())
} else if newTransaction { } else if newTransaction {
updateUnsolidTxs(tx)
processTransaction(plugin, tx) processTransaction(plugin, tx)
} }
} }
...@@ -184,10 +207,27 @@ func processTransaction(plugin *node.Plugin, transaction *value_transaction.Valu ...@@ -184,10 +207,27 @@ func processTransaction(plugin *node.Plugin, transaction *value_transaction.Valu
} }
// update the solidity flags of this transaction and its approvers // update the solidity flags of this transaction and its approvers
if _, err := IsSolid(transaction); err != nil { _, err := IsSolid(transaction)
if err != nil {
log.Errorf("Unable to check solidity: %s", err.Error()) log.Errorf("Unable to check solidity: %s", err.Error())
return return
} }
} }
func updateUnsolidTxs(tx *value_transaction.ValueTransaction) {
unsolidTxs.Remove(tx.GetHash())
targetTime := time.Now().Add(time.Duration(-UnsolidInterval) * time.Second)
txs := unsolidTxs.Update(targetTime)
for _, tx := range txs {
requestTransaction(tx)
}
}
func requestTransaction(tx string) {
log.Info("Requesting tx: ", tx)
req := &pb.TransactionRequest{Hash: []byte(tx)}
b, _ := proto.Marshal(req)
gossip.Events.RequestTransaction.Trigger(&gossip.RequestTransactionEvent{Hash: b})
}
var WORKER_COUNT = runtime.NumCPU() var WORKER_COUNT = runtime.NumCPU()
...@@ -31,18 +31,28 @@ func TestSolidifier(t *testing.T) { ...@@ -31,18 +31,28 @@ func TestSolidifier(t *testing.T) {
transaction1 := value_transaction.New() transaction1 := value_transaction.New()
transaction1.SetNonce(trinary.Trytes("99999999999999999999999999A")) transaction1.SetNonce(trinary.Trytes("99999999999999999999999999A"))
transaction2 := value_transaction.New() transaction2 := value_transaction.New()
transaction2.SetValue(2)
transaction2.SetBranchTransactionHash(transaction1.GetHash()) transaction2.SetBranchTransactionHash(transaction1.GetHash())
transaction3 := value_transaction.New() transaction3 := value_transaction.New()
transaction3.SetValue(3)
transaction3.SetBranchTransactionHash(transaction2.GetHash()) transaction3.SetBranchTransactionHash(transaction2.GetHash())
transaction4 := value_transaction.New() transaction4 := value_transaction.New()
transaction4.SetValue(4)
transaction4.SetBranchTransactionHash(transaction3.GetHash()) transaction4.SetBranchTransactionHash(transaction3.GetHash())
// setup event handlers // setup event handlers
var wg sync.WaitGroup var wg sync.WaitGroup
Events.TransactionSolid.Attach(events.NewClosure(func(transaction *value_transaction.ValueTransaction) { Events.TransactionSolid.Attach(events.NewClosure(func(transaction *value_transaction.ValueTransaction) {
t.Log("Tx solidified", transaction.GetValue())
wg.Done() wg.Done()
})) }))
gossip.Events.RequestTransaction.Attach(events.NewClosure(func(ev *gossip.RequestTransactionEvent) {
tx := &pb.Transaction{Body: transaction3.MetaTransaction.GetBytes()}
b, _ := proto.Marshal(tx)
gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b})
}))
// issue transactions // issue transactions
wg.Add(4) wg.Add(4)
tx := &pb.Transaction{Body: transaction1.MetaTransaction.GetBytes()} tx := &pb.Transaction{Body: transaction1.MetaTransaction.GetBytes()}
...@@ -53,9 +63,9 @@ func TestSolidifier(t *testing.T) { ...@@ -53,9 +63,9 @@ func TestSolidifier(t *testing.T) {
b, _ = proto.Marshal(tx) b, _ = proto.Marshal(tx)
gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b}) gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b})
tx = &pb.Transaction{Body: transaction3.MetaTransaction.GetBytes()} // tx = &pb.Transaction{Body: transaction3.MetaTransaction.GetBytes()}
b, _ = proto.Marshal(tx) // b, _ = proto.Marshal(tx)
gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b}) // gossip.Events.TransactionReceived.Trigger(&gossip.TransactionReceivedEvent{Body: b})
tx = &pb.Transaction{Body: transaction4.MetaTransaction.GetBytes()} tx = &pb.Transaction{Body: transaction4.MetaTransaction.GetBytes()}
b, _ = proto.Marshal(tx) b, _ = proto.Marshal(tx)
...@@ -66,4 +76,5 @@ func TestSolidifier(t *testing.T) { ...@@ -66,4 +76,5 @@ func TestSolidifier(t *testing.T) {
// shutdown test node // shutdown test node
node.Shutdown() node.Shutdown()
} }
package tangle
import (
"sync"
"time"
)
type UnsolidTxs struct {
internal map[string]Info
sync.RWMutex
}
type Info struct {
lastRequest time.Time
counter int
}
func NewUnsolidTxs() *UnsolidTxs {
return &UnsolidTxs{
internal: make(map[string]Info),
}
}
func (u *UnsolidTxs) Add(hash string) {
u.Lock()
info := Info{
lastRequest: time.Now(),
counter: 1,
}
u.internal[hash] = info
u.Unlock()
}
func (u *UnsolidTxs) Remove(hash string) {
u.Lock()
if _, exists := u.internal[hash]; !exists {
delete(u.internal, hash)
}
u.Unlock()
}
func (u *UnsolidTxs) Update(targetTime time.Time) (result []string) {
u.Lock()
for k, v := range u.internal {
if v.lastRequest.Before(targetTime) {
result = append(result, k)
v.lastRequest = time.Now()
v.counter++
u.internal[k] = v
}
}
u.Unlock()
return result
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment