Skip to content
Snippets Groups Projects
Commit 919895a0 authored by Hans Moog's avatar Hans Moog
Browse files

Refactor: refactored the tangle

parent 6de58044
No related branches found
No related tags found
No related merge requests found
...@@ -10,8 +10,9 @@ import ( ...@@ -10,8 +10,9 @@ import (
type Events struct { type Events struct {
TransactionAttached *events.Event TransactionAttached *events.Event
TransactionSolid *events.Event TransactionSolid *events.Event
TransactionMissing *events.Event
MissingTransactionReceived *events.Event MissingTransactionReceived *events.Event
TransactionMissing *events.Event
TransactionUnsolidifiable *events.Event
TransactionRemoved *events.Event TransactionRemoved *events.Event
} }
...@@ -19,21 +20,20 @@ func newEvents() *Events { ...@@ -19,21 +20,20 @@ func newEvents() *Events {
return &Events{ return &Events{
TransactionAttached: events.NewEvent(cachedTransactionEvent), TransactionAttached: events.NewEvent(cachedTransactionEvent),
TransactionSolid: events.NewEvent(cachedTransactionEvent), TransactionSolid: events.NewEvent(cachedTransactionEvent),
MissingTransactionReceived: events.NewEvent(cachedTransactionEvent),
TransactionMissing: events.NewEvent(transactionIdEvent), TransactionMissing: events.NewEvent(transactionIdEvent),
MissingTransactionReceived: events.NewEvent(transactionIdEvent), TransactionUnsolidifiable: events.NewEvent(transactionIdEvent),
TransactionRemoved: events.NewEvent(transactionIdEvent), TransactionRemoved: events.NewEvent(transactionIdEvent),
} }
} }
func transactionIdEvent(handler interface{}, params ...interface{}) { func transactionIdEvent(handler interface{}, params ...interface{}) {
missingTransactionId := params[0].(transaction.Id) handler.(func(transaction.Id))(params[0].(transaction.Id))
handler.(func(transaction.Id))(missingTransactionId)
} }
func cachedTransactionEvent(handler interface{}, params ...interface{}) { func cachedTransactionEvent(handler interface{}, params ...interface{}) {
cachedTransaction := params[0].(*transaction.CachedTransaction) handler.(func(*transaction.CachedTransaction, *transactionmetadata.CachedTransactionMetadata))(
cachedTransactionMetadata := params[1].(*transactionmetadata.CachedTransactionMetadata) params[0].(*transaction.CachedTransaction).Retain(),
params[1].(*transactionmetadata.CachedTransactionMetadata).Retain().(*transactionmetadata.CachedTransactionMetadata),
handler.(func(*transaction.CachedTransaction, *transactionmetadata.CachedTransactionMetadata))(cachedTransaction.Retain().(*transaction.CachedTransaction), cachedTransactionMetadata.Retain().(*transactionmetadata.CachedTransactionMetadata)) )
} }
...@@ -19,3 +19,15 @@ func (cachedApprover *CachedApprover) Unwrap() *Approver { ...@@ -19,3 +19,15 @@ func (cachedApprover *CachedApprover) Unwrap() *Approver {
} }
} }
} }
type CachedApprovers []*CachedApprover
func (cachedApprovers CachedApprovers) Consume(consumer func(approver *Approver)) (consumed bool) {
for _, cachedApprover := range cachedApprovers {
consumed = consumed || cachedApprover.Consume(func(object objectstorage.StorableObject) {
consumer(object.(*Approver))
})
}
return
}
...@@ -8,10 +8,16 @@ type CachedTransaction struct { ...@@ -8,10 +8,16 @@ type CachedTransaction struct {
objectstorage.CachedObject objectstorage.CachedObject
} }
func (cachedTransaction *CachedTransaction) Retain() objectstorage.CachedObject { func (cachedTransaction *CachedTransaction) Retain() *CachedTransaction {
return &CachedTransaction{cachedTransaction.CachedObject.Retain()} return &CachedTransaction{cachedTransaction.CachedObject.Retain()}
} }
func (cachedTransaction *CachedTransaction) Consume(consumer func(object *Transaction)) bool {
return cachedTransaction.CachedObject.Consume(func(object objectstorage.StorableObject) {
consumer(object.(*Transaction))
})
}
func (cachedTransaction *CachedTransaction) Unwrap() *Transaction { func (cachedTransaction *CachedTransaction) Unwrap() *Transaction {
if untypedTransaction := cachedTransaction.Get(); untypedTransaction == nil { if untypedTransaction := cachedTransaction.Get(); untypedTransaction == nil {
return nil return nil
......
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"container/list" "container/list"
"time" "time"
"github.com/iotaledger/hive.go/syncutils"
"github.com/iotaledger/hive.go/types" "github.com/iotaledger/hive.go/types"
"github.com/iotaledger/goshimmer/packages/binary/tangle/model/approver" "github.com/iotaledger/goshimmer/packages/binary/tangle/model/approver"
...@@ -37,8 +36,6 @@ type Tangle struct { ...@@ -37,8 +36,6 @@ type Tangle struct {
storeTransactionsWorkerPool async.WorkerPool storeTransactionsWorkerPool async.WorkerPool
solidifierWorkerPool async.WorkerPool solidifierWorkerPool async.WorkerPool
cleanupWorkerPool async.WorkerPool cleanupWorkerPool async.WorkerPool
tangleMutex syncutils.RWMultiMutex
} }
// Constructor for the tangle. // Constructor for the tangle.
...@@ -104,18 +101,10 @@ func (tangle *Tangle) GetTransactionMetadata(transactionId transaction.Id) *tran ...@@ -104,18 +101,10 @@ func (tangle *Tangle) GetTransactionMetadata(transactionId transaction.Id) *tran
} }
// Retrieves the approvers of a transaction from the tangle. // Retrieves the approvers of a transaction from the tangle.
func (tangle *Tangle) GetApprover(approvedTransaction transaction.Id, approvingTransaction transaction.Id) *approver.CachedApprover { func (tangle *Tangle) GetApprovers(transactionId transaction.Id) approver.CachedApprovers {
keyToLoad := make([]byte, transaction.IdLength+transaction.IdLength) approvers := make(approver.CachedApprovers, 0)
copy(keyToLoad[:transaction.IdLength], approvedTransaction[:])
copy(keyToLoad[transaction.IdLength:], approvingTransaction[:])
return &approver.CachedApprover{CachedObject: tangle.approverStorage.Load(keyToLoad)}
}
func (tangle *Tangle) GetApprovers(transactionId transaction.Id) []*approver.CachedApprover {
approvers := make([]*approver.CachedApprover, 0)
tangle.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { tangle.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool {
approvers = append(approvers, &approver.CachedApprover{CachedObject: cachedObject.Retain()}) approvers = append(approvers, &approver.CachedApprover{CachedObject: cachedObject})
return true return true
}, transactionId[:]) }, transactionId[:])
...@@ -125,37 +114,20 @@ func (tangle *Tangle) GetApprovers(transactionId transaction.Id) []*approver.Cac ...@@ -125,37 +114,20 @@ func (tangle *Tangle) GetApprovers(transactionId transaction.Id) []*approver.Cac
// Deletes a transaction from the tangle (i.e. for local snapshots) // Deletes a transaction from the tangle (i.e. for local snapshots)
func (tangle *Tangle) DeleteTransaction(transactionId transaction.Id) { func (tangle *Tangle) DeleteTransaction(transactionId transaction.Id) {
tangle.tangleMutex.RLock(transactionId) tangle.GetTransaction(transactionId).Consume(func(currentTransaction *transaction.Transaction) {
if !tangle.GetTransaction(transactionId).Consume(func(object objectstorage.StorableObject) {
tangle.tangleMutex.RUnlock(transactionId)
tangle.tangleMutex.Lock(transactionId)
currentTransaction := object.(*transaction.Transaction)
trunkTransactionId := currentTransaction.GetTrunkTransactionId() trunkTransactionId := currentTransaction.GetTrunkTransactionId()
trunkIdToDelete := make([]byte, transaction.IdLength+transaction.IdLength) tangle.deleteApprover(trunkTransactionId, transactionId)
copy(trunkIdToDelete[:transaction.IdLength], transactionId[:])
copy(trunkIdToDelete[transaction.IdLength:], trunkTransactionId[:])
tangle.approverStorage.Delete(trunkIdToDelete)
branchTransactionId := currentTransaction.GetBranchTransactionId() branchTransactionId := currentTransaction.GetBranchTransactionId()
if branchTransactionId != trunkTransactionId { if branchTransactionId != trunkTransactionId {
branchIdToDelete := make([]byte, transaction.IdLength+transaction.IdLength) tangle.deleteApprover(branchTransactionId, transactionId)
copy(branchIdToDelete[:transaction.IdLength], transactionId[:])
copy(branchIdToDelete[transaction.IdLength:], branchTransactionId[:])
tangle.approverStorage.Delete(branchIdToDelete)
} }
tangle.transactionStorage.Delete(transactionId[:])
tangle.transactionMetadataStorage.Delete(transactionId[:]) tangle.transactionMetadataStorage.Delete(transactionId[:])
tangle.missingTransactionsStorage.Delete(transactionId[:]) tangle.transactionStorage.Delete(transactionId[:])
tangle.tangleMutex.Unlock(transactionId)
tangle.Events.TransactionRemoved.Trigger(transactionId) tangle.Events.TransactionRemoved.Trigger(transactionId)
}) { })
tangle.tangleMutex.RUnlock(transactionId)
}
} }
// Marks the tangle as stopped, so it will not accept any new transactions (waits for all backgroundTasks to finish. // Marks the tangle as stopped, so it will not accept any new transactions (waits for all backgroundTasks to finish.
...@@ -164,6 +136,11 @@ func (tangle *Tangle) Shutdown() *Tangle { ...@@ -164,6 +136,11 @@ func (tangle *Tangle) Shutdown() *Tangle {
tangle.solidifierWorkerPool.ShutdownGracefully() tangle.solidifierWorkerPool.ShutdownGracefully()
tangle.cleanupWorkerPool.ShutdownGracefully() tangle.cleanupWorkerPool.ShutdownGracefully()
tangle.transactionStorage.Shutdown()
tangle.transactionMetadataStorage.Shutdown()
tangle.approverStorage.Shutdown()
tangle.missingTransactionsStorage.Shutdown()
return tangle return tangle
} }
...@@ -185,6 +162,7 @@ func (tangle *Tangle) Prune() error { ...@@ -185,6 +162,7 @@ func (tangle *Tangle) Prune() error {
// Worker that stores the transactions and calls the corresponding "Storage events" // Worker that stores the transactions and calls the corresponding "Storage events"
func (tangle *Tangle) storeTransactionWorker(tx *transaction.Transaction) { func (tangle *Tangle) storeTransactionWorker(tx *transaction.Transaction) {
// store transaction
var cachedTransaction *transaction.CachedTransaction var cachedTransaction *transaction.CachedTransaction
if _tmp, transactionIsNew := tangle.transactionStorage.StoreIfAbsent(tx); !transactionIsNew { if _tmp, transactionIsNew := tangle.transactionStorage.StoreIfAbsent(tx); !transactionIsNew {
return return
...@@ -192,35 +170,26 @@ func (tangle *Tangle) storeTransactionWorker(tx *transaction.Transaction) { ...@@ -192,35 +170,26 @@ func (tangle *Tangle) storeTransactionWorker(tx *transaction.Transaction) {
cachedTransaction = &transaction.CachedTransaction{CachedObject: _tmp} cachedTransaction = &transaction.CachedTransaction{CachedObject: _tmp}
} }
// store transaction metadata
transactionId := tx.GetId() transactionId := tx.GetId()
trunkTransactionID := tx.GetTrunkTransactionId()
branchTransactionID := tx.GetBranchTransactionId()
var lockBuilder syncutils.MultiMutexLockBuilder
if trunkTransactionID != transaction.EmptyId {
lockBuilder.AddLock(trunkTransactionID)
}
if branchTransactionID != transaction.EmptyId && branchTransactionID != trunkTransactionID {
lockBuilder.AddLock(branchTransactionID)
}
locks := lockBuilder.Build()
tangle.tangleMutex.Lock(locks...)
cachedTransactionMetadata := &transactionmetadata.CachedTransactionMetadata{CachedObject: tangle.transactionMetadataStorage.Store(transactionmetadata.New(transactionId))} cachedTransactionMetadata := &transactionmetadata.CachedTransactionMetadata{CachedObject: tangle.transactionMetadataStorage.Store(transactionmetadata.New(transactionId))}
// store trunk approver
trunkTransactionID := tx.GetTrunkTransactionId()
tangle.approverStorage.Store(approver.New(trunkTransactionID, transactionId)).Release() tangle.approverStorage.Store(approver.New(trunkTransactionID, transactionId)).Release()
if branchTransactionID != trunkTransactionID {
// store branch approver
if branchTransactionID := tx.GetBranchTransactionId(); branchTransactionID != trunkTransactionID {
tangle.approverStorage.Store(approver.New(branchTransactionID, transactionId)).Release() tangle.approverStorage.Store(approver.New(branchTransactionID, transactionId)).Release()
} }
tangle.tangleMutex.Unlock(locks...) // trigger events
if tangle.missingTransactionsStorage.DeleteIfPresent(transactionId[:]) { if tangle.missingTransactionsStorage.DeleteIfPresent(transactionId[:]) {
tangle.Events.MissingTransactionReceived.Trigger(transactionId) tangle.Events.MissingTransactionReceived.Trigger(cachedTransaction, cachedTransactionMetadata)
} }
tangle.Events.TransactionAttached.Trigger(cachedTransaction, cachedTransactionMetadata) tangle.Events.TransactionAttached.Trigger(cachedTransaction, cachedTransactionMetadata)
// check solidity
tangle.solidifierWorkerPool.Submit(func() { tangle.solidifierWorkerPool.Submit(func() {
tangle.solidifyTransactionWorker(cachedTransaction, cachedTransactionMetadata) tangle.solidifyTransactionWorker(cachedTransaction, cachedTransactionMetadata)
}) })
...@@ -268,15 +237,7 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedTransaction *transaction.C ...@@ -268,15 +237,7 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedTransaction *transaction.C
return true return true
} }
// 1. check tangle solidity return isTransactionMarkedAsSolid(transaction.GetTrunkTransactionId()) && isTransactionMarkedAsSolid(transaction.GetBranchTransactionId())
isTrunkSolid := isTransactionMarkedAsSolid(transaction.GetTrunkTransactionId())
isBranchSolid := isTransactionMarkedAsSolid(transaction.GetBranchTransactionId())
if isTrunkSolid && isBranchSolid {
// 2. check payload solidity
return true
}
return false
} }
popElementsFromStack := func(stack *list.List) (*transaction.CachedTransaction, *transactionmetadata.CachedTransactionMetadata) { popElementsFromStack := func(stack *list.List) (*transaction.CachedTransaction, *transactionmetadata.CachedTransactionMetadata) {
...@@ -309,16 +270,14 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedTransaction *transaction.C ...@@ -309,16 +270,14 @@ func (tangle *Tangle) solidifyTransactionWorker(cachedTransaction *transaction.C
if isTransactionSolid(currentTransaction, currentTransactionMetadata) && currentTransactionMetadata.SetSolid(true) { if isTransactionSolid(currentTransaction, currentTransactionMetadata) && currentTransactionMetadata.SetSolid(true) {
tangle.Events.TransactionSolid.Trigger(currentCachedTransaction, currentCachedTransactionMetadata) tangle.Events.TransactionSolid.Trigger(currentCachedTransaction, currentCachedTransactionMetadata)
for _, cachedApprover := range tangle.GetApprovers(currentTransaction.GetId()) { tangle.GetApprovers(currentTransaction.GetId()).Consume(func(approver *approver.Approver) {
cachedApprover.Consume(func(object objectstorage.StorableObject) { approverTransactionId := approver.GetApprovingTransactionId()
approverTransactionId := object.(*approver.Approver).GetApprovingTransactionId()
solidificationStack.PushBack([2]interface{}{ solidificationStack.PushBack([2]interface{}{
tangle.GetTransaction(approverTransactionId), tangle.GetTransaction(approverTransactionId),
tangle.GetTransactionMetadata(approverTransactionId), tangle.GetTransactionMetadata(approverTransactionId),
})
}) })
} })
} }
// release cached results // release cached results
...@@ -336,20 +295,34 @@ func (tangle *Tangle) monitorMissingTransactionWorker(transactionId transaction. ...@@ -336,20 +295,34 @@ func (tangle *Tangle) monitorMissingTransactionWorker(transactionId transaction.
missingTransaction := object.(*missingtransaction.MissingTransaction) missingTransaction := object.(*missingtransaction.MissingTransaction)
if time.Since(missingTransaction.GetMissingSince()) >= MAX_MISSING_TIME_BEFORE_CLEANUP { if time.Since(missingTransaction.GetMissingSince()) >= MAX_MISSING_TIME_BEFORE_CLEANUP {
tangle.cleanupWorkerPool.Submit(func() { tangle.cleanupWorker(missingTransaction.GetTransactionId()) }) tangle.cleanupWorkerPool.Submit(func() {
tangle.Events.TransactionUnsolidifiable.Trigger(transactionId)
tangle.deleteSubtangle(missingTransaction.GetTransactionId())
})
} else { } else {
// TRIGGER STILL MISSING EVENT?
scheduleNextMissingCheck(transactionId) scheduleNextMissingCheck(transactionId)
} }
}) })
}) })
} }
tangle.Events.TransactionMissing.Trigger(transactionId) tangle.Events.TransactionMissing.Trigger(transactionId)
scheduleNextMissingCheck(transactionId) scheduleNextMissingCheck(transactionId)
} }
// Worker that recursively cleans up the approvers of a unsolidifiable missing transaction. func (tangle *Tangle) deleteApprover(approvedTransaction transaction.Id, approvingTransaction transaction.Id) {
func (tangle *Tangle) cleanupWorker(transactionId transaction.Id) { idToDelete := make([]byte, transaction.IdLength+transaction.IdLength)
copy(idToDelete[:transaction.IdLength], approvedTransaction[:])
copy(idToDelete[transaction.IdLength:], approvingTransaction[:])
tangle.approverStorage.Delete(idToDelete)
}
// Deletes a transaction and all of its approvers (recursively).
func (tangle *Tangle) deleteSubtangle(transactionId transaction.Id) {
cleanupStack := list.New() cleanupStack := list.New()
cleanupStack.PushBack(transactionId) cleanupStack.PushBack(transactionId)
...@@ -363,16 +336,14 @@ func (tangle *Tangle) cleanupWorker(transactionId transaction.Id) { ...@@ -363,16 +336,14 @@ func (tangle *Tangle) cleanupWorker(transactionId transaction.Id) {
tangle.DeleteTransaction(currentTransactionId) tangle.DeleteTransaction(currentTransactionId)
for _, cachedApprover := range tangle.GetApprovers(currentTransactionId) { tangle.GetApprovers(currentTransactionId).Consume(func(approver *approver.Approver) {
cachedApprover.Consume(func(object objectstorage.StorableObject) { approverId := approver.GetApprovingTransactionId()
approverId := object.(*approver.Approver).GetApprovingTransactionId()
if _, transactionProcessed := processedTransactions[approverId]; !transactionProcessed { if _, transactionProcessed := processedTransactions[approverId]; !transactionProcessed {
cleanupStack.PushBack(approverId) cleanupStack.PushBack(approverId)
processedTransactions[approverId] = types.Void processedTransactions[approverId] = types.Void
} }
}) })
}
} }
} }
...@@ -47,11 +47,19 @@ func TestTangle_AttachTransaction(t *testing.T) { ...@@ -47,11 +47,19 @@ func TestTangle_AttachTransaction(t *testing.T) {
} }
tangle.Events.TransactionAttached.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) { tangle.Events.TransactionAttached.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) {
fmt.Println("ATTACHED:", cachedTransaction.Unwrap().GetId()) cachedTransaction.Consume(func(transaction *transaction.Transaction) {
fmt.Println("ATTACHED:", transaction.GetId())
})
})) }))
tangle.Events.TransactionSolid.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) { tangle.Events.TransactionSolid.Attach(events.NewClosure(func(cachedTransaction *transaction.CachedTransaction, cachedTransactionMetadata *transactionmetadata.CachedTransactionMetadata) {
fmt.Println("SOLID:", cachedTransaction.Unwrap().GetId()) cachedTransaction.Consume(func(transaction *transaction.Transaction) {
fmt.Println("SOLID:", transaction.GetId())
})
}))
tangle.Events.TransactionUnsolidifiable.Attach(events.NewClosure(func(transactionId transaction.Id) {
fmt.Println("UNSOLIDIFIABLE:", transactionId)
})) }))
tangle.Events.TransactionMissing.Attach(events.NewClosure(func(transactionId transaction.Id) { tangle.Events.TransactionMissing.Attach(events.NewClosure(func(transactionId transaction.Id) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment