Skip to content
Snippets Groups Projects
Commit 8ae11a7e authored by Wolfgang Welz's avatar Wolfgang Welz Committed by Luca Moser
Browse files

Fix: Solidifier (#190)

* fix solidifier

Request branch and trunk if absent. Remove solidified transactions from unsolidTxs.

* Recursively request not yet requesting transactions

* remove existing transactions from request queue

* :recycle: refactor solidifier
parent 277041d5
No related branches found
No related tags found
No related merge requests found
...@@ -122,9 +122,5 @@ func getTransaction(hash []byte) ([]byte, error) { ...@@ -122,9 +122,5 @@ func getTransaction(hash []byte) ([]byte, error) {
} }
func requestTransaction(hash trinary.Hash) { func requestTransaction(hash trinary.Hash) {
if contains, _ := tangle.ContainsTransaction(hash); contains {
// Do not request tx that we already know
return
}
mgr.RequestTransaction(typeutils.StringToBytes(hash)) mgr.RequestTransaction(typeutils.StringToBytes(hash))
} }
...@@ -18,12 +18,12 @@ import ( ...@@ -18,12 +18,12 @@ import (
// region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// // region plugin module setup //////////////////////////////////////////////////////////////////////////////////////////
const UnsolidInterval = 30 const UnsolidInterval = time.Minute
var ( var (
workerCount = runtime.NumCPU() workerCount = runtime.NumCPU()
workerPool *workerpool.WorkerPool workerPool *workerpool.WorkerPool
unsolidTxs *UnsolidTxs requestedTxs *UnsolidTxs
requester Requester requester Requester
) )
...@@ -39,7 +39,7 @@ func configureSolidifier() { ...@@ -39,7 +39,7 @@ func configureSolidifier() {
task.Return(nil) task.Return(nil)
}, workerpool.WorkerCount(workerCount), workerpool.QueueSize(10000)) }, workerpool.WorkerCount(workerCount), workerpool.QueueSize(10000))
unsolidTxs = NewUnsolidTxs() requestedTxs = NewUnsolidTxs()
gossip.Events.TransactionReceived.Attach(events.NewClosure(func(ev *gossip.TransactionReceivedEvent) { gossip.Events.TransactionReceived.Attach(events.NewClosure(func(ev *gossip.TransactionReceivedEvent) {
metaTx := meta_transaction.FromBytes(ev.Data) metaTx := meta_transaction.FromBytes(ev.Data)
...@@ -47,137 +47,32 @@ func configureSolidifier() { ...@@ -47,137 +47,32 @@ func configureSolidifier() {
log.Warnf("invalid transaction: %s", err) log.Warnf("invalid transaction: %s", err)
return return
} }
workerPool.Submit(metaTx) workerPool.Submit(metaTx)
})) }))
} }
func runSolidifier() { func runSolidifier() {
log.Info("Starting Solidifier ...")
daemon.BackgroundWorker("Tangle Solidifier", func(shutdownSignal <-chan struct{}) { daemon.BackgroundWorker("Tangle Solidifier", func(shutdownSignal <-chan struct{}) {
log.Info("Starting Solidifier ... done") log.Info("Starting Solidifier ...")
workerPool.Start() workerPool.Start()
log.Info("Starting Solidifier ... done")
<-shutdownSignal <-shutdownSignal
log.Info("Stopping Solidifier ...") log.Info("Stopping Solidifier ...")
workerPool.StopAndWait() workerPool.StopAndWait()
log.Info("Stopping Solidifier ... done") log.Info("Stopping Solidifier ... done")
}, shutdown.ShutdownPrioritySolidifier) }, shutdown.ShutdownPrioritySolidifier)
}
// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
// Checks and updates the solid flag of a single transaction.
func checkSolidity(transaction *value_transaction.ValueTransaction) (result bool, err error) {
// abort if transaction is solid already
txMetadata, metaDataErr := GetTransactionMetadata(transaction.GetHash(), transactionmetadata.New)
if metaDataErr != nil {
err = metaDataErr
return
} else if txMetadata.GetSolid() {
result = true
return
}
// check solidity of branch transaction if it is not genesis
if branchTransactionHash := transaction.GetBranchTransactionHash(); branchTransactionHash != meta_transaction.BRANCH_NULL_HASH {
// abort if branch transaction is missing
if branchTransaction, branchErr := GetTransaction(branchTransactionHash); branchErr != nil {
err = branchErr
return
} else if branchTransaction == nil {
//log.Info("Missing Branch (nil)", transaction.GetValue(), "Missing ", transaction.GetBranchTransactionHash())
// add BranchTransactionHash to the unsolid txs and send a transaction request
unsolidTxs.Add(transaction.GetBranchTransactionHash())
requestTransaction(transaction.GetBranchTransactionHash())
return
} else if branchTransactionMetadata, branchErr := GetTransactionMetadata(branchTransaction.GetHash(), transactionmetadata.New); branchErr != nil {
err = branchErr
//log.Info("Missing Branch", transaction.GetValue())
return
} else if !branchTransactionMetadata.GetSolid() {
return
}
}
// check solidity of trunk transaction if it is not genesis
if trunkTransactionHash := transaction.GetTrunkTransactionHash(); trunkTransactionHash != meta_transaction.BRANCH_NULL_HASH {
if trunkTransaction, trunkErr := GetTransaction(trunkTransactionHash); trunkErr != nil {
err = trunkErr
return
} else if trunkTransaction == nil {
//log.Info("Missing Trunk (nil)", transaction.GetValue())
// add TrunkTransactionHash to the unsolid txs and send a transaction request
unsolidTxs.Add(transaction.GetTrunkTransactionHash())
requestTransaction(transaction.GetTrunkTransactionHash())
return
} else if trunkTransactionMetadata, trunkErr := GetTransactionMetadata(trunkTransaction.GetHash(), transactionmetadata.New); trunkErr != nil {
err = trunkErr
//log.Info("Missing Trunk", transaction.GetValue())
return
} else if !trunkTransactionMetadata.GetSolid() {
return
}
}
// mark transaction as solid and trigger event
if txMetadata.SetSolid(true) {
Events.TransactionSolid.Trigger(transaction)
}
result = true
return
}
// Checks and updates the solid flag of a transaction and its approvers (future cone).
func IsSolid(transaction *value_transaction.ValueTransaction) (bool, error) {
if isSolid, err := checkSolidity(transaction); err != nil {
return false, err
} else if isSolid {
//log.Info("Solid ", transaction.GetValue())
if err := propagateSolidity(transaction.GetHash()); err != nil {
return false, err //should we return true?
}
return true, nil
}
//log.Info("Not solid ", transaction.GetValue())
return false, nil
} }
func propagateSolidity(transactionHash trinary.Trytes) error { // endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
if transactionApprovers, err := GetApprovers(transactionHash, approvers.New); err != nil {
return err
} else {
for _, approverHash := range transactionApprovers.GetHashes() {
if approver, err := GetTransaction(approverHash); err != nil {
return err
} else if approver != nil {
if isSolid, err := checkSolidity(approver); err != nil {
return err
} else if isSolid {
if err := propagateSolidity(approver.GetHash()); err != nil {
return err
}
}
}
}
}
return nil
}
func processMetaTransaction(metaTransaction *meta_transaction.MetaTransaction) { func processMetaTransaction(metaTransaction *meta_transaction.MetaTransaction) {
metaTransactionHash := metaTransaction.GetHash()
var newTransaction bool var newTransaction bool
tx, err := GetTransaction(metaTransaction.GetHash(), func(transactionHash trinary.Trytes) *value_transaction.ValueTransaction { tx, err := GetTransaction(metaTransactionHash, func(transactionHash trinary.Trytes) *value_transaction.ValueTransaction {
newTransaction = true newTransaction = true
tx := value_transaction.FromMetaTransaction(metaTransaction) tx := value_transaction.FromMetaTransaction(metaTransaction)
...@@ -185,17 +80,18 @@ func processMetaTransaction(metaTransaction *meta_transaction.MetaTransaction) { ...@@ -185,17 +80,18 @@ func processMetaTransaction(metaTransaction *meta_transaction.MetaTransaction) {
return tx return tx
}) })
if err != nil { if err != nil {
log.Errorf("Unable to process transaction %s: %s", metaTransaction.GetHash(), err.Error()) log.Errorf("Unable to process transaction %s: %s", metaTransactionHash, err.Error())
return return
} }
if newTransaction { if newTransaction {
log.Debugw("process new transaction", "hash", tx.GetHash()) log.Debugw("process new transaction", "hash", tx.GetHash())
updateUnsolidTxs(tx) processNewTransaction(tx)
processTransaction(tx) requestedTxs.Remove(tx.GetHash())
updateRequestedTxs()
} }
} }
func processTransaction(transaction *value_transaction.ValueTransaction) { func processNewTransaction(transaction *value_transaction.ValueTransaction) {
Events.TransactionStored.Trigger(transaction) Events.TransactionStored.Trigger(transaction)
// store transaction hash for address in DB // store transaction hash for address in DB
...@@ -226,19 +122,136 @@ func processTransaction(transaction *value_transaction.ValueTransaction) { ...@@ -226,19 +122,136 @@ func processTransaction(transaction *value_transaction.ValueTransaction) {
branchApprovers.Add(transactionHash) branchApprovers.Add(transactionHash)
} }
// update the solidity flags of this transaction and its approvers isSolid, err := isSolid(transactionHash)
if _, err := IsSolid(transaction); err != nil { if err != nil {
log.Errorf("Unable to check solidity: %s", err.Error()) log.Errorf("Unable to check solidity: %s", err.Error())
return }
// if the transaction was solidified propagate this information
if isSolid {
if err := propagateSolidity(transaction.GetHash()); err != nil {
log.Errorf("Unable to propagate solidity: %s", err.Error())
}
} }
} }
func updateUnsolidTxs(tx *value_transaction.ValueTransaction) { // isSolid checks whether the transaction with the given hash is solid. A transaction is solid, if it is
unsolidTxs.Remove(tx.GetHash()) // either marked as solid or all its referenced transactions are in the database.
targetTime := time.Now().Add(time.Duration(-UnsolidInterval) * time.Second) func isSolid(hash trinary.Hash) (bool, error) {
txs := unsolidTxs.Update(targetTime) // the genesis is always solid
for _, tx := range txs { if hash == meta_transaction.BRANCH_NULL_HASH {
requestTransaction(tx) return true, nil
}
// if the transaction is not in the DB, request it
transaction, err := GetTransaction(hash)
if err != nil {
return false, err
}
if transaction == nil {
if requestedTxs.Add(hash) {
requestTransaction(hash)
}
return false, nil
}
// check whether the transaction is marked solid
metadata, err := GetTransactionMetadata(hash, transactionmetadata.New)
if err != nil {
return false, err
}
if metadata.GetSolid() {
return true, nil
}
branch := contains(transaction.GetBranchTransactionHash())
trunk := contains(transaction.GetTrunkTransactionHash())
if !branch || !trunk {
return false, nil
}
// everything is good, mark the transaction as solid
return true, markSolid(transaction)
}
func contains(hash trinary.Hash) bool {
if hash == meta_transaction.BRANCH_NULL_HASH {
return true
}
if contains, _ := ContainsTransaction(hash); !contains {
if requestedTxs.Add(hash) {
requestTransaction(hash)
}
return false
}
return true
}
func isMarkedSolid(hash trinary.Hash) (bool, error) {
if hash == meta_transaction.BRANCH_NULL_HASH {
return true, nil
}
metadata, err := GetTransactionMetadata(hash, transactionmetadata.New)
if err != nil {
return false, err
}
return metadata.GetSolid(), nil
}
func markSolid(transaction *value_transaction.ValueTransaction) error {
txMetadata, err := GetTransactionMetadata(transaction.GetHash(), transactionmetadata.New)
if err != nil {
return err
}
if txMetadata.SetSolid(true) {
log.Debugw("transaction solidified", "hash", transaction.GetHash())
Events.TransactionSolid.Trigger(transaction)
return propagateSolidity(transaction.GetHash())
}
return nil
}
func propagateSolidity(transactionHash trinary.Trytes) error {
approvingTransactions, err := GetApprovers(transactionHash, approvers.New)
if err != nil {
return err
}
for _, hash := range approvingTransactions.GetHashes() {
approver, err := GetTransaction(hash)
if err != nil {
return err
}
if approver != nil {
branchSolid, err := isMarkedSolid(approver.GetBranchTransactionHash())
if err != nil {
return err
}
if !branchSolid {
continue
}
trunkSolid, err := isMarkedSolid(approver.GetTrunkTransactionHash())
if err != nil {
return err
}
if !trunkSolid {
continue
}
if err := markSolid(approver); err != nil {
return err
}
}
}
return nil
}
func updateRequestedTxs() {
targetTime := time.Now().Add(-UnsolidInterval)
txs := requestedTxs.Update(targetTime)
for _, txHash := range txs {
if contains, _ := ContainsTransaction(txHash); contains {
requestedTxs.Remove(txHash)
continue
}
requestTransaction(txHash)
} }
} }
......
...@@ -21,21 +21,24 @@ func NewUnsolidTxs() *UnsolidTxs { ...@@ -21,21 +21,24 @@ func NewUnsolidTxs() *UnsolidTxs {
} }
} }
func (u *UnsolidTxs) Add(hash string) { func (u *UnsolidTxs) Add(hash string) bool {
u.Lock() u.Lock()
defer u.Unlock()
_, contains := u.internal[hash]
if contains {
return false
}
info := Info{ info := Info{
lastRequest: time.Now(), lastRequest: time.Now(),
counter: 1, counter: 1,
} }
u.internal[hash] = info u.internal[hash] = info
u.Unlock() return true
} }
func (u *UnsolidTxs) Remove(hash string) { func (u *UnsolidTxs) Remove(hash string) {
u.Lock() u.Lock()
if _, exists := u.internal[hash]; !exists { delete(u.internal, hash)
delete(u.internal, hash)
}
u.Unlock() u.Unlock()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment