From 3e654ff07e60f6df083dba07d63db2c5cb9ef67e Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Fri, 11 Oct 2019 00:12:05 +0200 Subject: [PATCH] Feat: CA consensus is working - started implementing test cases --- packages/ca/events.go | 5 + packages/ca/heartbeat_manager.go | 67 +++++++++---- packages/ca/heartbeat_manager_test.go | 71 ++++++++++++- packages/ca/neighbor_manager.go | 138 +++++++++++++++++++++++--- packages/ca/opinion.go | 89 +++++++++++++++++ packages/ca/opinion_register.go | 55 ++++++++++ packages/ca/opinion_register_test.go | 16 +++ packages/ca/statement_chain.go | 50 +++++----- 8 files changed, 431 insertions(+), 60 deletions(-) create mode 100644 packages/ca/opinion.go create mode 100644 packages/ca/opinion_register.go create mode 100644 packages/ca/opinion_register_test.go diff --git a/packages/ca/events.go b/packages/ca/events.go index 8611cf42..710eaa68 100644 --- a/packages/ca/events.go +++ b/packages/ca/events.go @@ -17,6 +17,7 @@ type NeighborManagerEvents struct { NeighborIdle *events.Event ChainReset *events.Event StatementMissing *events.Event + UpdateOpinion *events.Event } type StatementChainEvents struct { @@ -30,3 +31,7 @@ func HashCaller(handler interface{}, params ...interface{}) { func IdentityNeighborManagerCaller(handler interface{}, params ...interface{}) { handler.(func(*identity.Identity, *NeighborManager))(params[0].(*identity.Identity), params[1].(*NeighborManager)) } + +func StringBoolCaller(handler interface{}, params ...interface{}) { + handler.(func(string, bool))(params[0].(string), params[1].(bool)) +} diff --git a/packages/ca/heartbeat_manager.go b/packages/ca/heartbeat_manager.go index 00ae5b0b..9790df0d 100644 --- a/packages/ca/heartbeat_manager.go +++ b/packages/ca/heartbeat_manager.go @@ -4,6 +4,8 @@ import ( "sync" "time" + "github.com/iotaledger/goshimmer/packages/typeutils" + "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/identity" @@ -21,11 +23,10 @@ type HeartbeatManager struct { statementChain *StatementChain droppedNeighbors [][]byte neighborManagers map[string]*NeighborManager - initialOpinions map[string]bool + opinions *OpinionRegister droppedNeighborsMutex sync.RWMutex neighborManagersMutex sync.RWMutex - initialOpinionsMutex sync.RWMutex } func NewHeartbeatManager(identity *identity.Identity, options ...HeartbeatManagerOption) *HeartbeatManager { @@ -41,7 +42,7 @@ func NewHeartbeatManager(identity *identity.Identity, options ...HeartbeatManage statementChain: NewStatementChain(), droppedNeighbors: make([][]byte, 0), neighborManagers: make(map[string]*NeighborManager), - initialOpinions: make(map[string]bool), + opinions: NewOpinionRegister(), } } @@ -54,6 +55,10 @@ func (heartbeatManager *HeartbeatManager) AddNeighbor(neighborIdentity *identity if _, exists := heartbeatManager.neighborManagers[neighborIdentity.StringIdentifier]; !exists { newNeighborManager := NewNeighborManager() + newNeighborManager.Events.UpdateOpinion.Attach(events.NewClosure(func(transactionId string, liked bool) { + heartbeatManager.processNeighborOpinionUpdate(neighborIdentity, transactionId, liked) + })) + heartbeatManager.neighborManagers[neighborIdentity.StringIdentifier] = newNeighborManager heartbeatManager.neighborManagersMutex.Unlock() @@ -90,15 +95,45 @@ func (heartbeatManager *HeartbeatManager) RemoveNeighbor(neighborIdentity *ident } func (heartbeatManager *HeartbeatManager) InitialDislike(transactionId []byte) { - heartbeatManager.initialOpinionsMutex.Lock() - heartbeatManager.initialOpinions[string(transactionId)] = false - heartbeatManager.initialOpinionsMutex.Unlock() + heartbeatManager.opinions.CreateOpinion(typeutils.BytesToString(transactionId)).SetLiked(false) } func (heartbeatManager *HeartbeatManager) InitialLike(transactionId []byte) { - heartbeatManager.initialOpinionsMutex.Lock() - heartbeatManager.initialOpinions[string(transactionId)] = true - heartbeatManager.initialOpinionsMutex.Unlock() + heartbeatManager.opinions.CreateOpinion(typeutils.BytesToString(transactionId)).SetLiked(true) +} + +func (heartbeatManager *HeartbeatManager) processNeighborOpinionUpdate(neighbor *identity.Identity, transactionId string, liked bool) { + opinion := heartbeatManager.opinions.GetOpinion(transactionId) + if !opinion.Exists() || opinion.IsLiked() != liked { + totalWeight := len(heartbeatManager.neighborManagers) + threshold := float64(totalWeight) / 2 + + likedWeight := 0 + dislikedWeight := 0 + for _, neighborManager := range heartbeatManager.neighborManagers { + weightOfNeighbor := 1 + + if neighborOpinionLiked, exists := neighborManager.opinions[transactionId]; exists { + if neighborOpinionLiked { + likedWeight += weightOfNeighbor + } else { + dislikedWeight += weightOfNeighbor + } + } + } + + if likedWeight > dislikedWeight && likedWeight > int(threshold) { + if !opinion.Exists() || !opinion.IsLiked() { + opinion = heartbeatManager.opinions.CreateOpinion(transactionId) + opinion.SetLiked(true) + } + } else if dislikedWeight >= likedWeight && dislikedWeight >= int(threshold) { + if !opinion.Exists() || opinion.IsLiked() { + opinion = heartbeatManager.opinions.CreateOpinion(transactionId) + opinion.SetLiked(false) + } + } + } } func (heartbeatManager *HeartbeatManager) GenerateHeartbeat() (result *heartbeat.Heartbeat, err errors.IdentifiableError) { @@ -169,7 +204,7 @@ func (heartbeatManager *HeartbeatManager) generateMainStatement() (result *heart if signingErr := mainStatement.Sign(heartbeatManager.identity); signingErr == nil { result = mainStatement - heartbeatManager.resetInitialOpinions() + heartbeatManager.opinions.ApplyPendingOpinions() heartbeatManager.statementChain.tail = mainStatement } else { err = signingErr @@ -205,11 +240,11 @@ func (heartbeatManager *HeartbeatManager) generateNeighborStatements() (result m func (heartbeatManager *HeartbeatManager) generateToggledTransactions() []*heartbeat.ToggledTransaction { toggledTransactions := make([]*heartbeat.ToggledTransaction, 0) - for transactionId, liked := range heartbeatManager.initialOpinions { - if !liked { + for transactionId, opinion := range heartbeatManager.opinions.GetPendingOpinions() { + if !opinion.IsLiked() { newToggledTransaction := heartbeat.NewToggledTransaction() - newToggledTransaction.SetInitialStatement(true) - newToggledTransaction.SetFinalStatement(false) + newToggledTransaction.SetInitialStatement(opinion.IsInitial()) + newToggledTransaction.SetFinalStatement(opinion.IsFinalized()) newToggledTransaction.SetTransactionId([]byte(transactionId)) toggledTransactions = append(toggledTransactions, newToggledTransaction) @@ -218,7 +253,3 @@ func (heartbeatManager *HeartbeatManager) generateToggledTransactions() []*heart return toggledTransactions } - -func (heartbeatManager *HeartbeatManager) resetInitialOpinions() { - heartbeatManager.initialOpinions = make(map[string]bool) -} diff --git a/packages/ca/heartbeat_manager_test.go b/packages/ca/heartbeat_manager_test.go index 83cd9c3e..85b270ed 100644 --- a/packages/ca/heartbeat_manager_test.go +++ b/packages/ca/heartbeat_manager_test.go @@ -4,6 +4,9 @@ import ( "crypto/rand" "fmt" "testing" + "time" + + "github.com/iotaledger/goshimmer/packages/typeutils" "github.com/iotaledger/goshimmer/packages/events" @@ -17,6 +20,69 @@ func generateRandomTransactionId() (result []byte) { return } +type virtualNode struct { + identity *identity.Identity + heartbeatManager *HeartbeatManager +} + +func generateVirtualNetwork(numberOfNodes int) (result []*virtualNode) { + for i := 0; i < numberOfNodes; i++ { + nodeIdentity := identity.GenerateRandomIdentity() + + virtualNode := &virtualNode{ + identity: nodeIdentity, + heartbeatManager: NewHeartbeatManager(nodeIdentity), + } + + result = append(result, virtualNode) + } + + for i := 0; i < numberOfNodes; i++ { + for j := 0; j < numberOfNodes; j++ { + if i != j { + result[i].heartbeatManager.AddNeighbor(result[j].identity) + } + } + } + + go func() { + for { + for _, node := range result { + heartbeat, err := node.heartbeatManager.GenerateHeartbeat() + if err != nil { + fmt.Println(err) + + return + } + + for _, otherNode := range result { + if otherNode != node { + otherNode.heartbeatManager.ApplyHeartbeat(heartbeat) + } + } + } + + time.Sleep(700 * time.Millisecond) + } + }() + + return +} + +func TestConsensus(t *testing.T) { + virtualNetwork := generateVirtualNetwork(5) + + transactionId := generateRandomTransactionId() + + virtualNetwork[0].heartbeatManager.InitialDislike(transactionId) + virtualNetwork[1].heartbeatManager.InitialDislike(transactionId) + virtualNetwork[2].heartbeatManager.InitialDislike(transactionId) + + time.Sleep(1 * time.Second) + + fmt.Println(virtualNetwork[4].heartbeatManager.opinions.GetOpinion(typeutils.BytesToString(transactionId)).IsLiked()) +} + func TestHeartbeatManager_GenerateHeartbeat(t *testing.T) { ownIdentity := identity.GenerateRandomIdentity() neighborIdentity := identity.GenerateRandomIdentity() @@ -31,7 +97,6 @@ func TestHeartbeatManager_GenerateHeartbeat(t *testing.T) { heartbeatManager1.InitialDislike(generateRandomTransactionId()) heartbeatManager1.InitialDislike(generateRandomTransactionId()) heartbeatManager1.InitialLike(generateRandomTransactionId()) - heartbeat1, err := heartbeatManager1.GenerateHeartbeat() if err != nil { t.Error(err) @@ -67,14 +132,14 @@ func TestHeartbeatManager_GenerateHeartbeat(t *testing.T) { return } + fmt.Println(heartbeat2) + if err = heartbeatManager1.ApplyHeartbeat(heartbeat2); err != nil { t.Error(err) return } - fmt.Println(heartbeat2) - heartbeat3, err := heartbeatManager1.GenerateHeartbeat() if err != nil { t.Error(err) diff --git a/packages/ca/neighbor_manager.go b/packages/ca/neighbor_manager.go index 07e0936a..73d67896 100644 --- a/packages/ca/neighbor_manager.go +++ b/packages/ca/neighbor_manager.go @@ -2,7 +2,6 @@ package ca import ( "bytes" - "fmt" "strconv" "github.com/iotaledger/goshimmer/packages/typeutils" @@ -23,6 +22,7 @@ type NeighborManager struct { heartbeats map[string]*heartbeat.Heartbeat mainChain *StatementChain neighborChains map[string]*StatementChain + opinions map[string]bool previouslyReportedHeartbeatHash []byte } @@ -34,15 +34,23 @@ func NewNeighborManager(options ...NeighborManagerOption) *NeighborManager { }), RemoveNeighbor: events.NewEvent(func(handler interface{}, params ...interface{}) { + }), + NeighborActive: events.NewEvent(func(handler interface{}, params ...interface{}) { + + }), + NeighborIdle: events.NewEvent(func(handler interface{}, params ...interface{}) { + }), ChainReset: events.NewEvent(events.CallbackCaller), StatementMissing: events.NewEvent(HashCaller), + UpdateOpinion: events.NewEvent(StringBoolCaller), }, options: DEFAULT_NEIGHBOR_MANAGER_OPTIONS.Override(options...), mainChain: NewStatementChain(), missingHeartbeats: make(map[string]bool), pendingHeartbeats: make(map[string]*heartbeat.Heartbeat), heartbeats: make(map[string]*heartbeat.Heartbeat), + opinions: make(map[string]bool), neighborChains: make(map[string]*StatementChain), } } @@ -50,6 +58,7 @@ func NewNeighborManager(options ...NeighborManagerOption) *NeighborManager { func (neighborManager *NeighborManager) Reset() { neighborManager.mainChain.Reset() neighborManager.neighborChains = make(map[string]*StatementChain) + neighborManager.opinions = make(map[string]bool) } func (neighborManager *NeighborManager) storeHeartbeat(heartbeat *heartbeat.Heartbeat) (err errors.IdentifiableError) { @@ -113,7 +122,7 @@ func (neighborManager *NeighborManager) applyPendingHeartbeats() (err errors.Ide } for _, sortedHeartbeat := range sortedPendingHeartbeats { - if applicationErr := neighborManager.applyHeartbeat(sortedHeartbeat); applicationErr != nil { + if applicationErr := neighborManager.applySolidHeartbeat(sortedHeartbeat); applicationErr != nil { err = applicationErr return @@ -180,22 +189,21 @@ func (neighborManager *NeighborManager) markIdleNeighbors(neighborStatements map for neighborId := range neighborStatements { if _, neighborExists := idleNeighbors[neighborId]; neighborExists { - // TRIGGER ACTIVE + neighborManager.Events.NeighborActive.Trigger(neighborId) delete(idleNeighbors, neighborId) } } - for _, x := range idleNeighbors { - // TRIGGER IDLE - if false { - fmt.Println(x) - } + for neighborId := range idleNeighbors { + neighborManager.Events.NeighborIdle.Trigger(neighborId) } } func (neighborManager *NeighborManager) updateStatementChains(mainStatement *heartbeat.OpinionStatement, neighborStatements map[string][]*heartbeat.OpinionStatement) (err errors.IdentifiableError) { - neighborManager.mainChain.AddStatement(mainStatement) + if err = neighborManager.mainChain.AddStatement(mainStatement); err != nil { + return + } for neighborId, statementsOfNeighbor := range neighborStatements { neighborChain, neighborChainErr := neighborManager.addNeighborChain(neighborId) @@ -206,24 +214,128 @@ func (neighborManager *NeighborManager) updateStatementChains(mainStatement *hea } for _, neighborStatement := range statementsOfNeighbor { - neighborChain.AddStatement(neighborStatement) + if statementErr := neighborChain.AddStatement(neighborStatement); statementErr != nil { + err = statementErr + + return + } } } return } -func (neighborManager *NeighborManager) applyHeartbeat(heartbeat *heartbeat.Heartbeat) (err errors.IdentifiableError) { +func (neighborManager *NeighborManager) applySolidHeartbeat(heartbeat *heartbeat.Heartbeat) (err errors.IdentifiableError) { mainStatement := heartbeat.GetMainStatement() neighborStatements := heartbeat.GetNeighborStatements() neighborManager.removeDroppedNeighbors(heartbeat.GetDroppedNeighbors()) - neighborManager.markIdleNeighbors(neighborStatements) - if err = neighborManager.updateStatementChains(mainStatement, neighborStatements); err != nil { return } + if err = neighborManager.updateNeighborManager(); err != nil { + return + } + + return +} + +func (neighborManager *NeighborManager) updateNeighborManager() (err errors.IdentifiableError) { + updatedOpinions, verificationErr := neighborManager.retrieveAndVerifyUpdates() + if verificationErr != nil { + err = verificationErr + + return + } + + for transactionId, liked := range updatedOpinions { + if currentlyLiked, opinionExists := neighborManager.opinions[transactionId]; !opinionExists || currentlyLiked != liked { + neighborManager.opinions[transactionId] = liked + + neighborManager.Events.UpdateOpinion.Trigger(transactionId, liked) + } + } + + return +} + +func (neighborManager *NeighborManager) retrieveAndVerifyUpdates() (updates map[string]bool, err errors.IdentifiableError) { + updates = make(map[string]bool) + + // retrieve required parameters + totalWeight := len(neighborManager.neighborChains) + threshold := float64(totalWeight) / 2 + opinionsOfNeighbors := neighborManager.getAccumulatedPendingOpinionsOfNeighbors() + + mainChainInitialOpinions := make(map[string]*Opinion) + mainChainOpinionChanges := make(map[string]*Opinion) + for transactionId, opinion := range neighborManager.mainChain.GetOpinions().GetPendingOpinions() { + if opinion.IsInitial() { + mainChainInitialOpinions[transactionId] = opinion + } else { + mainChainOpinionChanges[transactionId] = opinion + } + } + // always consider initial opinions + for transactionId, opinion := range mainChainInitialOpinions { + updates[transactionId] = opinion.IsLiked() + } + + // consider opinions that have seen enough neighbors + for transactionId, opinion := range opinionsOfNeighbors { + if opinion[0] > opinion[1] && opinion[0] > int(threshold) { + // main statement "should" like it + if changedOpinion := mainChainOpinionChanges[transactionId]; !changedOpinion.Exists() || !changedOpinion.IsLiked() { + if initialOpinion := mainChainInitialOpinions[transactionId]; !initialOpinion.Exists() || !initialOpinion.IsLiked() { + err = ErrMalformedHeartbeat.Derive("main statement should like transaction") + + return + } else { + updates[transactionId] = true + } + } else { + updates[transactionId] = true + } + } else if opinion[1] > opinion[0] && opinion[1] >= int(threshold) { + // main statement "should" dislike it + if changedOpinion := mainChainOpinionChanges[transactionId]; !changedOpinion.Exists() || changedOpinion.IsLiked() { + if initialOpinion := mainChainInitialOpinions[transactionId]; !initialOpinion.Exists() || initialOpinion.IsLiked() { + err = ErrMalformedHeartbeat.Derive("main statement should dislike transaction") + + return + } else { + updates[transactionId] = false + } + } else { + updates[transactionId] = false + } + } + } + + return +} + +func (neighborManager *NeighborManager) getAccumulatedPendingOpinionsOfNeighbors() (result map[string][]int) { + result = make(map[string][]int) + + for _, neighborChain := range neighborManager.neighborChains { + for transactionId, pendingOpinion := range neighborChain.GetOpinions().GetPendingOpinions() { + opinion, exists := result[transactionId] + if !exists { + opinion = make([]int, 2) + + result[transactionId] = opinion + } + + weightOfNeighbor := 1 + if pendingOpinion.IsLiked() { + opinion[0] += weightOfNeighbor + } else { + opinion[1] += weightOfNeighbor + } + } + } return } diff --git a/packages/ca/opinion.go b/packages/ca/opinion.go new file mode 100644 index 00000000..45e269af --- /dev/null +++ b/packages/ca/opinion.go @@ -0,0 +1,89 @@ +package ca + +import ( + "sync" +) + +type Opinion struct { + initial bool + liked bool + finalized bool + pending bool + + initialMutex sync.RWMutex + likedMutex sync.RWMutex + finalizedMutex sync.RWMutex + pendingMutex sync.RWMutex +} + +func NewOpinion() *Opinion { + return &Opinion{} +} + +func (opinion *Opinion) IsLiked() bool { + opinion.likedMutex.RLock() + defer opinion.likedMutex.RUnlock() + + return opinion.liked +} + +func (opinion *Opinion) SetLiked(liked bool) *Opinion { + opinion.likedMutex.Lock() + defer opinion.likedMutex.Unlock() + + opinion.liked = liked + + return opinion +} + +func (opinion *Opinion) IsInitial() bool { + opinion.initialMutex.RLock() + defer opinion.initialMutex.RLock() + + return opinion.initial +} + +func (opinion *Opinion) SetInitial(initial bool) *Opinion { + opinion.initialMutex.Lock() + defer opinion.initialMutex.Unlock() + + opinion.initial = initial + + return opinion +} + +func (opinion *Opinion) IsFinalized() bool { + opinion.finalizedMutex.RLock() + defer opinion.finalizedMutex.RLock() + + return opinion.finalized +} + +func (opinion *Opinion) SetFinalized(finalized bool) *Opinion { + opinion.finalizedMutex.Lock() + defer opinion.finalizedMutex.Unlock() + + opinion.finalized = finalized + + return opinion +} + +func (opinion *Opinion) IsPending() bool { + opinion.pendingMutex.RLock() + defer opinion.pendingMutex.RUnlock() + + return opinion.pending +} + +func (opinion *Opinion) SetPending(pending bool) *Opinion { + opinion.pendingMutex.Lock() + defer opinion.pendingMutex.Unlock() + + opinion.pending = pending + + return opinion +} + +func (opinion *Opinion) Exists() bool { + return opinion != nil +} diff --git a/packages/ca/opinion_register.go b/packages/ca/opinion_register.go new file mode 100644 index 00000000..6714f20d --- /dev/null +++ b/packages/ca/opinion_register.go @@ -0,0 +1,55 @@ +package ca + +type OpinionRegister struct { + pendingOpinions map[string]*Opinion + appliedOpinions map[string]*Opinion +} + +func NewOpinionRegister() *OpinionRegister { + return &OpinionRegister{ + pendingOpinions: make(map[string]*Opinion), + appliedOpinions: make(map[string]*Opinion), + } +} + +func (opinionRegister *OpinionRegister) GetPendingOpinions() map[string]*Opinion { + return opinionRegister.pendingOpinions +} + +func (opinionRegister *OpinionRegister) GetAppliedOpinions() map[string]*Opinion { + return opinionRegister.appliedOpinions +} + +func (opinionRegister *OpinionRegister) GetOpinion(transactionId string) (opinion *Opinion) { + if changedOpinion := opinionRegister.pendingOpinions[transactionId]; changedOpinion.Exists() { + opinion = changedOpinion + } else { + opinion = opinionRegister.appliedOpinions[transactionId] + } + + return +} + +func (opinionRegister *OpinionRegister) CreateOpinion(transactionId string) (opinion *Opinion) { + if opinion = opinionRegister.GetOpinion(transactionId); opinion.Exists() && opinion.IsPending() { + return + } + + opinion = NewOpinion() + opinion.SetInitial(true) + opinion.SetPending(true) + + opinionRegister.pendingOpinions[transactionId] = opinion + + return opinion +} + +func (opinionRegister *OpinionRegister) ApplyPendingOpinions() { + for transactionId, opinion := range opinionRegister.pendingOpinions { + opinion.SetPending(false) + + opinionRegister.appliedOpinions[transactionId] = opinion + } + + opinionRegister.pendingOpinions = make(map[string]*Opinion) +} diff --git a/packages/ca/opinion_register_test.go b/packages/ca/opinion_register_test.go new file mode 100644 index 00000000..54b4f25d --- /dev/null +++ b/packages/ca/opinion_register_test.go @@ -0,0 +1,16 @@ +package ca + +import ( + "fmt" + "testing" +) + +func TestOpinionRegister_GetOpinion(t *testing.T) { + opinionRegister := NewOpinionRegister() + + x := opinionRegister.CreateOpinion("ABC") + + fmt.Println(x.Exists()) + + fmt.Println(opinionRegister.GetOpinion("ABC").Exists()) +} diff --git a/packages/ca/statement_chain.go b/packages/ca/statement_chain.go index 400332f3..3107ee09 100644 --- a/packages/ca/statement_chain.go +++ b/packages/ca/statement_chain.go @@ -15,10 +15,9 @@ import ( type StatementChain struct { Events StatementChainEvents - pendingTransactionStatuses map[string]bool - transactionStatuses map[string]bool - statements map[string]*heartbeat.OpinionStatement - tail *heartbeat.OpinionStatement + opinions *OpinionRegister + statements map[string]*heartbeat.OpinionStatement + tail *heartbeat.OpinionStatement } func NewStatementChain() *StatementChain { @@ -26,22 +25,11 @@ func NewStatementChain() *StatementChain { Events: StatementChainEvents{ Reset: events.NewEvent(events.CallbackCaller), }, - pendingTransactionStatuses: make(map[string]bool), - transactionStatuses: make(map[string]bool), - statements: make(map[string]*heartbeat.OpinionStatement), + opinions: NewOpinionRegister(), + statements: make(map[string]*heartbeat.OpinionStatement), } } -func (statementChain *StatementChain) getTransactionStatus(transactionId string) (result bool, exists bool) { - if result, exists = statementChain.pendingTransactionStatuses[transactionId]; exists { - return - } - - result, exists = statementChain.transactionStatuses[transactionId] - - return -} - func (statementChain *StatementChain) AddStatement(statement *heartbeat.OpinionStatement) errors.IdentifiableError { previousStatementHash := statement.GetPreviousStatementHash() lastAppliedStatement := statementChain.tail @@ -54,16 +42,25 @@ func (statementChain *StatementChain) AddStatement(statement *heartbeat.OpinionS transactionId := typeutils.BytesToString(toggledTransaction.GetTransactionId()) if toggledTransaction.IsInitialStatement() { - if _, exists := statementChain.getTransactionStatus(transactionId); exists { + opinion := statementChain.opinions.GetOpinion(transactionId) + if opinion.Exists() { return ErrMalformedHeartbeat.Derive("two initial statements for the same transaction") } - statementChain.pendingTransactionStatuses[transactionId] = false + statementChain.opinions.CreateOpinion(transactionId).SetLiked(false) } else if toggledTransaction.IsFinalStatement() { // finalize -> clean up } else { - if currentValue, exists := statementChain.getTransactionStatus(transactionId); exists { - statementChain.pendingTransactionStatuses[transactionId] = !currentValue + opinion := statementChain.opinions.GetOpinion(transactionId) + if opinion.Exists() { + if opinion.IsPending() { + return ErrMalformedHeartbeat.Derive("two changed statements for the same transaction") + } + + opinion.SetInitial(false) + opinion.SetPending(true) + + statementChain.opinions.pendingOpinions[transactionId] = opinion } } } @@ -75,11 +72,7 @@ func (statementChain *StatementChain) AddStatement(statement *heartbeat.OpinionS } func (statementChain *StatementChain) ApplyPendingTransactionStatusChanges() { - for transactionId, value := range statementChain.pendingTransactionStatuses { - statementChain.transactionStatuses[transactionId] = value - } - - statementChain.pendingTransactionStatuses = make(map[string]bool) + statementChain.opinions.ApplyPendingOpinions() } func (statementChain *StatementChain) GetStatement(statementHash []byte) *heartbeat.OpinionStatement { @@ -87,6 +80,7 @@ func (statementChain *StatementChain) GetStatement(statementHash []byte) *heartb } func (statementChain *StatementChain) Reset() { + statementChain.opinions = NewOpinionRegister() statementChain.statements = make(map[string]*heartbeat.OpinionStatement) statementChain.tail = nil @@ -96,3 +90,7 @@ func (statementChain *StatementChain) Reset() { func (statementChain *StatementChain) GetTail() *heartbeat.OpinionStatement { return statementChain.tail } + +func (statementChain *StatementChain) GetOpinions() *OpinionRegister { + return statementChain.opinions +} -- GitLab