From 825a5dc31db4b3be8acd1c2fb17cf0e4b65dbd88 Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Fri, 4 Oct 2019 17:02:54 +0200 Subject: [PATCH] Feat: heartbeats now update opinions --- ...{heartbeat_manager_events.go => events.go} | 17 ++ packages/ca/heartbeat/heartbeat.go | 19 ++ packages/ca/heartbeat/proto/heartbeat.pb.go | 26 ++- packages/ca/heartbeat/proto/heartbeat.proto | 4 +- packages/ca/heartbeat_chain.go | 16 -- packages/ca/heartbeat_manager.go | 105 +++++++-- packages/ca/heartbeat_manager_test.go | 3 + packages/ca/neighbor_manager.go | 209 +++++++++++------- packages/ca/neighbor_manager_events.go | 10 - packages/ca/neighbor_manager_options.go | 15 ++ packages/ca/statement_chain.go | 101 ++++----- packages/ca/statement_chain_events.go | 14 -- 12 files changed, 342 insertions(+), 197 deletions(-) rename packages/ca/{heartbeat_manager_events.go => events.go} (51%) delete mode 100644 packages/ca/heartbeat_chain.go delete mode 100644 packages/ca/neighbor_manager_events.go delete mode 100644 packages/ca/statement_chain_events.go diff --git a/packages/ca/heartbeat_manager_events.go b/packages/ca/events.go similarity index 51% rename from packages/ca/heartbeat_manager_events.go rename to packages/ca/events.go index 01179eea..8611cf42 100644 --- a/packages/ca/heartbeat_manager_events.go +++ b/packages/ca/events.go @@ -10,6 +10,23 @@ type HeartbeatManagerEvents struct { RemoveNeighbor *events.Event } +type NeighborManagerEvents struct { + AddNeighbor *events.Event + RemoveNeighbor *events.Event + NeighborActive *events.Event + NeighborIdle *events.Event + ChainReset *events.Event + StatementMissing *events.Event +} + +type StatementChainEvents struct { + Reset *events.Event +} + +func HashCaller(handler interface{}, params ...interface{}) { + handler.(func([]byte))(params[0].([]byte)) +} + func IdentityNeighborManagerCaller(handler interface{}, params ...interface{}) { handler.(func(*identity.Identity, *NeighborManager))(params[0].(*identity.Identity), params[1].(*NeighborManager)) } diff --git a/packages/ca/heartbeat/heartbeat.go b/packages/ca/heartbeat/heartbeat.go index 0873ff99..fe7df8e5 100644 --- a/packages/ca/heartbeat/heartbeat.go +++ b/packages/ca/heartbeat/heartbeat.go @@ -17,11 +17,13 @@ import ( type Heartbeat struct { nodeId string mainStatement *OpinionStatement + droppedNeighbors [][]byte neighborStatements map[string][]*OpinionStatement signature []byte nodeIdMutex sync.RWMutex mainStatementMutex sync.RWMutex + droppedNeighborsMutex sync.RWMutex neighborStatementsMutex sync.RWMutex signatureMutex sync.RWMutex } @@ -58,6 +60,20 @@ func (heartbeat *Heartbeat) SetMainStatement(mainStatement *OpinionStatement) { heartbeat.mainStatement = mainStatement } +func (heartbeat *Heartbeat) GetDroppedNeighbors() [][]byte { + heartbeat.droppedNeighborsMutex.RLock() + defer heartbeat.droppedNeighborsMutex.RUnlock() + + return heartbeat.droppedNeighbors +} + +func (heartbeat *Heartbeat) SetDroppedNeighbors(droppedNeighbors [][]byte) { + heartbeat.droppedNeighborsMutex.Lock() + defer heartbeat.droppedNeighborsMutex.Unlock() + + heartbeat.droppedNeighbors = droppedNeighbors +} + func (heartbeat *Heartbeat) GetNeighborStatements() map[string][]*OpinionStatement { heartbeat.neighborStatementsMutex.RLock() defer heartbeat.neighborStatementsMutex.RUnlock() @@ -141,6 +157,7 @@ func (heartbeat *Heartbeat) FromProto(proto proto.Message) { heartbeat.nodeId = protoHeartbeat.NodeId heartbeat.mainStatement = &mainStatement + heartbeat.droppedNeighbors = protoHeartbeat.DroppedNeighbors heartbeat.neighborStatements = neighborStatements heartbeat.signature = protoHeartbeat.Signature } @@ -158,6 +175,7 @@ func (heartbeat *Heartbeat) ToProto() proto.Message { return &heartbeatProto.HeartBeat{ NodeId: heartbeat.nodeId, + DroppedNeighbors: heartbeat.droppedNeighbors, MainStatement: heartbeat.mainStatement.ToProto().(*heartbeatProto.OpinionStatement), NeighborStatements: neighborStatements, Signature: heartbeat.signature, @@ -176,6 +194,7 @@ func (heartbeat *Heartbeat) String() string { return stringify.Struct("Heartbeat", stringify.StructField("nodeId", heartbeat.nodeId), stringify.StructField("mainStatement", heartbeat.mainStatement), + stringify.StructField("droppedNeighbors", heartbeat.droppedNeighbors), stringify.StructField("neighborStatements", heartbeat.neighborStatements), stringify.StructField("signature", heartbeat.signature), ) diff --git a/packages/ca/heartbeat/proto/heartbeat.pb.go b/packages/ca/heartbeat/proto/heartbeat.pb.go index ee8c13d2..71d92c9b 100644 --- a/packages/ca/heartbeat/proto/heartbeat.pb.go +++ b/packages/ca/heartbeat/proto/heartbeat.pb.go @@ -23,8 +23,9 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type HeartBeat struct { NodeId string `protobuf:"bytes,1,opt,name=nodeId,proto3" json:"nodeId,omitempty"` MainStatement *OpinionStatement `protobuf:"bytes,2,opt,name=mainStatement,proto3" json:"mainStatement,omitempty"` - NeighborStatements []*OpinionStatement `protobuf:"bytes,3,rep,name=neighborStatements,proto3" json:"neighborStatements,omitempty"` - Signature []byte `protobuf:"bytes,4,opt,name=signature,proto3" json:"signature,omitempty"` + DroppedNeighbors [][]byte `protobuf:"bytes,3,rep,name=droppedNeighbors,proto3" json:"droppedNeighbors,omitempty"` + NeighborStatements []*OpinionStatement `protobuf:"bytes,4,rep,name=neighborStatements,proto3" json:"neighborStatements,omitempty"` + Signature []byte `protobuf:"bytes,5,opt,name=signature,proto3" json:"signature,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -69,6 +70,13 @@ func (m *HeartBeat) GetMainStatement() *OpinionStatement { return nil } +func (m *HeartBeat) GetDroppedNeighbors() [][]byte { + if m != nil { + return m.DroppedNeighbors + } + return nil +} + func (m *HeartBeat) GetNeighborStatements() []*OpinionStatement { if m != nil { return m.NeighborStatements @@ -90,16 +98,18 @@ func init() { func init() { proto.RegisterFile("heartbeat.proto", fileDescriptor_3c667767fb9826a9) } var fileDescriptor_3c667767fb9826a9 = []byte{ - // 176 bytes of a gzipped FileDescriptorProto + // 200 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0x4d, 0x2c, 0x2a, 0x49, 0x4a, 0x4d, 0x2c, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x52, 0xe2, 0xf9, 0x05, 0x99, 0x79, 0x99, 0xf9, 0x79, 0xf1, 0xc5, 0x25, 0x89, 0x25, 0xa9, 0xb9, 0xa9, - 0x79, 0x50, 0x79, 0xa5, 0x93, 0x8c, 0x5c, 0x9c, 0x1e, 0x20, 0x3d, 0x4e, 0xa9, 0x89, 0x25, 0x42, + 0x79, 0x50, 0x79, 0xa5, 0xaf, 0x8c, 0x5c, 0x9c, 0x1e, 0x20, 0x3d, 0x4e, 0xa9, 0x89, 0x25, 0x42, 0x62, 0x5c, 0x6c, 0x79, 0xf9, 0x29, 0xa9, 0x9e, 0x29, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x50, 0x9e, 0x90, 0x2d, 0x17, 0x6f, 0x6e, 0x62, 0x66, 0x5e, 0x30, 0x4c, 0xb3, 0x04, 0x93, 0x02, 0xa3, 0x06, 0xb7, 0x91, 0x38, 0xc4, 0x10, 0x3d, 0x7f, 0x88, 0xe1, 0x70, 0xe9, 0x20, 0x54, 0xd5, - 0x42, 0xee, 0x5c, 0x42, 0x79, 0xa9, 0x99, 0xe9, 0x19, 0x49, 0xf9, 0x45, 0x70, 0xc1, 0x62, 0x09, - 0x66, 0x05, 0x66, 0x7c, 0x66, 0x60, 0xd1, 0x22, 0x24, 0xc3, 0xc5, 0x59, 0x9c, 0x99, 0x9e, 0x97, - 0x58, 0x52, 0x5a, 0x94, 0x2a, 0xc1, 0xa2, 0xc0, 0xa8, 0xc1, 0x13, 0x84, 0x10, 0x48, 0x62, 0x03, - 0x9b, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0xde, 0x82, 0x59, 0xb6, 0x05, 0x01, 0x00, 0x00, + 0x42, 0x5a, 0x5c, 0x02, 0x29, 0x45, 0xf9, 0x05, 0x05, 0xa9, 0x29, 0x7e, 0xa9, 0x99, 0xe9, 0x19, + 0x49, 0xf9, 0x45, 0xc5, 0x12, 0xcc, 0x0a, 0xcc, 0x1a, 0x3c, 0x41, 0x18, 0xe2, 0x42, 0xee, 0x5c, + 0x42, 0x79, 0x50, 0x0e, 0xdc, 0x80, 0x62, 0x09, 0x16, 0x05, 0x66, 0x7c, 0xf6, 0x61, 0xd1, 0x22, + 0x24, 0xc3, 0xc5, 0x59, 0x9c, 0x99, 0x9e, 0x97, 0x58, 0x52, 0x5a, 0x94, 0x2a, 0xc1, 0xaa, 0xc0, + 0xa8, 0xc1, 0x13, 0x84, 0x10, 0x48, 0x62, 0x03, 0x9b, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, + 0xc2, 0x6a, 0xf1, 0xb9, 0x31, 0x01, 0x00, 0x00, } diff --git a/packages/ca/heartbeat/proto/heartbeat.proto b/packages/ca/heartbeat/proto/heartbeat.proto index afce2528..8ddc8d63 100644 --- a/packages/ca/heartbeat/proto/heartbeat.proto +++ b/packages/ca/heartbeat/proto/heartbeat.proto @@ -7,8 +7,8 @@ package proto; message HeartBeat { string nodeId = 1; OpinionStatement mainStatement = 2; - repeated OpinionStatement neighborStatements = 3; - repeated string removedNeighbors = 4; + repeated bytes droppedNeighbors = 3; + repeated OpinionStatement neighborStatements = 4; bytes signature = 5; } diff --git a/packages/ca/heartbeat_chain.go b/packages/ca/heartbeat_chain.go deleted file mode 100644 index ed90265d..00000000 --- a/packages/ca/heartbeat_chain.go +++ /dev/null @@ -1,16 +0,0 @@ -package ca - -import ( - "github.com/iotaledger/goshimmer/packages/ca/heartbeat" -) - -type HeartbeatChain struct { - missingHeartbeats map[string]bool - heartbeats map[string]*heartbeat.Heartbeat -} - -func NewHeartbeatChain() *HeartbeatChain { - return &HeartbeatChain{ - heartbeats: make(map[string]*heartbeat.Heartbeat), - } -} diff --git a/packages/ca/heartbeat_manager.go b/packages/ca/heartbeat_manager.go index 16afa1db..00ae5b0b 100644 --- a/packages/ca/heartbeat_manager.go +++ b/packages/ca/heartbeat_manager.go @@ -19,10 +19,13 @@ type HeartbeatManager struct { identity *identity.Identity options *HeartbeatManagerOptions statementChain *StatementChain + droppedNeighbors [][]byte neighborManagers map[string]*NeighborManager initialOpinions map[string]bool + droppedNeighborsMutex sync.RWMutex neighborManagersMutex sync.RWMutex + initialOpinionsMutex sync.RWMutex } func NewHeartbeatManager(identity *identity.Identity, options ...HeartbeatManagerOption) *HeartbeatManager { @@ -36,49 +39,96 @@ func NewHeartbeatManager(identity *identity.Identity, options ...HeartbeatManage options: DEFAULT_OPTIONS.Override(options...), statementChain: NewStatementChain(), + droppedNeighbors: make([][]byte, 0), neighborManagers: make(map[string]*NeighborManager), initialOpinions: make(map[string]bool), } } func (heartbeatManager *HeartbeatManager) AddNeighbor(neighborIdentity *identity.Identity) { + heartbeatManager.neighborManagersMutex.RLock() if _, exists := heartbeatManager.neighborManagers[neighborIdentity.StringIdentifier]; !exists { - newNeighborManager := NewNeighborManager() + heartbeatManager.neighborManagersMutex.RUnlock() + + heartbeatManager.neighborManagersMutex.Lock() + if _, exists := heartbeatManager.neighborManagers[neighborIdentity.StringIdentifier]; !exists { + newNeighborManager := NewNeighborManager() - heartbeatManager.neighborManagers[neighborIdentity.StringIdentifier] = newNeighborManager + heartbeatManager.neighborManagers[neighborIdentity.StringIdentifier] = newNeighborManager + heartbeatManager.neighborManagersMutex.Unlock() - heartbeatManager.Events.AddNeighbor.Trigger(neighborIdentity, newNeighborManager) + heartbeatManager.Events.AddNeighbor.Trigger(neighborIdentity, newNeighborManager) + } else { + heartbeatManager.neighborManagersMutex.Unlock() + } + } else { + heartbeatManager.neighborManagersMutex.RUnlock() + } +} + +func (heartbeatManager *HeartbeatManager) RemoveNeighbor(neighborIdentity *identity.Identity) { + heartbeatManager.neighborManagersMutex.RLock() + if _, exists := heartbeatManager.neighborManagers[neighborIdentity.StringIdentifier]; exists { + heartbeatManager.neighborManagersMutex.RUnlock() + + heartbeatManager.neighborManagersMutex.Lock() + if neighborManager, exists := heartbeatManager.neighborManagers[neighborIdentity.StringIdentifier]; exists { + delete(heartbeatManager.neighborManagers, neighborIdentity.StringIdentifier) + heartbeatManager.neighborManagersMutex.Unlock() + + heartbeatManager.droppedNeighborsMutex.Lock() + heartbeatManager.droppedNeighbors = append(heartbeatManager.droppedNeighbors, neighborIdentity.Identifier) + heartbeatManager.droppedNeighborsMutex.Unlock() + + heartbeatManager.Events.RemoveNeighbor.Trigger(neighborIdentity, neighborManager) + } else { + heartbeatManager.neighborManagersMutex.Unlock() + } + } else { + heartbeatManager.neighborManagersMutex.RUnlock() } } func (heartbeatManager *HeartbeatManager) InitialDislike(transactionId []byte) { + heartbeatManager.initialOpinionsMutex.Lock() heartbeatManager.initialOpinions[string(transactionId)] = false + heartbeatManager.initialOpinionsMutex.Unlock() } func (heartbeatManager *HeartbeatManager) InitialLike(transactionId []byte) { + heartbeatManager.initialOpinionsMutex.Lock() heartbeatManager.initialOpinions[string(transactionId)] = true + heartbeatManager.initialOpinionsMutex.Unlock() } func (heartbeatManager *HeartbeatManager) GenerateHeartbeat() (result *heartbeat.Heartbeat, err errors.IdentifiableError) { - if mainStatement, mainStatementErr := heartbeatManager.generateMainStatement(); mainStatementErr == nil { - if neighborStatements, neighborStatementErr := heartbeatManager.generateNeighborStatements(); neighborStatementErr == nil { - generatedHeartbeat := heartbeat.NewHeartbeat() - generatedHeartbeat.SetNodeId(heartbeatManager.identity.StringIdentifier) - generatedHeartbeat.SetMainStatement(mainStatement) - generatedHeartbeat.SetNeighborStatements(neighborStatements) - - if signingErr := generatedHeartbeat.Sign(heartbeatManager.identity); signingErr == nil { - result = generatedHeartbeat - } else { - err = signingErr - } - } else { - err = neighborStatementErr - } - } else { + mainStatement, mainStatementErr := heartbeatManager.generateMainStatement() + if mainStatementErr != nil { err = mainStatementErr + + return } + neighborStatements, neighborStatementErr := heartbeatManager.generateNeighborStatements() + if neighborStatementErr != nil { + err = neighborStatementErr + + return + } + + generatedHeartbeat := heartbeat.NewHeartbeat() + generatedHeartbeat.SetNodeId(heartbeatManager.identity.StringIdentifier) + generatedHeartbeat.SetMainStatement(mainStatement) + generatedHeartbeat.SetDroppedNeighbors(heartbeatManager.generateDroppedNeighbors()) + generatedHeartbeat.SetNeighborStatements(neighborStatements) + + signingErr := generatedHeartbeat.Sign(heartbeatManager.identity) + if signingErr != nil { + err = signingErr + } + + result = generatedHeartbeat + return } @@ -112,7 +162,7 @@ func (heartbeatManager *HeartbeatManager) generateMainStatement() (result *heart mainStatement.SetTime(uint64(time.Now().Unix())) mainStatement.SetToggledTransactions(heartbeatManager.generateToggledTransactions()) - if lastAppliedStatement := heartbeatManager.statementChain.lastAppliedStatement; lastAppliedStatement != nil { + if lastAppliedStatement := heartbeatManager.statementChain.GetTail(); lastAppliedStatement != nil { mainStatement.SetPreviousStatementHash(lastAppliedStatement.GetHash()) } @@ -120,7 +170,7 @@ func (heartbeatManager *HeartbeatManager) generateMainStatement() (result *heart result = mainStatement heartbeatManager.resetInitialOpinions() - heartbeatManager.statementChain.lastAppliedStatement = mainStatement + heartbeatManager.statementChain.tail = mainStatement } else { err = signingErr } @@ -128,6 +178,19 @@ func (heartbeatManager *HeartbeatManager) generateMainStatement() (result *heart return } +func (heartbeatManager *HeartbeatManager) generateDroppedNeighbors() (result [][]byte) { + heartbeatManager.droppedNeighborsMutex.RLock() + result = make([][]byte, len(heartbeatManager.droppedNeighbors)) + copy(result, heartbeatManager.droppedNeighbors) + heartbeatManager.droppedNeighborsMutex.RUnlock() + + heartbeatManager.droppedNeighborsMutex.Lock() + heartbeatManager.droppedNeighbors = make([][]byte, 0) + heartbeatManager.droppedNeighborsMutex.Unlock() + + return +} + func (heartbeatManager *HeartbeatManager) generateNeighborStatements() (result map[string][]*heartbeat.OpinionStatement, err errors.IdentifiableError) { result = make(map[string][]*heartbeat.OpinionStatement) diff --git a/packages/ca/heartbeat_manager_test.go b/packages/ca/heartbeat_manager_test.go index 5b90540a..83cd9c3e 100644 --- a/packages/ca/heartbeat_manager_test.go +++ b/packages/ca/heartbeat_manager_test.go @@ -20,11 +20,14 @@ func generateRandomTransactionId() (result []byte) { func TestHeartbeatManager_GenerateHeartbeat(t *testing.T) { ownIdentity := identity.GenerateRandomIdentity() neighborIdentity := identity.GenerateRandomIdentity() + droppedNeighborIdentity := identity.GenerateRandomIdentity() // generate first heartbeat //////////////////////////////////////////////////////////////////////////////////////// heartbeatManager1 := NewHeartbeatManager(ownIdentity) heartbeatManager1.AddNeighbor(neighborIdentity) + heartbeatManager1.AddNeighbor(droppedNeighborIdentity) + heartbeatManager1.RemoveNeighbor(droppedNeighborIdentity) heartbeatManager1.InitialDislike(generateRandomTransactionId()) heartbeatManager1.InitialDislike(generateRandomTransactionId()) heartbeatManager1.InitialLike(generateRandomTransactionId()) diff --git a/packages/ca/neighbor_manager.go b/packages/ca/neighbor_manager.go index d5b77118..07e0936a 100644 --- a/packages/ca/neighbor_manager.go +++ b/packages/ca/neighbor_manager.go @@ -2,7 +2,7 @@ package ca import ( "bytes" - "sort" + "fmt" "strconv" "github.com/iotaledger/goshimmer/packages/typeutils" @@ -29,12 +29,21 @@ type NeighborManager struct { func NewNeighborManager(options ...NeighborManagerOption) *NeighborManager { return &NeighborManager{ Events: NeighborManagerEvents{ + AddNeighbor: events.NewEvent(func(handler interface{}, params ...interface{}) { + + }), + RemoveNeighbor: events.NewEvent(func(handler interface{}, params ...interface{}) { + + }), ChainReset: events.NewEvent(events.CallbackCaller), StatementMissing: events.NewEvent(HashCaller), }, - options: DEFAULT_NEIGHBOR_MANAGER_OPTIONS.Override(options...), - mainChain: NewStatementChain(), - neighborChains: make(map[string]*StatementChain), + options: DEFAULT_NEIGHBOR_MANAGER_OPTIONS.Override(options...), + mainChain: NewStatementChain(), + missingHeartbeats: make(map[string]bool), + pendingHeartbeats: make(map[string]*heartbeat.Heartbeat), + heartbeats: make(map[string]*heartbeat.Heartbeat), + neighborChains: make(map[string]*StatementChain), } } @@ -75,6 +84,10 @@ func (neighborManager *NeighborManager) storeHeartbeat(heartbeat *heartbeat.Hear } } + // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////// + + // region store heartbeat and update lists ///////////////////////////////////////////////////////////////////////// + heartbeatHash := typeutils.BytesToString(mainStatement.GetHash()) if neighborManager.lastReceivedHeartbeat == nil || mainStatement.GetTime() > neighborManager.lastReceivedHeartbeat.GetMainStatement().GetTime() { @@ -92,67 +105,141 @@ func (neighborManager *NeighborManager) storeHeartbeat(heartbeat *heartbeat.Hear func (neighborManager *NeighborManager) applyPendingHeartbeats() (err errors.IdentifiableError) { if len(neighborManager.missingHeartbeats) == 0 && len(neighborManager.pendingHeartbeats) >= 1 { - // cycle through heartbeats and apply them one by one + sortedPendingHeartbeats, sortingErr := neighborManager.getPendingHeartbeatsSorted() + if sortingErr != nil { + err = sortingErr + + return + } + + for _, sortedHeartbeat := range sortedPendingHeartbeats { + if applicationErr := neighborManager.applyHeartbeat(sortedHeartbeat); applicationErr != nil { + err = applicationErr + + return + } + } } return } -func (neighborManager *NeighborManager) ApplyHeartbeat(heartbeat *heartbeat.Heartbeat) (err errors.IdentifiableError) { - if storeErr := neighborManager.storeHeartbeat(heartbeat); storeErr != nil { - err = storeErr +func (neighborManager *NeighborManager) getPendingHeartbeatsSorted() (result []*heartbeat.Heartbeat, err errors.IdentifiableError) { + pendingHeartbeatCount := len(neighborManager.pendingHeartbeats) + result = make([]*heartbeat.Heartbeat, pendingHeartbeatCount) - return + processedHeartbeats := 0 + currentHeartbeat := neighborManager.lastReceivedHeartbeat + for currentHeartbeat != nil && len(neighborManager.pendingHeartbeats) >= 1 { + mainStatement := currentHeartbeat.GetMainStatement() + if mainStatement == nil { + result = nil + err = ErrInternalError.Derive("missing main statement in heartbeat") + + return + } + + currentHeartbeatHash := typeutils.BytesToString(mainStatement.GetHash()) + previousHeartbeatHash := typeutils.BytesToString(mainStatement.GetPreviousStatementHash()) + + if _, exists := neighborManager.pendingHeartbeats[currentHeartbeatHash]; !exists { + result = nil + err = ErrInternalError.Derive("pending heartbeats list is out of sync") + + return + } + delete(neighborManager.pendingHeartbeats, currentHeartbeatHash) + + result[pendingHeartbeatCount-processedHeartbeats-1] = currentHeartbeat + + currentHeartbeat = neighborManager.heartbeats[previousHeartbeatHash] + processedHeartbeats++ } - // region mark idle neighbors ////////////////////////////////////////////////////////////////////////////////////// + return +} + +// this method removes dropped neighbors from the neighborChains +func (neighborManager *NeighborManager) removeDroppedNeighbors(droppedNeighbors [][]byte) { + for _, droppedNeighbor := range droppedNeighbors { + neighborIdString := typeutils.BytesToString(droppedNeighbor) + + if _, exists := neighborManager.neighborChains[neighborIdString]; exists { + delete(neighborManager.neighborChains, neighborIdString) + + neighborManager.Events.RemoveNeighbor.Trigger(droppedNeighbor) + } + } +} - existingNeighbors := make(map[string]*StatementChain) +func (neighborManager *NeighborManager) markIdleNeighbors(neighborStatements map[string][]*heartbeat.OpinionStatement) { + idleNeighbors := make(map[string]*StatementChain) for neighborId, neighborChain := range neighborManager.neighborChains { - existingNeighbors[neighborId] = neighborChain + idleNeighbors[neighborId] = neighborChain } for neighborId := range neighborStatements { - if neighborStatementChain, neighborExists := existingNeighbors[neighborId]; neighborExists { - neighborStatementChain.ResetIdleCounter() + if _, neighborExists := idleNeighbors[neighborId]; neighborExists { + // TRIGGER ACTIVE - delete(existingNeighbors, neighborId) + delete(idleNeighbors, neighborId) } } - for _, neighborStatementChain := range existingNeighbors { - neighborStatementChain.IncreaseIdleCounter() + for _, x := range idleNeighbors { + // TRIGGER IDLE + if false { + fmt.Println(x) + } } +} - // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////// - - // region add received statements ////////////////////////////////////////////////////////////////////////////////// - - applyStatements := neighborManager.mainChain.AddStatement(mainStatement) +func (neighborManager *NeighborManager) updateStatementChains(mainStatement *heartbeat.OpinionStatement, neighborStatements map[string][]*heartbeat.OpinionStatement) (err errors.IdentifiableError) { + neighborManager.mainChain.AddStatement(mainStatement) - // add neighbor statements to chain for neighborId, statementsOfNeighbor := range neighborStatements { - neighborChain, exists := neighborManager.neighborChains[neighborId] - if !exists { - neighborChain = neighborManager.addNeighborChain(neighborId) + neighborChain, neighborChainErr := neighborManager.addNeighborChain(neighborId) + if neighborChainErr != nil { + err = neighborChainErr + + return } - if neighborChain != nil { - for _, neighborStatement := range statementsOfNeighbor { - neighborChain.AddStatement(neighborStatement) - } + for _, neighborStatement := range statementsOfNeighbor { + neighborChain.AddStatement(neighborStatement) } } - // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////// + return +} + +func (neighborManager *NeighborManager) applyHeartbeat(heartbeat *heartbeat.Heartbeat) (err errors.IdentifiableError) { + mainStatement := heartbeat.GetMainStatement() + neighborStatements := heartbeat.GetNeighborStatements() + + neighborManager.removeDroppedNeighbors(heartbeat.GetDroppedNeighbors()) - // region apply pending statements ///////////////////////////////////////////////////////////////////////////////// + neighborManager.markIdleNeighbors(neighborStatements) - if applyStatements { - neighborManager.mainChain.lastAppliedStatement = mainStatement + if err = neighborManager.updateStatementChains(mainStatement, neighborStatements); err != nil { + return } - // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////// + return +} + +func (neighborManager *NeighborManager) ApplyHeartbeat(heartbeat *heartbeat.Heartbeat) (err errors.IdentifiableError) { + if storeErr := neighborManager.storeHeartbeat(heartbeat); storeErr != nil { + err = storeErr + + return + } + + if applicationErr := neighborManager.applyPendingHeartbeats(); applicationErr != nil { + err = applicationErr + + return + } return } @@ -160,7 +247,7 @@ func (neighborManager *NeighborManager) ApplyHeartbeat(heartbeat *heartbeat.Hear func (neighborManager *NeighborManager) GenerateHeartbeatStatements() (result []*heartbeat.OpinionStatement) { result = make([]*heartbeat.OpinionStatement, 0) - if lastAppliedStatement := neighborManager.mainChain.GetLastAppliedStatement(); lastAppliedStatement != nil { + if lastAppliedStatement := neighborManager.mainChain.GetTail(); lastAppliedStatement != nil { currentStatement := lastAppliedStatement for currentStatement != nil && !bytes.Equal(currentStatement.GetHash(), neighborManager.previouslyReportedHeartbeatHash) { result = append([]*heartbeat.OpinionStatement{currentStatement}, result...) @@ -174,49 +261,23 @@ func (neighborManager *NeighborManager) GenerateHeartbeatStatements() (result [] return } -func (neighborManager *NeighborManager) addNeighborChain(neighborId string) *StatementChain { - if len(neighborManager.neighborChains) < MAX_NEIGHBOR_COUNT { - newNeighborChain := NewStatementChain() - neighborManager.neighborChains[neighborId] = newNeighborChain +func (neighborManager *NeighborManager) addNeighborChain(neighborId string) (result *StatementChain, err errors.IdentifiableError) { + if existingNeighborChain, exists := neighborManager.neighborChains[neighborId]; exists { + result = existingNeighborChain - return newNeighborChain - } - - neighbors := make([]string, 0, len(neighborManager.neighborChains)) - for neighborId := range neighborManager.neighborChains { - neighbors = append(neighbors, neighborId) + return } - sort.Slice(neighbors, func(i, j int) bool { - neighborChainI := neighborManager.neighborChains[neighbors[i]] - neighborChainJ := neighborManager.neighborChains[neighbors[i]] - - switch true { - case neighborChainI.GetIdleCounter() < neighborChainJ.GetIdleCounter(): - return true - default: - return false - - } - }) - newNeighborChain := NewStatementChain() + if len(neighborManager.neighborChains) >= MAX_NEIGHBOR_COUNT { + err = ErrTooManyNeighbors.Derive("failed to add new neighbor: too many neighbors") - neighborManager.neighborChains[neighborId] = newNeighborChain + return + } - return newNeighborChain -} + result = NewStatementChain() + neighborManager.neighborChains[neighborId] = result -type NeighborManagerOptions struct { - maxNeighborChains int -} + neighborManager.Events.AddNeighbor.Trigger(neighborId, result) -func (options NeighborManagerOptions) Override(optionalOptions ...NeighborManagerOption) *NeighborManagerOptions { - result := &options - for _, option := range optionalOptions { - option(result) - } - - return result + return } - -type NeighborManagerOption func(*NeighborManagerOptions) diff --git a/packages/ca/neighbor_manager_events.go b/packages/ca/neighbor_manager_events.go deleted file mode 100644 index 753b80fc..00000000 --- a/packages/ca/neighbor_manager_events.go +++ /dev/null @@ -1,10 +0,0 @@ -package ca - -import ( - "github.com/iotaledger/goshimmer/packages/events" -) - -type NeighborManagerEvents struct { - ChainReset *events.Event - StatementMissing *events.Event -} diff --git a/packages/ca/neighbor_manager_options.go b/packages/ca/neighbor_manager_options.go index 3e23aff0..7f650e08 100644 --- a/packages/ca/neighbor_manager_options.go +++ b/packages/ca/neighbor_manager_options.go @@ -3,3 +3,18 @@ package ca var DEFAULT_NEIGHBOR_MANAGER_OPTIONS = &NeighborManagerOptions{ maxNeighborChains: 8, } + +type NeighborManagerOptions struct { + maxNeighborChains int +} + +func (options NeighborManagerOptions) Override(optionalOptions ...NeighborManagerOption) *NeighborManagerOptions { + result := &options + for _, option := range optionalOptions { + option(result) + } + + return result +} + +type NeighborManagerOption func(*NeighborManagerOptions) diff --git a/packages/ca/statement_chain.go b/packages/ca/statement_chain.go index c04af8cf..400332f3 100644 --- a/packages/ca/statement_chain.go +++ b/packages/ca/statement_chain.go @@ -3,77 +3,83 @@ package ca import ( "bytes" + "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/goshimmer/packages/typeutils" "github.com/iotaledger/goshimmer/packages/events" "github.com/iotaledger/goshimmer/packages/ca/heartbeat" - "github.com/iotaledger/goshimmer/packages/errors" ) type StatementChain struct { Events StatementChainEvents - statements map[string]*heartbeat.OpinionStatement - missingStatements map[string]bool - idleCounter int - lastAppliedStatement *heartbeat.OpinionStatement - lastReceivedStatement *heartbeat.OpinionStatement + pendingTransactionStatuses map[string]bool + transactionStatuses map[string]bool + statements map[string]*heartbeat.OpinionStatement + tail *heartbeat.OpinionStatement } func NewStatementChain() *StatementChain { return &StatementChain{ Events: StatementChainEvents{ - Reset: events.NewEvent(events.CallbackCaller), - StatementMissing: events.NewEvent(HashCaller), + Reset: events.NewEvent(events.CallbackCaller), }, - statements: make(map[string]*heartbeat.OpinionStatement), - missingStatements: make(map[string]bool), + pendingTransactionStatuses: make(map[string]bool), + transactionStatuses: make(map[string]bool), + statements: make(map[string]*heartbeat.OpinionStatement), } } -func (statementChain *StatementChain) IncreaseIdleCounter() { - statementChain.idleCounter++ -} +func (statementChain *StatementChain) getTransactionStatus(transactionId string) (result bool, exists bool) { + if result, exists = statementChain.pendingTransactionStatuses[transactionId]; exists { + return + } -func (statementChain *StatementChain) ResetIdleCounter() { - statementChain.idleCounter = 0 -} + result, exists = statementChain.transactionStatuses[transactionId] -func (statementChain *StatementChain) GetIdleCounter() int { - return statementChain.idleCounter + return } -func (statementChain *StatementChain) AddStatement(statement *heartbeat.OpinionStatement) bool { +func (statementChain *StatementChain) AddStatement(statement *heartbeat.OpinionStatement) errors.IdentifiableError { previousStatementHash := statement.GetPreviousStatementHash() - lastAppliedMainStatement := statementChain.lastAppliedStatement + lastAppliedStatement := statementChain.tail - if len(previousStatementHash) == 0 { + if len(previousStatementHash) == 0 || lastAppliedStatement != nil && !bytes.Equal(lastAppliedStatement.GetHash(), previousStatementHash) { statementChain.Reset() - } else if lastAppliedMainStatement != nil && !bytes.Equal(lastAppliedMainStatement.GetHash(), previousStatementHash) { - if (statement.GetTime() - lastAppliedMainStatement.GetTime()) >= MAX_STATEMENT_TIMEOUT { - statementChain.Reset() - } else if !statementChain.StatementExists(previousStatementHash) { - statementChain.addMissingStatement(previousStatementHash) - statementChain.addStatement(statement) - - return false - } } - statementChain.addStatement(statement) - - return true -} + for _, toggledTransaction := range statement.GetToggledTransactions() { + transactionId := typeutils.BytesToString(toggledTransaction.GetTransactionId()) + + if toggledTransaction.IsInitialStatement() { + if _, exists := statementChain.getTransactionStatus(transactionId); exists { + return ErrMalformedHeartbeat.Derive("two initial statements for the same transaction") + } + + statementChain.pendingTransactionStatuses[transactionId] = false + } else if toggledTransaction.IsFinalStatement() { + // finalize -> clean up + } else { + if currentValue, exists := statementChain.getTransactionStatus(transactionId); exists { + statementChain.pendingTransactionStatuses[transactionId] = !currentValue + } + } + } -func (statementChain *StatementChain) addStatement(statement *heartbeat.OpinionStatement) { statementChain.statements[typeutils.BytesToString(statement.GetHash())] = statement + statementChain.tail = statement + + return nil } -func (statementChain *StatementChain) addMissingStatement(statementHash []byte) { - statementChain.missingStatements[typeutils.BytesToString(statementHash)] = true +func (statementChain *StatementChain) ApplyPendingTransactionStatusChanges() { + for transactionId, value := range statementChain.pendingTransactionStatuses { + statementChain.transactionStatuses[transactionId] = value + } - statementChain.Events.StatementMissing.Trigger(statementHash) + statementChain.pendingTransactionStatuses = make(map[string]bool) } func (statementChain *StatementChain) GetStatement(statementHash []byte) *heartbeat.OpinionStatement { @@ -81,21 +87,12 @@ func (statementChain *StatementChain) GetStatement(statementHash []byte) *heartb } func (statementChain *StatementChain) Reset() { - statementChain.Events.Reset.Trigger() -} + statementChain.statements = make(map[string]*heartbeat.OpinionStatement) + statementChain.tail = nil -func (statementChain *StatementChain) Apply(statement *heartbeat.OpinionStatement) (err errors.IdentifiableError) { - return -} - -func (statementChain *StatementChain) SetLastReceivedStatement(statement *heartbeat.OpinionStatement) { - statementChain.lastReceivedStatement = statement -} - -func (statementChain *StatementChain) GetLastAppliedStatement() *heartbeat.OpinionStatement { - return statementChain.lastAppliedStatement + statementChain.Events.Reset.Trigger() } -func (statementChain *StatementChain) StatementExists(statementHash []byte) bool { - return true +func (statementChain *StatementChain) GetTail() *heartbeat.OpinionStatement { + return statementChain.tail } diff --git a/packages/ca/statement_chain_events.go b/packages/ca/statement_chain_events.go deleted file mode 100644 index 21ec16ee..00000000 --- a/packages/ca/statement_chain_events.go +++ /dev/null @@ -1,14 +0,0 @@ -package ca - -import ( - "github.com/iotaledger/goshimmer/packages/events" -) - -type StatementChainEvents struct { - Reset *events.Event - StatementMissing *events.Event -} - -func HashCaller(handler interface{}, params ...interface{}) { - handler.(func([]byte))(params[0].([]byte)) -} -- GitLab