// File: livefeed.go

package dashboard

import (
	"time"

	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/packages/tangle"
	"github.com/iotaledger/goshimmer/plugins/messagelayer"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/workerpool"
)

// Settings and state of the live feed worker pool.
var (
	// liveFeedWorkerCount is passed to workerpool.WorkerCount when the pool
	// is created in configureLiveFeed.
	liveFeedWorkerCount = 1

	// liveFeedWorkerQueueSize is passed to workerpool.QueueSize when the pool
	// is created in configureLiveFeed.
	liveFeedWorkerQueueSize = 50

	// liveFeedWorkerPool is assigned by configureLiveFeed and started and
	// stopped by runLiveFeed.
	liveFeedWorkerPool *workerpool.WorkerPool
)

func configureLiveFeed() {
	liveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
		task.Param(0).(*tangle.CachedMessage).Consume(func(message *tangle.Message) {
Acha Bill's avatar
Acha Bill committed
			broadcastWsMessage(&wsmsg{MsgTypeMessage, &msg{message.ID().String(), 0}})
		task.Return(nil)
	}, workerpool.WorkerCount(liveFeedWorkerCount), workerpool.QueueSize(liveFeedWorkerQueueSize))
}

func runLiveFeed() {
capossele's avatar
capossele committed
	newMsgRateLimiter := time.NewTicker(time.Second / 10)
	notifyNewMsg := events.NewClosure(func(cachedMsgEvent *tangle.CachedMessageEvent) {
		cachedMsgEvent.MessageMetadata.Release()
capossele's avatar
capossele committed
		case <-newMsgRateLimiter.C:
			_, ok := liveFeedWorkerPool.TrySubmit(cachedMsgEvent.Message)
			if !ok {
				cachedMsgEvent.Message.Release()
			cachedMsgEvent.Message.Release()
	if err := daemon.BackgroundWorker("Dashboard[MsgUpdater]", func(shutdownSignal <-chan struct{}) {
		messagelayer.Tangle().Events.MessageAttached.Attach(notifyNewMsg)
		liveFeedWorkerPool.Start()
		<-shutdownSignal
		log.Info("Stopping Dashboard[MsgUpdater] ...")
		messagelayer.Tangle().Events.MessageAttached.Detach(notifyNewMsg)
capossele's avatar
capossele committed
		newMsgRateLimiter.Stop()
		liveFeedWorkerPool.Stop()
		log.Info("Stopping Dashboard[MsgUpdater] ... done")
	}, shutdown.PriorityDashboard); err != nil {
		log.Panicf("Failed to start as daemon: %s", err)
	}