package tangle import ( "container/list" "runtime" "time" "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/hive.go/types" "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" "github.com/iotaledger/goshimmer/packages/binary/storageprefix" "github.com/iotaledger/hive.go/async" "github.com/iotaledger/hive.go/objectstorage" ) const ( // MaxMissingTimeBeforeCleanup is the max. amount of time a message can be marked as missing // before it is ultimately un-marked as missing. MaxMissingTimeBeforeCleanup = 30 * time.Second // MissingCheckInterval is the interval on which it is checked whether a missing // message is still missing. MissingCheckInterval = 5 * time.Second cacheTime = 20 * time.Second ) // Tangle represents the base layer of messages. type Tangle struct { messageStorage *objectstorage.ObjectStorage messageMetadataStorage *objectstorage.ObjectStorage approverStorage *objectstorage.ObjectStorage missingMessageStorage *objectstorage.ObjectStorage Events Events storeMessageWorkerPool async.WorkerPool solidifierWorkerPool async.WorkerPool shutdown chan struct{} } func messageFactory(key []byte) (objectstorage.StorableObject, int, error) { return message.StorableObjectFromKey(key) } func approverFactory(key []byte) (objectstorage.StorableObject, int, error) { return ApproverFromStorageKey(key) } func missingMessageFactory(key []byte) (objectstorage.StorableObject, int, error) { return MissingMessageFromStorageKey(key) } // New creates a new Tangle. func New(store kvstore.KVStore) (result *Tangle) { osFactory := objectstorage.NewFactory(store, storageprefix.MessageLayer) result = &Tangle{ shutdown: make(chan struct{}), messageStorage: osFactory.New(PrefixMessage, messageFactory, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)), messageMetadataStorage: osFactory.New(PrefixMessageMetadata, MessageMetadataFromStorageKey, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)), approverStorage: osFactory.New(PrefixApprovers, approverFactory, objectstorage.CacheTime(cacheTime), objectstorage.PartitionKey(message.IdLength, message.IdLength), objectstorage.LeakDetectionEnabled(false)), missingMessageStorage: osFactory.New(PrefixMissingMessage, missingMessageFactory, objectstorage.CacheTime(cacheTime), objectstorage.LeakDetectionEnabled(false)), Events: *newEvents(), } result.solidifierWorkerPool.Tune(runtime.GOMAXPROCS(0)) return } // AttachMessage attaches a new message to the tangle. func (tangle *Tangle) AttachMessage(msg *message.Message) { tangle.storeMessageWorkerPool.Submit(func() { tangle.storeMessageWorker(msg) }) } // Message retrieves a message from the tangle. func (tangle *Tangle) Message(messageId message.Id) *message.CachedMessage { return &message.CachedMessage{CachedObject: tangle.messageStorage.Load(messageId[:])} } // MessageMetadata retrieves the metadata of a message from the tangle. func (tangle *Tangle) MessageMetadata(messageId message.Id) *CachedMessageMetadata { return &CachedMessageMetadata{CachedObject: tangle.messageMetadataStorage.Load(messageId[:])} } // Approvers retrieves the approvers of a message from the tangle. func (tangle *Tangle) Approvers(messageId message.Id) CachedApprovers { approvers := make(CachedApprovers, 0) tangle.approverStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { approvers = append(approvers, &CachedApprover{CachedObject: cachedObject}) return true }, messageId[:]) return approvers } // DeleteMessage deletes a message and its association to approvees by un-marking the given // message as an approver. func (tangle *Tangle) DeleteMessage(messageId message.Id) { tangle.Message(messageId).Consume(func(currentMsg *message.Message) { trunkMsgId := currentMsg.TrunkId() tangle.deleteApprover(trunkMsgId, messageId) branchMsgId := currentMsg.BranchId() if branchMsgId != trunkMsgId { tangle.deleteApprover(branchMsgId, messageId) } tangle.messageMetadataStorage.Delete(messageId[:]) tangle.messageStorage.Delete(messageId[:]) tangle.Events.MessageRemoved.Trigger(messageId) }) } // Shutdown marks the tangle as stopped, so it will not accept any new messages (waits for all backgroundTasks to finish). func (tangle *Tangle) Shutdown() *Tangle { tangle.storeMessageWorkerPool.ShutdownGracefully() tangle.solidifierWorkerPool.ShutdownGracefully() tangle.messageStorage.Shutdown() tangle.messageMetadataStorage.Shutdown() tangle.approverStorage.Shutdown() tangle.missingMessageStorage.Shutdown() close(tangle.shutdown) return tangle } // Prune resets the database and deletes all objects (good for testing or "node resets"). func (tangle *Tangle) Prune() error { for _, storage := range []*objectstorage.ObjectStorage{ tangle.messageStorage, tangle.messageMetadataStorage, tangle.approverStorage, tangle.missingMessageStorage, } { if err := storage.Prune(); err != nil { return err } } return nil } // worker that stores the message and calls the corresponding storage events. func (tangle *Tangle) storeMessageWorker(msg *message.Message) { // store message var cachedMessage *message.CachedMessage _tmp, msgIsNew := tangle.messageStorage.StoreIfAbsent(msg) if !msgIsNew { return } cachedMessage = &message.CachedMessage{CachedObject: _tmp} // store message metadata messageId := msg.Id() cachedMsgMetadata := &CachedMessageMetadata{CachedObject: tangle.messageMetadataStorage.Store(NewMessageMetadata(messageId))} // store trunk approver trunkMsgId := msg.TrunkId() tangle.approverStorage.Store(NewApprover(trunkMsgId, messageId)).Release() // store branch approver if branchMsgId := msg.BranchId(); branchMsgId != trunkMsgId { tangle.approverStorage.Store(NewApprover(branchMsgId, messageId)).Release() } // trigger events if tangle.missingMessageStorage.DeleteIfPresent(messageId[:]) { tangle.Events.MissingMessageReceived.Trigger(cachedMessage, cachedMsgMetadata) } tangle.Events.MessageAttached.Trigger(cachedMessage, cachedMsgMetadata) // check message solidity tangle.solidifierWorkerPool.Submit(func() { tangle.checkMessageSolidityAndPropagate(cachedMessage, cachedMsgMetadata) }) } // checks whether the given message is solid and marks it as missing if it isn't known. func (tangle *Tangle) isMessageMarkedAsSolid(messageId message.Id) bool { if messageId == message.EmptyId { return true } msgMetadataCached := tangle.MessageMetadata(messageId) defer msgMetadataCached.Release() msgMetadata := msgMetadataCached.Unwrap() // mark message as missing if msgMetadata == nil { missingMessage := NewMissingMessage(messageId) if cachedMissingMessage, stored := tangle.missingMessageStorage.StoreIfAbsent(missingMessage); stored { cachedMissingMessage.Consume(func(object objectstorage.StorableObject) { tangle.Events.MessageMissing.Trigger(messageId) }) } return false } return msgMetadata.IsSolid() } // checks whether the given message is solid by examining whether its trunk and // branch messages are solid. func (tangle *Tangle) isMessageSolid(msg *message.Message, msgMetadata *MessageMetadata) bool { if msg == nil || msg.IsDeleted() { return false } if msgMetadata == nil || msgMetadata.IsDeleted() { return false } if msgMetadata.IsSolid() { return true } return tangle.isMessageMarkedAsSolid(msg.TrunkId()) && tangle.isMessageMarkedAsSolid(msg.BranchId()) } // builds up a stack from the given message and tries to solidify into the present. // missing messages which are needed for a message to become solid are marked as missing. func (tangle *Tangle) checkMessageSolidityAndPropagate(cachedMessage *message.CachedMessage, cachedMsgMetadata *CachedMessageMetadata) { popElementsFromStack := func(stack *list.List) (*message.CachedMessage, *CachedMessageMetadata) { currentSolidificationEntry := stack.Front() currentCachedMsg := currentSolidificationEntry.Value.([2]interface{})[0] currentCachedMsgMetadata := currentSolidificationEntry.Value.([2]interface{})[1] stack.Remove(currentSolidificationEntry) return currentCachedMsg.(*message.CachedMessage), currentCachedMsgMetadata.(*CachedMessageMetadata) } // initialize the stack solidificationStack := list.New() solidificationStack.PushBack([2]interface{}{cachedMessage, cachedMsgMetadata}) // processed messages that are supposed to be checked for solidity recursively for solidificationStack.Len() > 0 { currentCachedMessage, currentCachedMsgMetadata := popElementsFromStack(solidificationStack) currentMessage := currentCachedMessage.Unwrap() currentMsgMetadata := currentCachedMsgMetadata.Unwrap() if currentMessage == nil || currentMsgMetadata == nil { currentCachedMessage.Release() currentCachedMsgMetadata.Release() continue } // mark the message as solid if it has become solid if tangle.isMessageSolid(currentMessage, currentMsgMetadata) && currentMsgMetadata.SetSolid(true) { tangle.Events.MessageSolid.Trigger(currentCachedMessage, currentCachedMsgMetadata) // auto. push approvers of the newly solid message to propagate solidification tangle.Approvers(currentMessage.Id()).Consume(func(approver *Approver) { approverMessageId := approver.ApproverMessageId() solidificationStack.PushBack([2]interface{}{ tangle.Message(approverMessageId), tangle.MessageMetadata(approverMessageId), }) }) } currentCachedMessage.Release() currentCachedMsgMetadata.Release() } } // MonitorMissingMessages continuously monitors for missing messages and eventually deletes them if they // don't become available in a certain time frame. func (tangle *Tangle) MonitorMissingMessages(shutdownSignal <-chan struct{}) { reCheckInterval := time.NewTicker(MissingCheckInterval) defer reCheckInterval.Stop() for { select { case <-reCheckInterval.C: var toDelete []message.Id var toUnmark []message.Id tangle.missingMessageStorage.ForEach(func(key []byte, cachedObject objectstorage.CachedObject) bool { defer cachedObject.Release() missingMessage := cachedObject.Get().(*MissingMessage) if tangle.messageStorage.Contains(missingMessage.messageId.Bytes()) { toUnmark = append(toUnmark, missingMessage.MessageId()) return true } // check whether message is missing since over our max time delta if time.Since(missingMessage.MissingSince()) >= MaxMissingTimeBeforeCleanup { toDelete = append(toDelete, missingMessage.MessageId()) } return true }) for _, msgID := range toUnmark { tangle.missingMessageStorage.DeleteIfPresent(msgID.Bytes()) } for _, msgID := range toDelete { // delete the future cone of the missing message tangle.Events.MessageUnsolidifiable.Trigger(msgID) // TODO: obvious race condition between receiving the message and it getting deleted here tangle.deleteFutureCone(msgID) } case <-shutdownSignal: return } } } // deletes the given approver association for the given approvee to its approver. func (tangle *Tangle) deleteApprover(approvedMessageId message.Id, approvingMessage message.Id) { idToDelete := make([]byte, message.IdLength+message.IdLength) copy(idToDelete[:message.IdLength], approvedMessageId[:]) copy(idToDelete[message.IdLength:], approvingMessage[:]) tangle.approverStorage.Delete(idToDelete) } // deletes a message and its future cone of messages/approvers. func (tangle *Tangle) deleteFutureCone(messageId message.Id) { cleanupStack := list.New() cleanupStack.PushBack(messageId) processedMessages := make(map[message.Id]types.Empty) processedMessages[messageId] = types.Void for cleanupStack.Len() >= 1 { currentStackEntry := cleanupStack.Front() currentMessageId := currentStackEntry.Value.(message.Id) cleanupStack.Remove(currentStackEntry) tangle.DeleteMessage(currentMessageId) tangle.Approvers(currentMessageId).Consume(func(approver *Approver) { approverId := approver.ApproverMessageId() if _, messageProcessed := processedMessages[approverId]; !messageProcessed { cleanupStack.PushBack(approverId) processedMessages[approverId] = types.Void } }) } }