Unverified Commit 5ffec2d8 authored by Santiago Ruano Rincón

rm files deleted by upstream between 0.2.1 and 0.2.3

parent 94a210e5
package dashboard
import (
"strings"
"sync"
"time"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/packages/graph@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/packages/shutdown@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/plugins/analysis/packet@without_tipselection"
analysisserver "gitlab.imt-atlantique.fr/iota-imt/goshimmer/plugins/analysis/server@without_tipselection"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/identity"
)
// the period in which we scan and delete old data.
const cleanUpPeriod = 15 * time.Second
var (
// maps nodeId to the latest arrival of a heartbeat.
nodes = make(map[string]time.Time)
// maps nodeId to outgoing connections + latest arrival of heartbeat.
links = make(map[string]map[string]time.Time)
lock sync.RWMutex
)
// NeighborMetric contains the number of inbound/outbound neighbors.
type NeighborMetric struct {
Inbound uint
Outbound uint
}
// NumOfNeighbors returns a map of nodeIDs to their neighbor count.
func NumOfNeighbors() map[string]*NeighborMetric {
lock.RLock()
defer lock.RUnlock()
result := make(map[string]*NeighborMetric)
for nodeID := range nodes {
// number of outgoing neighbors
if _, exist := result[nodeID]; !exist {
result[nodeID] = &NeighborMetric{Outbound: uint(len(links[nodeID]))}
} else {
result[nodeID].Outbound = uint(len(links[nodeID]))
}
// fill in incoming neighbors
for outNeighborID := range links[nodeID] {
if _, exist := result[outNeighborID]; !exist {
result[outNeighborID] = &NeighborMetric{Inbound: 1}
} else {
result[outNeighborID].Inbound++
}
}
}
return result
}
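// Illustrative usage sketch (not part of the original file): how a dashboard
// handler might consume NumOfNeighbors. The exampleTotalDegree helper is
// hypothetical and only demonstrates the shape of the returned map.
func exampleTotalDegree() map[string]uint {
	totals := make(map[string]uint)
	for nodeID, metric := range NumOfNeighbors() {
		// a node's total degree is the sum of its inbound and outbound links
		totals[nodeID] = metric.Inbound + metric.Outbound
	}
	return totals
}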
// NetworkGraph returns the autopeering network graph.
func NetworkGraph() *graph.Graph {
lock.RLock()
defer lock.RUnlock()
var nodeIDs []string
for id := range nodes {
nodeIDs = append(nodeIDs, id)
}
g := graph.New(nodeIDs)
for src, trgMap := range links {
for dst := range trgMap {
g.AddEdge(src, dst)
}
}
return g
}
// configures the event recording by attaching to the analysis server's events.
func configureEventsRecording() {
analysisserver.Events.Heartbeat.Attach(events.NewClosure(func(hb *packet.Heartbeat) {
var out strings.Builder
for _, value := range hb.OutboundIDs {
out.WriteString(shortNodeIDString(value))
}
var in strings.Builder
for _, value := range hb.InboundIDs {
in.WriteString(shortNodeIDString(value))
}
log.Debugw(
"Heartbeat",
"nodeId", shortNodeIDString(hb.OwnID),
"outboundIds", out.String(),
"inboundIds", in.String(),
)
lock.Lock()
defer lock.Unlock()
nodeIDString := shortNodeIDString(hb.OwnID)
timestamp := time.Now()
// when node is new, add to graph
if _, isAlready := nodes[nodeIDString]; !isAlready {
analysisserver.Events.AddNode.Trigger(nodeIDString)
}
// save it + update timestamp
nodes[nodeIDString] = timestamp
// outgoing neighbor links update
for _, outgoingNeighbor := range hb.OutboundIDs {
outgoingNeighborString := shortNodeIDString(outgoingNeighbor)
// do we already know about this neighbor?
// if no, add it and set it online
if _, isAlready := nodes[outgoingNeighborString]; !isAlready {
// first time we see this particular node
analysisserver.Events.AddNode.Trigger(outgoingNeighborString)
}
// we have indirectly heard about the neighbor.
nodes[outgoingNeighborString] = timestamp
// do we have any links already with src=nodeIDString?
if _, isAlready := links[nodeIDString]; !isAlready {
// nope, so we have to allocate an empty map to be nested in links for nodeIDString
links[nodeIDString] = make(map[string]time.Time)
}
// update graph when connection hasn't been seen before
if _, isAlready := links[nodeIDString][outgoingNeighborString]; !isAlready {
analysisserver.Events.ConnectNodes.Trigger(nodeIDString, outgoingNeighborString)
}
// update links
links[nodeIDString][outgoingNeighborString] = timestamp
}
// incoming neighbor links update
for _, incomingNeighbor := range hb.InboundIDs {
incomingNeighborString := shortNodeIDString(incomingNeighbor)
// do we already know about this neighbor?
// if no, add it and set it online
if _, isAlready := nodes[incomingNeighborString]; !isAlready {
// First time we see this particular node
analysisserver.Events.AddNode.Trigger(incomingNeighborString)
}
// we have indirectly heard about the neighbor.
nodes[incomingNeighborString] = timestamp
// do we have any links already with src=incomingNeighborString?
if _, isAlready := links[incomingNeighborString]; !isAlready {
// nope, so we have to allocate an empty map to be nested in links for incomingNeighborString
links[incomingNeighborString] = make(map[string]time.Time)
}
// update graph when connection hasn't been seen before
if _, isAlready := links[incomingNeighborString][nodeIDString]; !isAlready {
analysisserver.Events.ConnectNodes.Trigger(incomingNeighborString, nodeIDString)
}
// update links map
links[incomingNeighborString][nodeIDString] = timestamp
}
}))
}
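// Illustrative sketch (not part of the original file): a heartbeat from node A
// reporting one outbound neighbor B and one inbound neighbor C leads to three
// entries in nodes and the links A->B and C->A. The byte slices below are
// placeholder identities, not real node IDs.
func exampleHeartbeatEffect() {
	hb := &packet.Heartbeat{
		OwnID:       []byte{0x01},     // node A
		OutboundIDs: [][]byte{{0x02}}, // node B
		InboundIDs:  [][]byte{{0x03}}, // node C
	}
	// after the Heartbeat closure above has processed hb:
	//   nodes[A], nodes[B] and nodes[C] hold the arrival timestamp
	//   links[A][B] and links[C][A] hold the arrival timestamp
	_ = hb
}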
// starts the record manager which periodically initiates a record cleanup
func runEventsRecordManager() {
if err := daemon.BackgroundWorker("Dashboard Analysis Server Record Manager", func(shutdownSignal <-chan struct{}) {
ticker := time.NewTicker(cleanUpPeriod)
defer ticker.Stop()
for {
select {
case <-shutdownSignal:
return
case <-ticker.C:
cleanUp(cleanUpPeriod)
}
}
}, shutdown.PriorityAnalysis); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
// removes nodes and links we haven't seen for at least 3 times the heartbeat interval.
func cleanUp(interval time.Duration) {
lock.Lock()
defer lock.Unlock()
now := time.Now()
// go through the list of connections. Remove connections that are older than interval time.
for srcNode, targetMap := range links {
for trgNode, lastSeen := range targetMap {
if now.Sub(lastSeen) > interval {
delete(targetMap, trgNode)
analysisserver.Events.DisconnectNodes.Trigger(srcNode, trgNode)
}
}
// delete src node from links if it doesn't have any connections
if len(targetMap) == 0 {
delete(links, srcNode)
}
}
// go through the list of nodes. Remove nodes that haven't been seen for interval time
for node, lastSeen := range nodes {
if now.Sub(lastSeen) > interval {
delete(nodes, node)
analysisserver.Events.RemoveNode.Trigger(node)
}
}
}
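// Illustrative sketch (not part of the original file): with cleanUpPeriod set
// to 15 seconds, an entry whose last heartbeat is older than the given
// interval is dropped and the matching RemoveNode/DisconnectNodes event is
// triggered. "staleNode" is a placeholder identifier.
func exampleCleanUpEffect() {
	lock.Lock()
	nodes["staleNode"] = time.Now().Add(-2 * cleanUpPeriod) // last seen 30 seconds ago
	lock.Unlock()
	cleanUp(cleanUpPeriod) // removes "staleNode" and triggers the RemoveNode event
}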
func getEventsToReplay() (map[string]time.Time, map[string]map[string]time.Time) {
lock.RLock()
defer lock.RUnlock()
copiedNodes := make(map[string]time.Time, len(nodes))
for nodeID, lastHeartbeat := range nodes {
copiedNodes[nodeID] = lastHeartbeat
}
copiedLinks := make(map[string]map[string]time.Time, len(links))
for sourceID, targetMap := range links {
copiedLinks[sourceID] = make(map[string]time.Time, len(targetMap))
for targetID, lastHeartbeat := range targetMap {
copiedLinks[sourceID][targetID] = lastHeartbeat
}
}
return copiedNodes, copiedLinks
}
// replays recorded events on the given event handler.
func replayAutopeeringEvents(handlers *EventHandlers) {
copiedNodes, copiedLinks := getEventsToReplay()
// when a node is present in the list, it means we heard about it directly
// or indirectly, but within cleanUpPeriod, therefore it is online
for nodeID := range copiedNodes {
handlers.AddNode(nodeID)
}
for sourceID, targetMap := range copiedLinks {
for targetID := range targetMap {
handlers.ConnectNodes(sourceID, targetID)
}
}
}
// EventHandlers holds the handler for each event of the record manager.
type EventHandlers struct {
// AddNode defines the handler called when adding a new node.
AddNode func(nodeId string)
// RemoveNode defines the handler called when removing a node.
RemoveNode func(nodeId string)
// ConnectNodes defines the handler called when connecting two nodes.
ConnectNodes func(sourceId string, targetId string)
// DisconnectNodes defines the handler called when disconnecting two nodes.
DisconnectNodes func(sourceId string, targetId string)
}
// EventHandlersConsumer defines the consumer function of an *EventHandlers.
type EventHandlersConsumer = func(handler *EventHandlers)
func shortNodeIDString(b []byte) string {
var id identity.ID
copy(id[:], b)
return id.String()
}
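// Illustrative sketch (not part of the original file): wiring an EventHandlers
// value and replaying the recorded topology to it, e.g. for a freshly
// connected dashboard client. The logging handlers are placeholders.
func exampleReplay() {
	handlers := &EventHandlers{
		AddNode:         func(nodeId string) { log.Debugw("replay add", "node", nodeId) },
		RemoveNode:      func(nodeId string) { log.Debugw("replay remove", "node", nodeId) },
		ConnectNodes:    func(sourceId, targetId string) { log.Debugw("replay connect", "src", sourceId, "dst", targetId) },
		DisconnectNodes: func(sourceId, targetId string) { log.Debugw("replay disconnect", "src", sourceId, "dst", targetId) },
	}
	replayAutopeeringEvents(handlers)
}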
package bootstrap
import (
goSync "sync"
"time"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/packages/binary/spammer@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/packages/shutdown@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/plugins/config@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/plugins/issuer@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/plugins/sync@without_tipselection"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
flag "github.com/spf13/pflag"
)
const (
// PluginName is the plugin name of the bootstrap plugin.
PluginName = "Bootstrap"
// CfgBootstrapInitialIssuanceTimePeriodSec defines how long the node should initially keep issuing messages
// when started in bootstrapping mode. If the value is set to -1, the issuance is continuous.
CfgBootstrapInitialIssuanceTimePeriodSec = "bootstrap.initialIssuance.timePeriodSec"
// CfgBootstrapTimeUnit defines the time unit (in seconds) of the issuance rate (e.g., 3 messages per 12 seconds).
CfgBootstrapTimeUnit = "bootstrap.timeUnit"
// the number of messages to issue per time unit when in bootstrapping mode.
initialIssuanceRate = 1
// the value which determines a continuous issuance of messages from the bootstrap plugin.
continuousIssuance = -1
)
func init() {
flag.Int(CfgBootstrapInitialIssuanceTimePeriodSec, -1, "the initial time period of how long the node should be issuing messages when started in bootstrapping mode. "+
"if the value is set to -1, the issuance is continuous.")
flag.Int(CfgBootstrapTimeUnit, 5, "the time unit (in seconds) of the issuance rate (e.g., 1 message per 5 seconds).")
}
var (
// plugin is the plugin instance of the bootstrap plugin.
plugin *node.Plugin
once goSync.Once
log *logger.Logger
)
// Plugin gets the plugin instance.
func Plugin() *node.Plugin {
once.Do(func() {
plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
})
return plugin
}
func configure(_ *node.Plugin) {
log = logger.NewLogger(PluginName)
// we're automatically synced if we start in bootstrapping mode
sync.OverwriteSyncedState(true)
log.Infof("starting node in bootstrapping mode")
}
func run(_ *node.Plugin) {
messageSpammer := spammer.New(issuer.IssuePayload)
issuancePeriodSec := config.Node().GetInt(CfgBootstrapInitialIssuanceTimePeriodSec)
timeUnit := config.Node().GetInt(CfgBootstrapTimeUnit)
if timeUnit <= 0 {
log.Panicf("Invalid Bootsrap time unit: %d seconds", timeUnit)
}
issuancePeriod := time.Duration(issuancePeriodSec) * time.Second
// issue messages on top of the genesis
if err := daemon.BackgroundWorker("Bootstrapping-Issuer", func(shutdownSignal <-chan struct{}) {
messageSpammer.Start(initialIssuanceRate, time.Duration(timeUnit)*time.Second)
defer messageSpammer.Shutdown()
// don't stop issuing messages if in continuous mode
if issuancePeriodSec == continuousIssuance {
log.Info("continuously issuing bootstrapping messages")
<-shutdownSignal
return
}
log.Infof("issuing bootstrapping messages for %d seconds", issuancePeriodSec)
select {
case <-time.After(issuancePeriod):
case <-shutdownSignal:
}
}, shutdown.PriorityBootstrap); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
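// Illustrative sketch (not part of the original file): the issuance schedule
// resulting from the default flag values registered in init(). The helper name
// is hypothetical; it only spells out the arithmetic used in run() above.
func exampleIssuanceSchedule() (rate int, per time.Duration, continuous bool) {
	timeUnit := 5           // default of CfgBootstrapTimeUnit
	issuancePeriodSec := -1 // default of CfgBootstrapInitialIssuanceTimePeriodSec
	// 1 message per 5 seconds, issued until shutdown because the period is -1
	return initialIssuanceRate, time.Duration(timeUnit) * time.Second, issuancePeriodSec == continuousIssuance
}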
package sync
import (
"errors"
"sync"
"time"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/packages/binary/messagelayer/message@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/packages/binary/messagelayer/tangle@without_tipselection"
gossipPkg "gitlab.imt-atlantique.fr/iota-imt/goshimmer/packages/gossip@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/packages/shutdown@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/plugins/autopeering/local@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/plugins/config@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/plugins/gossip@without_tipselection"
"gitlab.imt-atlantique.fr/iota-imt/goshimmer/plugins/messagelayer@without_tipselection"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/types"
flag "github.com/spf13/pflag"
"go.uber.org/atomic"
)
const (
// PluginName is the plugin name of the sync plugin.
PluginName = "Sync"
// CfgSyncAnchorPointsCount defines the amount of anchor points to use to determine
// whether a node is synchronized.
CfgSyncAnchorPointsCount = "sync.anchorPointsCount"
// CfgSyncAnchorPointsCleanupAfterSec defines the amount of time allowed to pass between setting an anchor point
// and it becoming solid before its slot is freed for another anchor point. It effectively defines the expected
// time for an anchor point to become solid. Even if this value is set too low, a node would usually still
// solidify the collected anchor points eventually.
CfgSyncAnchorPointsCleanupAfterSec = "sync.anchorPointsCleanupAfterSec"
// CfgSyncAnchorPointsCleanupIntervalSec defines the interval at which it is checked whether anchor points fall
// into the cleanup window.
CfgSyncAnchorPointsCleanupIntervalSec = "sync.anchorPointsCleanupIntervalSec"
// CfgSyncDesyncedIfNoMessageAfterSec defines the time period within which new messages must be received;
// if none arrive, the node is marked as desynced.
CfgSyncDesyncedIfNoMessageAfterSec = "sync.desyncedIfNoMessagesAfterSec"
// defines the max. divergence a potential new anchor point's issuance time may have from the current
// issuance time threshold. Say the current threshold is at 1000 and the boundary at 20: we then still
// accept a new potential anchor point whose issuance time is >= 980, i.e. up to 20 seconds older than
// the current threshold.
issuanceThresholdBeforeTimeBoundary = 20 * time.Second
)
func init() {
flag.Int(CfgSyncAnchorPointsCount, 3, "the amount of anchor points to use to determine whether a node is synchronized")
flag.Int(CfgSyncDesyncedIfNoMessageAfterSec, 300, "the time period in seconds which sets the node as desynced if no new messages are received")
flag.Int(CfgSyncAnchorPointsCleanupIntervalSec, 10, "the interval at which it is checked whether anchor points fall into the cleanup window")
flag.Int(CfgSyncAnchorPointsCleanupAfterSec, 60, "the amount of time which is allowed to pass between setting an anchor point and it not becoming solid (to clean its slot for another anchor point)")
}
var (
// plugin is the plugin instance of the sync plugin.
plugin *node.Plugin
once sync.Once
// ErrNodeNotSynchronized is returned when an operation can't be executed because
// the node is not synchronized.
ErrNodeNotSynchronized = errors.New("node is not synchronized")
// tells whether the node is synced or not.
synced atomic.Bool
log *logger.Logger
)
// Plugin gets the plugin instance.
func Plugin() *node.Plugin {
once.Do(func() {
plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
})
return plugin
}
// Synced tells whether the node is in a state we consider synchronized, meaning
// it has the relevant past and present message data.
func Synced() bool {
return synced.Load()
}
// OverwriteSyncedState overwrites the synced state with the given value.
func OverwriteSyncedState(syncedOverwrite bool) {
synced.Store(syncedOverwrite)
}
func configure(_ *node.Plugin) {
log = logger.NewLogger(PluginName)
}
func run(_ *node.Plugin) {
// by default the node starts in a desynced state
if !Synced() {
monitorForSynchronization()
return
}
// however, another plugin might want to overwrite the synced state (e.g. the bootstrap plugin)
// in order to start issuing messages
monitorForDesynchronization()
}
// marks the node as synced and spawns the background worker to monitor desynchronization.
func markSynced() {
synced.Store(true)
monitorForDesynchronization()
}
// marks the node as desynced and spawns the background worker to monitor synchronization.
func markDesynced() {
synced.Store(false)
monitorForSynchronization()
}
// starts a background worker and event handlers to check whether the node is desynchronized by checking
// whether the node has no more peers or didn't receive any message in a given time period.
func monitorForDesynchronization() {
log.Info("monitoring for desynchronization")
// monitors the peer count of the manager and sets the node as desynced if it has no more peers.
noPeers := make(chan types.Empty)
monitorPeerCountClosure := events.NewClosure(func(_ *gossipPkg.Neighbor) {
anyPeers := len(gossip.Manager().AllNeighbors()) > 0
if anyPeers {
return
}
noPeers <- types.Empty{}
})
msgReceived := make(chan types.Empty, 1)
monitorMessageInflowClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
defer cachedMessage.Release()
defer cachedMessageMetadata.Release()
// ignore messages sent by the node itself
if local.GetInstance().LocalIdentity().PublicKey() == cachedMessage.Unwrap().IssuerPublicKey() {
return
}
select {
case msgReceived <- types.Empty{}:
default:
// via this default clause, a slow desync-monitor select loop
// does not increase latency, as the send simply falls through
}
})
if err := daemon.BackgroundWorker("Desync-Monitor", func(shutdownSignal <-chan struct{}) {
gossip.Manager().Events().NeighborRemoved.Attach(monitorPeerCountClosure)
defer gossip.Manager().Events().NeighborRemoved.Detach(monitorPeerCountClosure)
messagelayer.Tangle().Events.MessageAttached.Attach(monitorMessageInflowClosure)
defer messagelayer.Tangle().Events.MessageAttached.Detach(monitorMessageInflowClosure)
timeForDesync := config.Node().GetDuration(CfgSyncDesyncedIfNoMessageAfterSec) * time.Second
timer := time.NewTimer(timeForDesync)
for {
select {
case <-msgReceived:
// we received a message, therefore reset the timer to check for message receives
if !timer.Stop() {
<-timer.C
}
// TODO: perhaps find a better way instead of constantly resetting the timer
timer.Reset(timeForDesync)
case <-timer.C:
log.Infof("no message received in %d seconds, marking node as desynced", int(timeForDesync.Seconds()))
markDesynced()
return
case <-noPeers:
log.Info("all peers have been lost, marking node as desynced")
markDesynced()
return
case <-shutdownSignal:
return
}
}
}, shutdown.PrioritySynchronization); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
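// Illustrative sketch (not part of the original file): the stop/drain/reset
// idiom used for the desync timer above, shown in isolation. Stop may return
// false when the timer already fired; draining through a select with a default
// clause avoids blocking when the expiry value was already consumed.
func exampleResetTimer(timer *time.Timer, d time.Duration) {
	if !timer.Stop() {
		select {
		case <-timer.C:
		default:
		}
	}
	timer.Reset(d)
}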
// starts a background worker and event handlers to check whether the node is synchronized by first collecting
// a set of newly received messages and then waiting for them to become solid.
func monitorForSynchronization() {
wantedAnchorPointsCount := config.Node().GetInt(CfgSyncAnchorPointsCount)
anchorPoints := newAnchorPoints(wantedAnchorPointsCount)
log.Infof("monitoring for synchronization, awaiting %d anchor point messages to become solid", wantedAnchorPointsCount)
synced := make(chan types.Empty)
initAnchorPointClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
defer cachedMessage.Release()
defer cachedMessageMetadata.Release()
if addedAnchorID := initAnchorPoint(anchorPoints, cachedMessage.Unwrap()); addedAnchorID != nil {
anchorPoints.Lock()
defer anchorPoints.Unlock()
log.Infof("added message %s as anchor point (%d of %d collected)", addedAnchorID.String()[:10], anchorPoints.collectedCount(), anchorPoints.wanted)
}
})
checkAnchorPointSolidityClosure := events.NewClosure(func(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
defer cachedMessage.Release()
defer cachedMessageMetadata.Release()
allSolid, newSolidAnchorID := checkAnchorPointSolidity(anchorPoints, cachedMessage.Unwrap())
if newSolidAnchorID != nil {
log.Infof("anchor message %s has become solid", newSolidAnchorID.String()[:10])
}
if !allSolid {
return
}
synced <- types.Empty{}
})
if err := daemon.BackgroundWorker("Sync-Monitor", func(shutdownSignal <-chan struct{}) {
messagelayer.Tangle().Events.MessageAttached.Attach(initAnchorPointClosure)
defer messagelayer.Tangle().Events.MessageAttached.Detach(initAnchorPointClosure)
messagelayer.Tangle().Events.MessageSolid.Attach(checkAnchorPointSolidityClosure)
defer messagelayer.Tangle().Events.MessageSolid.Detach(checkAnchorPointSolidityClosure)
cleanupDelta := config.Node().GetDuration(CfgSyncAnchorPointsCleanupAfterSec) * time.Second
ticker := time.NewTicker(config.Node().GetDuration(CfgSyncAnchorPointsCleanupIntervalSec) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
anchorPoints.Lock()
for id, itGotAdded := range anchorPoints.ids {
if time.Since(itGotAdded) > cleanupDelta {
log.Infof("freeing anchor point slot of %s as it didn't become solid within %v", id.String()[:10], cleanupDelta)
delete(anchorPoints.ids, id)
}
}
anchorPoints.Unlock()
case <-shutdownSignal:
return
case <-synced:
log.Infof("all anchor messages have become solid, marking node as synced")
markSynced()
return
}
}
}, shutdown.PrioritySynchronization); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
// fills up the anchor points with newly attached messages which then are used to determine whether we are synchronized.
func initAnchorPoint(anchorPoints *anchorpoints, msg *message.Message) *message.Id {
if synced.Load() {
return nil
}
anchorPoints.Lock()
defer anchorPoints.Unlock()
// we don't need to add additional anchor points if the set was already filled once
if anchorPoints.wasFilled() {
return nil
}
// as a rule, we don't consider messages attaching directly to the genesis as anchor points
if msg.TrunkId() == message.EmptyId || msg.BranchId() == message.EmptyId {
return nil
}
// add the message as a new anchor point unless its issuance time falls behind the allowed time boundary
id := msg.Id()
if !anchorPoints.add(id, msg.IssuingTime()) {
return nil
}
return &id
}
// checks whether an anchor point message became solid.
// if all anchor points became solid, it sets the node's state to synchronized.
func checkAnchorPointSolidity(anchorPoints *anchorpoints, msg *message.Message) (bool, *message.Id) {
anchorPoints.Lock()
defer anchorPoints.Unlock()
if synced.Load() || len(anchorPoints.ids) == 0 {
return false, nil
}
// check whether an anchor message became solid
msgID := msg.Id()
if !anchorPoints.has(msgID) {
return false, nil
}
// an anchor became solid
anchorPoints.markAsSolidified(msgID)
if !anchorPoints.wereAllSolidified() {
return false, &msgID
}
// all anchor points have become solid
return true, &msgID
}
func newAnchorPoints(wantedAnchorPointsCount int) *anchorpoints {
return &anchorpoints{
ids: make(map[message.Id]time.Time),
wanted: wantedAnchorPointsCount,
}
}
// anchorpoints are a set of messages which we use to determine whether the node has become synchronized.
type anchorpoints struct {
sync.Mutex
// the ids of the anchor points with their addition time.
ids map[message.Id]time.Time
// the wanted amount of anchor points which should become solid.
wanted int
// how many anchor points have been solidified.
solidified int
// holds the issuance time of the most recently added anchor point message.
// this is used as a threshold to determine whether further attached messages
// should become an anchor point by matching their issuance time against it.
issuanceTimeThreshold time.Time
}
// adds the given message to the anchor point set unless its issuance time falls more than
// issuanceThresholdBeforeTimeBoundary behind the current issuance time threshold.
func (ap *anchorpoints) add(id message.Id, issuanceTime time.Time) bool {
if !ap.issuanceTimeThreshold.IsZero() &&
ap.issuanceTimeThreshold.Add(-issuanceThresholdBeforeTimeBoundary).After(issuanceTime) {
return false
}
ap.ids[id] = time.Now()
ap.issuanceTimeThreshold = issuanceTime
return true
}
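// Illustrative sketch (not part of the original file): how the 20-second
// boundary plays out. The zero-value message.Id is used as a placeholder key;
// only the issuance-time arithmetic matters here.
func exampleAddBoundary() {
	base := time.Unix(0, 0)
	ap := newAnchorPoints(3)
	ap.add(message.Id{}, base.Add(1000*time.Second)) // first anchor point, threshold is now 1000s
	ap.add(message.Id{}, base.Add(985*time.Second))  // accepted: only 15s older; threshold moves to 985s
	ap.add(message.Id{}, base.Add(960*time.Second))  // rejected: 25s older than the current 985s threshold
}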
func (ap *anchorpoints) has(id message.Id) bool {
_, has := ap.ids[id]
return has
}
// marks the given anchor point as solidified which removes it from the set and bumps the solidified count.
func (ap *anchorpoints) markAsSolidified(id message.Id) {
delete(ap.ids, id)
ap.solidified++
}
// tells whether the anchor points set was filled at some point.
func (ap *anchorpoints) wasFilled() bool {
return ap.collectedCount() == ap.wanted
}
// tells whether all anchor points have become solid.
func (ap *anchorpoints) wereAllSolidified() bool {
return ap.solidified == ap.wanted
}
// tells the number of effectively collected anchor points.
func (ap *anchorpoints) collectedCount() int {
// since an anchor point potentially was solidified before the set became full,
// we need to incorporate that count too
return ap.solidified + len(ap.ids)
}
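// Illustrative sketch (not part of the original file): the lifecycle the sync
// monitor drives through an anchorpoints set. The ids are placeholder values
// (assuming message.Id is a comparable byte-array type, as its use as a map
// key above suggests).
func exampleAnchorLifecycle() {
	ap := newAnchorPoints(2)
	first, second := message.Id{1}, message.Id{2}
	ap.add(first, time.Now())
	ap.add(second, time.Now())
	_ = ap.wasFilled() // true: two anchor points were collected
	ap.markAsSolidified(first)
	_ = ap.wereAllSolidified() // false: only one of two solidified
	ap.markAsSolidified(second)
	_ = ap.wereAllSolidified() // true: the node would now be marked as synced
}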