diff --git a/plugins/gossip/gossip.go b/plugins/gossip/gossip.go index db2c609d88b8379a0c108627e2a4a525a51cbf27..9d2c928810d00596a89edb2f46ce5312e62ff1a8 100644 --- a/plugins/gossip/gossip.go +++ b/plugins/gossip/gossip.go @@ -122,9 +122,5 @@ func getTransaction(hash []byte) ([]byte, error) { } func requestTransaction(hash trinary.Hash) { - if contains, _ := tangle.ContainsTransaction(hash); contains { - // Do not request tx that we already know - return - } mgr.RequestTransaction(typeutils.StringToBytes(hash)) } diff --git a/plugins/tangle/solidifier.go b/plugins/tangle/solidifier.go index 7d89db6ed563055a13dd0c7926a67ec9b799447a..02d84440969ce18efcef093e23ea8e3c6e897d92 100644 --- a/plugins/tangle/solidifier.go +++ b/plugins/tangle/solidifier.go @@ -18,12 +18,12 @@ import ( // region plugin module setup ////////////////////////////////////////////////////////////////////////////////////////// -const UnsolidInterval = 30 +const UnsolidInterval = time.Minute var ( - workerCount = runtime.NumCPU() - workerPool *workerpool.WorkerPool - unsolidTxs *UnsolidTxs + workerCount = runtime.NumCPU() + workerPool *workerpool.WorkerPool + requestedTxs *UnsolidTxs requester Requester ) @@ -39,7 +39,7 @@ func configureSolidifier() { task.Return(nil) }, workerpool.WorkerCount(workerCount), workerpool.QueueSize(10000)) - unsolidTxs = NewUnsolidTxs() + requestedTxs = NewUnsolidTxs() gossip.Events.TransactionReceived.Attach(events.NewClosure(func(ev *gossip.TransactionReceivedEvent) { metaTx := meta_transaction.FromBytes(ev.Data) @@ -47,137 +47,32 @@ func configureSolidifier() { log.Warnf("invalid transaction: %s", err) return } - workerPool.Submit(metaTx) })) } func runSolidifier() { - log.Info("Starting Solidifier ...") - daemon.BackgroundWorker("Tangle Solidifier", func(shutdownSignal <-chan struct{}) { - log.Info("Starting Solidifier ... done") + log.Info("Starting Solidifier ...") workerPool.Start() + log.Info("Starting Solidifier ... done") + <-shutdownSignal + log.Info("Stopping Solidifier ...") workerPool.StopAndWait() log.Info("Stopping Solidifier ... done") }, shutdown.ShutdownPrioritySolidifier) -} - -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// Checks and updates the solid flag of a single transaction. -func checkSolidity(transaction *value_transaction.ValueTransaction) (result bool, err error) { - // abort if transaction is solid already - txMetadata, metaDataErr := GetTransactionMetadata(transaction.GetHash(), transactionmetadata.New) - if metaDataErr != nil { - err = metaDataErr - - return - } else if txMetadata.GetSolid() { - result = true - - return - } - - // check solidity of branch transaction if it is not genesis - if branchTransactionHash := transaction.GetBranchTransactionHash(); branchTransactionHash != meta_transaction.BRANCH_NULL_HASH { - // abort if branch transaction is missing - if branchTransaction, branchErr := GetTransaction(branchTransactionHash); branchErr != nil { - err = branchErr - - return - } else if branchTransaction == nil { - //log.Info("Missing Branch (nil)", transaction.GetValue(), "Missing ", transaction.GetBranchTransactionHash()) - - // add BranchTransactionHash to the unsolid txs and send a transaction request - unsolidTxs.Add(transaction.GetBranchTransactionHash()) - requestTransaction(transaction.GetBranchTransactionHash()) - - return - } else if branchTransactionMetadata, branchErr := GetTransactionMetadata(branchTransaction.GetHash(), transactionmetadata.New); branchErr != nil { - err = branchErr - //log.Info("Missing Branch", transaction.GetValue()) - return - } else if !branchTransactionMetadata.GetSolid() { - return - } - } - - // check solidity of trunk transaction if it is not genesis - if trunkTransactionHash := transaction.GetTrunkTransactionHash(); trunkTransactionHash != meta_transaction.BRANCH_NULL_HASH { - if trunkTransaction, trunkErr := GetTransaction(trunkTransactionHash); trunkErr != nil { - err = trunkErr - - return - } else if trunkTransaction == nil { - //log.Info("Missing Trunk (nil)", transaction.GetValue()) - - // add TrunkTransactionHash to the unsolid txs and send a transaction request - unsolidTxs.Add(transaction.GetTrunkTransactionHash()) - requestTransaction(transaction.GetTrunkTransactionHash()) - - return - } else if trunkTransactionMetadata, trunkErr := GetTransactionMetadata(trunkTransaction.GetHash(), transactionmetadata.New); trunkErr != nil { - err = trunkErr - //log.Info("Missing Trunk", transaction.GetValue()) - return - } else if !trunkTransactionMetadata.GetSolid() { - return - } - } - - // mark transaction as solid and trigger event - if txMetadata.SetSolid(true) { - Events.TransactionSolid.Trigger(transaction) - } - result = true - - return -} - -// Checks and updates the solid flag of a transaction and its approvers (future cone). -func IsSolid(transaction *value_transaction.ValueTransaction) (bool, error) { - if isSolid, err := checkSolidity(transaction); err != nil { - return false, err - } else if isSolid { - //log.Info("Solid ", transaction.GetValue()) - if err := propagateSolidity(transaction.GetHash()); err != nil { - return false, err //should we return true? - } - return true, nil - } - //log.Info("Not solid ", transaction.GetValue()) - return false, nil } -func propagateSolidity(transactionHash trinary.Trytes) error { - if transactionApprovers, err := GetApprovers(transactionHash, approvers.New); err != nil { - return err - } else { - for _, approverHash := range transactionApprovers.GetHashes() { - if approver, err := GetTransaction(approverHash); err != nil { - return err - } else if approver != nil { - if isSolid, err := checkSolidity(approver); err != nil { - return err - } else if isSolid { - if err := propagateSolidity(approver.GetHash()); err != nil { - return err - } - } - } - } - } - - return nil -} +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// func processMetaTransaction(metaTransaction *meta_transaction.MetaTransaction) { + metaTransactionHash := metaTransaction.GetHash() + var newTransaction bool - tx, err := GetTransaction(metaTransaction.GetHash(), func(transactionHash trinary.Trytes) *value_transaction.ValueTransaction { + tx, err := GetTransaction(metaTransactionHash, func(transactionHash trinary.Trytes) *value_transaction.ValueTransaction { newTransaction = true tx := value_transaction.FromMetaTransaction(metaTransaction) @@ -185,17 +80,18 @@ func processMetaTransaction(metaTransaction *meta_transaction.MetaTransaction) { return tx }) if err != nil { - log.Errorf("Unable to process transaction %s: %s", metaTransaction.GetHash(), err.Error()) + log.Errorf("Unable to process transaction %s: %s", metaTransactionHash, err.Error()) return } if newTransaction { log.Debugw("process new transaction", "hash", tx.GetHash()) - updateUnsolidTxs(tx) - processTransaction(tx) + processNewTransaction(tx) + requestedTxs.Remove(tx.GetHash()) + updateRequestedTxs() } } -func processTransaction(transaction *value_transaction.ValueTransaction) { +func processNewTransaction(transaction *value_transaction.ValueTransaction) { Events.TransactionStored.Trigger(transaction) // store transaction hash for address in DB @@ -226,19 +122,136 @@ func processTransaction(transaction *value_transaction.ValueTransaction) { branchApprovers.Add(transactionHash) } - // update the solidity flags of this transaction and its approvers - if _, err := IsSolid(transaction); err != nil { + isSolid, err := isSolid(transactionHash) + if err != nil { log.Errorf("Unable to check solidity: %s", err.Error()) - return + } + // if the transaction was solidified propagate this information + if isSolid { + if err := propagateSolidity(transaction.GetHash()); err != nil { + log.Errorf("Unable to propagate solidity: %s", err.Error()) + } } } -func updateUnsolidTxs(tx *value_transaction.ValueTransaction) { - unsolidTxs.Remove(tx.GetHash()) - targetTime := time.Now().Add(time.Duration(-UnsolidInterval) * time.Second) - txs := unsolidTxs.Update(targetTime) - for _, tx := range txs { - requestTransaction(tx) +// isSolid checks whether the transaction with the given hash is solid. A transaction is solid, if it is +// either marked as solid or all its referenced transactions are in the database. +func isSolid(hash trinary.Hash) (bool, error) { + // the genesis is always solid + if hash == meta_transaction.BRANCH_NULL_HASH { + return true, nil + } + // if the transaction is not in the DB, request it + transaction, err := GetTransaction(hash) + if err != nil { + return false, err + } + if transaction == nil { + if requestedTxs.Add(hash) { + requestTransaction(hash) + } + return false, nil + } + + // check whether the transaction is marked solid + metadata, err := GetTransactionMetadata(hash, transactionmetadata.New) + if err != nil { + return false, err + } + if metadata.GetSolid() { + return true, nil + } + + branch := contains(transaction.GetBranchTransactionHash()) + trunk := contains(transaction.GetTrunkTransactionHash()) + + if !branch || !trunk { + return false, nil + } + // everything is good, mark the transaction as solid + return true, markSolid(transaction) +} + +func contains(hash trinary.Hash) bool { + if hash == meta_transaction.BRANCH_NULL_HASH { + return true + } + if contains, _ := ContainsTransaction(hash); !contains { + if requestedTxs.Add(hash) { + requestTransaction(hash) + } + return false + } + return true +} + +func isMarkedSolid(hash trinary.Hash) (bool, error) { + if hash == meta_transaction.BRANCH_NULL_HASH { + return true, nil + } + metadata, err := GetTransactionMetadata(hash, transactionmetadata.New) + if err != nil { + return false, err + } + return metadata.GetSolid(), nil +} + +func markSolid(transaction *value_transaction.ValueTransaction) error { + txMetadata, err := GetTransactionMetadata(transaction.GetHash(), transactionmetadata.New) + if err != nil { + return err + } + if txMetadata.SetSolid(true) { + log.Debugw("transaction solidified", "hash", transaction.GetHash()) + Events.TransactionSolid.Trigger(transaction) + return propagateSolidity(transaction.GetHash()) + } + return nil +} + +func propagateSolidity(transactionHash trinary.Trytes) error { + approvingTransactions, err := GetApprovers(transactionHash, approvers.New) + if err != nil { + return err + } + for _, hash := range approvingTransactions.GetHashes() { + approver, err := GetTransaction(hash) + if err != nil { + return err + } + if approver != nil { + branchSolid, err := isMarkedSolid(approver.GetBranchTransactionHash()) + if err != nil { + return err + } + if !branchSolid { + continue + } + trunkSolid, err := isMarkedSolid(approver.GetTrunkTransactionHash()) + if err != nil { + return err + } + if !trunkSolid { + continue + } + + if err := markSolid(approver); err != nil { + return err + } + } + } + return nil +} + +func updateRequestedTxs() { + targetTime := time.Now().Add(-UnsolidInterval) + txs := requestedTxs.Update(targetTime) + for _, txHash := range txs { + if contains, _ := ContainsTransaction(txHash); contains { + requestedTxs.Remove(txHash) + continue + } + requestTransaction(txHash) } } diff --git a/plugins/tangle/unsolidTxs.go b/plugins/tangle/unsolidTxs.go index 631da933afa558b0737266129e40e876fba2a7d3..36924fa75ff8ce1a757fc1f96695822ae03e6969 100644 --- a/plugins/tangle/unsolidTxs.go +++ b/plugins/tangle/unsolidTxs.go @@ -21,21 +21,24 @@ func NewUnsolidTxs() *UnsolidTxs { } } -func (u *UnsolidTxs) Add(hash string) { +func (u *UnsolidTxs) Add(hash string) bool { u.Lock() + defer u.Unlock() + _, contains := u.internal[hash] + if contains { + return false + } info := Info{ lastRequest: time.Now(), counter: 1, } u.internal[hash] = info - u.Unlock() + return true } func (u *UnsolidTxs) Remove(hash string) { u.Lock() - if _, exists := u.internal[hash]; !exists { - delete(u.internal, hash) - } + delete(u.internal, hash) u.Unlock() }