package dashboard

import (
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/iotaledger/goshimmer/packages/shutdown"
	"github.com/iotaledger/goshimmer/plugins/messagelayer"
	"github.com/iotaledger/goshimmer/plugins/metrics"
	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/workerpool"
	"github.com/labstack/echo"
)

var (
	// settings
	wsSendWorkerCount     = 1
	wsSendWorkerQueueSize = 250
	wsSendWorkerPool      *workerpool.WorkerPool
	webSocketWriteTimeout = 3 * time.Second

	// clients
	wsClientsMu    sync.Mutex
	wsClients      = make(map[uint64]*wsclient)
	nextWsClientID uint64

	// gorilla websocket layer
	upgrader = websocket.Upgrader{
		HandshakeTimeout:  webSocketWriteTimeout,
		CheckOrigin:       func(r *http.Request) bool { return true },
		EnableCompression: true,
	}
)

// a websocket client with a channel for downstream messages.
type wsclient struct {
	// downstream message channel.
	channel chan interface{}
	// a channel which is closed when the websocket client is disconnected.
	exit chan struct{}
}

func configureWebSocketWorkerPool() {
	wsSendWorkerPool = workerpool.New(func(task workerpool.Task) {
		broadcastWsMessage(&wsmsg{MsgTypeMPSMetric, task.Param(0).(uint64)})
		broadcastWsMessage(&wsmsg{MsgTypeNodeStatus, currentNodeStatus()})
		broadcastWsMessage(&wsmsg{MsgTypeNeighborMetric, neighborMetrics()})
		broadcastWsMessage(&wsmsg{MsgTypeTipsMetric, messagelayer.TipSelector.TipCount()})
		task.Return(nil)
	}, workerpool.WorkerCount(wsSendWorkerCount), workerpool.QueueSize(wsSendWorkerQueueSize))
}

func runWebSocketStreams() {
	updateStatus := events.NewClosure(func(mps uint64) {
		wsSendWorkerPool.TrySubmit(mps)
	})

	if err := daemon.BackgroundWorker("Dashboard[StatusUpdate]", func(shutdownSignal <-chan struct{}) {
		metrics.Events.ReceivedMPSUpdated.Attach(updateStatus)
		wsSendWorkerPool.Start()
		<-shutdownSignal
		log.Info("Stopping Dashboard[StatusUpdate] ...")
		metrics.Events.ReceivedMPSUpdated.Detach(updateStatus)
		wsSendWorkerPool.Stop()
		log.Info("Stopping Dashboard[StatusUpdate] ... done")
	}, shutdown.PriorityDashboard); err != nil {
		log.Panicf("Failed to start as daemon: %s", err)
	}
}

// registers and creates a new websocket client.
func registerWSClient() (uint64, *wsclient) {
	wsClientsMu.Lock()
	defer wsClientsMu.Unlock()

	clientID := nextWsClientID
	wsClient := &wsclient{
		channel: make(chan interface{}, 500),
		exit:    make(chan struct{}),
	}
	wsClients[clientID] = wsClient
	nextWsClientID++
	return clientID, wsClient
}

// removes the websocket client with the given id.
func removeWsClient(clientID uint64) {
	wsClientsMu.Lock()
	defer wsClientsMu.Unlock()

	wsClient := wsClients[clientID]
	close(wsClient.exit)
	close(wsClient.channel)
	delete(wsClients, clientID)
}

// broadcasts the given message to all connected websocket clients.
func broadcastWsMessage(msg interface{}, dontDrop ...bool) {
	wsClientsMu.Lock()
	defer wsClientsMu.Unlock()

	for _, wsClient := range wsClients {
		if len(dontDrop) > 0 {
			select {
			case wsClient.channel <- msg:
			case <-wsClient.exit:
				// get unblocked if the websocket connection just got closed
			}
			continue
		}
		select {
		case wsClient.channel <- msg:
		default:
			// potentially drop the message if the client is a slow consumer
		}
	}
}

func websocketRoute(c echo.Context) error {
	defer func() {
		if r := recover(); r != nil {
			log.Errorf("recovered from websocket handle func: %s", r)
		}
	}()

	// upgrade to websocket connection
	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		return err
	}
	defer ws.Close()
	ws.EnableWriteCompression(true)

	// cleanup client websocket
	clientID, wsClient := registerWSClient()
	defer removeWsClient(clientID)

	for {
		msg := <-wsClient.channel
		// set the write deadline before writing so a slow client cannot block this goroutine indefinitely
		if err := ws.SetWriteDeadline(time.Now().Add(webSocketWriteTimeout)); err != nil {
			break
		}
		if err := ws.WriteJSON(msg); err != nil {
			break
		}
	}
	return nil
}
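
// Example wiring (hypothetical sketch, not part of this file): the dashboard
// plugin would typically register the websocket route on its echo instance and
// configure the worker pool during its configure/run phase. The function name
// and the "/ws" path below are illustrative assumptions, not confirmed API:
//
//	func configureWebSocketExample(e *echo.Echo) {
//		configureWebSocketWorkerPool()
//		e.GET("/ws", websocketRoute)
//	}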