Skip to content
Snippets Groups Projects
Commit cac95695 authored by capossele's avatar capossele
Browse files

:recycle: refactors livefeed

parent 91241bd2
No related branches found
No related tags found
No related merge requests found
...@@ -19,8 +19,8 @@ var liveFeedWorkerPool *workerpool.WorkerPool ...@@ -19,8 +19,8 @@ var liveFeedWorkerPool *workerpool.WorkerPool
func configureLiveFeed() { func configureLiveFeed() {
liveFeedWorkerPool = workerpool.New(func(task workerpool.Task) { liveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
task.Param(0).(*message.CachedMessage).Consume(func(transaction *message.Message) { task.Param(0).(*message.CachedMessage).Consume(func(message *message.Message) {
sendToAllWSClient(&msg{MsgTypeTx, &tx{transaction.Id().String(), 0}}) sendToAllWSClient(&msg{MsgTypeTx, &tx{message.Id().String(), 0}})
}) })
task.Return(nil) task.Return(nil)
...@@ -28,26 +28,26 @@ func configureLiveFeed() { ...@@ -28,26 +28,26 @@ func configureLiveFeed() {
} }
func runLiveFeed() { func runLiveFeed() {
newTxRateLimiter := time.NewTicker(time.Second / 10) newMsgRateLimiter := time.NewTicker(time.Second / 10)
notifyNewTx := events.NewClosure(func(tx *message.CachedMessage, metadata *tangle.CachedMessageMetadata) { notifyNewMsg := events.NewClosure(func(message *message.CachedMessage, metadata *tangle.CachedMessageMetadata) {
metadata.Release() metadata.Release()
select { select {
case <-newTxRateLimiter.C: case <-newMsgRateLimiter.C:
liveFeedWorkerPool.TrySubmit(tx) liveFeedWorkerPool.TrySubmit(message)
default: default:
tx.Release() message.Release()
} }
}) })
daemon.BackgroundWorker("SPA[TxUpdater]", func(shutdownSignal <-chan struct{}) { daemon.BackgroundWorker("SPA[MsgUpdater]", func(shutdownSignal <-chan struct{}) {
messagelayer.Tangle.Events.TransactionAttached.Attach(notifyNewTx) messagelayer.Tangle.Events.TransactionAttached.Attach(notifyNewMsg)
liveFeedWorkerPool.Start() liveFeedWorkerPool.Start()
<-shutdownSignal <-shutdownSignal
log.Info("Stopping SPA[TxUpdater] ...") log.Info("Stopping SPA[MsgUpdater] ...")
messagelayer.Tangle.Events.TransactionAttached.Detach(notifyNewTx) messagelayer.Tangle.Events.TransactionAttached.Detach(notifyNewMsg)
newTxRateLimiter.Stop() newMsgRateLimiter.Stop()
liveFeedWorkerPool.Stop() liveFeedWorkerPool.Stop()
log.Info("Stopping SPA[TxUpdater] ... done") log.Info("Stopping SPA[MsgUpdater] ... done")
}, shutdown.ShutdownPrioritySPA) }, shutdown.ShutdownPrioritySPA)
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment