-
Jonas Theis authored
* Missing message investigation * Delete suddenly appeared messages *
Add IsDeleted check * Undo deletion * Add metadata check to messageExists * Fix bug * Move sendRequest and ensure timer is stopped * Remove count check * Undo request deletion and only use StopRequest * Move MissingMessageAppeared to StopRequest * Force MissingMessageReceived triggering * Enable deadlock mutex for debug * refactor reRequest lock-unlock * re-enable messageExists check * Remove waiting for the timer to be stopped * Re-enable count * Move IO access out of the lock * Remove MissingMessageAppeared * Remove messageExistsFunc check * Refactor reRequest * Increase threshold and remove print * Fix linter warnings Co-authored-by: Levente Pap <levente.pap@iota.org> Co-authored-by:
capossele <angelocapossele@gmail.com>
Jonas Theis authored
* Missing message investigation * Delete suddenly appeared messages *
Add IsDeleted check * Undo deletion * Add metadata check to messageExists * Fix bug * Move sendRequest and ensure timer is stopped * Remove count check * Undo request deletion and only use StopRequest * Move MissingMessageAppeared to StopRequest * Force MissingMessageReceived triggering * Enable deadlock mutex for debug * refactor reRequest lock-unlock * re-enable messageExists check * Remove waiting for the timer to be stopped * Re-enable count * Move IO access out of the lock * Remove MissingMessageAppeared * Remove messageExistsFunc check * Refactor reRequest * Increase threshold and remove print * Fix linter warnings Co-authored-by: Levente Pap <levente.pap@iota.org> Co-authored-by:
capossele <angelocapossele@gmail.com>
plugin.go 4.64 KiB
package messagelayer
import (
"sync"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagefactory"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messageparser"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/messagerequester"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tipselector"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/database"
"github.com/iotaledger/hive.go/autopeering/peer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
)
const (
// PluginName is the name passed to node.NewPlugin when the plugin is created
// and to logger.NewLogger when the plugin's logger is set up.
PluginName = "MessageLayer"
// DBSequenceNumber is handed to messagefactory.New as a byte slice together
// with the database store.
// NOTE(review): presumably the database key under which the message factory
// keeps its sequence counter; confirm against messagefactory.
DBSequenceNumber = "seq"
)
// Package-level singletons of the message layer. Each instance is paired with
// a sync.Once that guards its creation inside the accessor function of the
// same name (Plugin, MessageParser, MessageRequester, TipSelector, Tangle,
// MessageFactory).
var (
// plugin is the plugin instance of the message layer plugin.
plugin *node.Plugin
pluginOnce sync.Once
// messageParser is the shared parser instance, created by MessageParser.
messageParser *messageparser.MessageParser
msgParserOnce sync.Once
// messageRequester is the shared requester instance, created by MessageRequester.
messageRequester *messagerequester.MessageRequester
msgReqOnce sync.Once
// tipSelector is the shared tip selector instance, created by TipSelector.
tipSelector *tipselector.TipSelector
tipSelectorOnce sync.Once
// _tangle is the shared tangle instance, created by Tangle.
_tangle *tangle.Tangle
tangleOnce sync.Once
// messageFactory is the shared message factory instance, created by MessageFactory.
messageFactory *messagefactory.MessageFactory
msgFactoryOnce sync.Once
// log is the plugin's logger; it is assigned in configure.
log *logger.Logger
)
// Plugin returns the message layer plugin. The plugin is built on the first
// call only (enabled, with configure and run as its callbacks); every later
// call returns that same instance.
func Plugin() *node.Plugin {
	create := func() {
		plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
	}
	pluginOnce.Do(create)

	return plugin
}
// MessageParser returns the shared message parser, constructing it on the
// first call.
func MessageParser() *messageparser.MessageParser {
	msgParserOnce.Do(func() { messageParser = messageparser.New() })

	return messageParser
}
// TipSelector returns the shared tip selector, constructing it on the first
// call.
func TipSelector() *tipselector.TipSelector {
	initTipSelector := func() {
		tipSelector = tipselector.New()
	}
	tipSelectorOnce.Do(initTipSelector)

	return tipSelector
}
// Tangle returns the shared tangle. On the first call it is created on top of
// the store obtained from the database plugin.
func Tangle() *tangle.Tangle {
	tangleOnce.Do(func() {
		_tangle = tangle.New(database.Store())
	})

	return _tangle
}
// MessageFactory returns the shared message factory. On the first call it is
// created from the database store, the DBSequenceNumber key, the local
// identity of this node and the shared tip selector.
func MessageFactory() *messagefactory.MessageFactory {
	msgFactoryOnce.Do(func() {
		// Gathered in the same order in which the arguments were evaluated before.
		store := database.Store()
		sequenceKey := []byte(DBSequenceNumber)
		identity := local.GetInstance().LocalIdentity()
		selector := TipSelector()

		messageFactory = messagefactory.New(store, sequenceKey, identity, selector)
	})

	return messageFactory
}
// MessageRequester returns the shared message requester. On the first call it
// is created with messageExists as its existence check.
func MessageRequester() *messagerequester.MessageRequester {
	msgReqOnce.Do(func() { messageRequester = messagerequester.New(messageExists) })

	return messageRequester
}
// configure is the plugin's configure callback. It sets up the logger,
// resolves all message layer components through their accessors and connects
// them via events:
//   - messages built by the factory and messages emitted by the parser are
//     attached to the tangle;
//   - a missing message starts a request, a received missing message stops it;
//   - solid messages are handed to the tip selector;
//   - a missing message reported as appeared by the requester is deleted from
//     the tangle's missing messages.
func configure(*node.Plugin) {
	log = logger.NewLogger(PluginName)

	// Resolve the component instances (created on first access).
	messageParser = MessageParser()
	messageRequester = MessageRequester()
	tipSelector = TipSelector()
	_tangle = Tangle()
	messageFactory = MessageFactory()

	// Message factory: behavior and error logging.
	onFactoryError := func(err error) {
		log.Errorf("internal error in message factory: %v", err)
	}
	messageFactory.Events.MessageConstructed.Attach(events.NewClosure(_tangle.AttachMessage))
	messageFactory.Events.Error.Attach(events.NewClosure(onFactoryError))

	// Message parser: parsed messages go straight into the tangle.
	// TODO: ADD PEER
	onMessageParsed := func(msg *message.Message, _ *peer.Peer) {
		_tangle.AttachMessage(msg)
	}
	messageParser.Events.MessageParsed.Attach(events.NewClosure(onMessageParsed))

	// Message requester: start requesting on a missing message, stop once it arrived.
	onMissingMessageReceived := func(cachedMsg *message.CachedMessage, cachedMetadata *tangle.CachedMessageMetadata) {
		// The metadata is not needed here, so it is released right away.
		cachedMetadata.Release()
		cachedMsg.Consume(func(msg *message.Message) {
			messageRequester.StopRequest(msg.Id())
		})
	}
	_tangle.Events.MessageMissing.Attach(events.NewClosure(messageRequester.StartRequest))
	_tangle.Events.MissingMessageReceived.Attach(events.NewClosure(onMissingMessageReceived))

	// Tip selector: every solid message is added as a tip.
	onMessageSolid := func(cachedMsg *message.CachedMessage, cachedMetadata *tangle.CachedMessageMetadata) {
		cachedMetadata.Release()
		cachedMsg.Consume(tipSelector.AddTip)
	}
	_tangle.Events.MessageSolid.Attach(events.NewClosure(onMessageSolid))

	// Requester feedback: drop the missing-message entry once the message appeared.
	onMissingMessageAppeared := func(id message.Id) {
		_tangle.DeleteMissingMessage(id)
	}
	MessageRequester().Events.MissingMessageAppeared.Attach(events.NewClosure(onMissingMessageAppeared))
}
// run is the plugin's run callback. It registers a background worker named
// "Tangle" with the daemon at shutdown.PriorityTangle. The worker blocks until
// the shutdown signal arrives and then shuts down the message factory followed
// by the tangle. A failure to register the worker is reported via log.Panicf.
func run(*node.Plugin) {
	worker := func(shutdownSignal <-chan struct{}) {
		<-shutdownSignal

		messageFactory.Shutdown()
		_tangle.Shutdown()
	}

	err := daemon.BackgroundWorker("Tangle", worker, shutdown.PriorityTangle)
	if err != nil {
		log.Panicf("Failed to start as daemon: %s", err)
	}
}
// messageExists reports whether the message with the given ID exists in the
// tangle. The cached message fetched for the lookup is released before the
// function returns.
func messageExists(msgID message.Id) bool {
	cached := Tangle().Message(msgID)
	defer cached.Release()

	return cached.Exists()
}