-
Levente Pap authoredLevente Pap authored
plugin.go 6.69 KiB
package dashboard
import (
"context"
"errors"
"net"
"net/http"
"runtime"
"strconv"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/drng"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/metrics"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
)
// PluginName is the name of the dashboard plugin.
const PluginName = "Dashboard"
var (
// plugin is the plugin instance of the dashboard plugin.
plugin *node.Plugin
once sync.Once
log *logger.Logger
server *echo.Echo
nodeStartAt = time.Now()
)
// Plugin gets the plugin instance.
func Plugin() *node.Plugin {
once.Do(func() {
plugin = node.NewPlugin(PluginName, node.Enabled, configure, run)
})
return plugin
}
func configure(plugin *node.Plugin) {
log = logger.NewLogger(plugin.Name)
configureWebSocketWorkerPool()
configureLiveFeed()
configureDrngLiveFeed()
configureVisualizer()
configureServer()
}
func configureServer() {
server = echo.New()
server.HideBanner = true
server.HidePort = true
server.Use(middleware.Recover())
if config.Node().GetBool(CfgBasicAuthEnabled) {
server.Use(middleware.BasicAuth(func(username, password string, c echo.Context) (bool, error) {
if username == config.Node().GetString(CfgBasicAuthUsername) &&
password == config.Node().GetString(CfgBasicAuthPassword) {
return true, nil
}
return false, nil
}))
}
setupRoutes(server)
}
func run(*node.Plugin) {
// run message broker
runWebSocketStreams()
// run the message live feed
runLiveFeed()
// run the visualizer vertex feed
runVisualizer()
// run dRNG live feed if dRNG plugin is enabled
if !node.IsSkipped(drng.Plugin()) {
runDrngLiveFeed()
}
log.Infof("Starting %s ...", PluginName)
if err := daemon.BackgroundWorker(PluginName, worker, shutdown.PriorityAnalysis); err != nil {
log.Panicf("Error starting as daemon: %s", err)
}
}
func worker(shutdownSignal <-chan struct{}) {
defer log.Infof("Stopping %s ... done", PluginName)
// start the web socket worker pool
wsSendWorkerPool.Start()
defer wsSendWorkerPool.Stop()
// submit the mps to the worker pool when triggered
notifyStatus := events.NewClosure(func(mps uint64) { wsSendWorkerPool.TrySubmit(mps) })
metrics.Events.ReceivedMPSUpdated.Attach(notifyStatus)
defer metrics.Events.ReceivedMPSUpdated.Detach(notifyStatus)
stopped := make(chan struct{})
bindAddr := config.Node().GetString(CfgBindAddress)
go func() {
log.Infof("%s started, bind-address=%s", PluginName, bindAddr)
if err := server.Start(bindAddr); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
log.Errorf("Error serving: %s", err)
}
close(stopped)
}
}()
// stop if we are shutting down or the server could not be started
select {
case <-shutdownSignal:
case <-stopped:
}
log.Infof("Stopping %s ...", PluginName)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
log.Errorf("Error stopping: %s", err)
}
}
const (
// MsgTypeNodeStatus is the type of the NodeStatus message.
MsgTypeNodeStatus byte = iota
// MsgTypeMPSMetric is the type of the message per second (MPS) metric message.
MsgTypeMPSMetric
// MsgTypeMessage is the type of the message.
MsgTypeMessage
// MsgTypeNeighborMetric is the type of the NeighborMetric message.
MsgTypeNeighborMetric
// MsgTypeDrng is the type of the dRNG message.
MsgTypeDrng
// MsgTypeTipsMetric is the type of the TipsMetric message.
MsgTypeTipsMetric
// MsgTypeVertex defines a vertex message.
MsgTypeVertex
// MsgTypeTipInfo defines a tip info message.
MsgTypeTipInfo
)
type wsmsg struct {
Type byte `json:"type"`
Data interface{} `json:"data"`
}
type msg struct {
ID string `json:"id"`
Value int64 `json:"value"`
}
type nodestatus struct {
ID string `json:"id"`
Version string `json:"version"`
Uptime int64 `json:"uptime"`
Synced bool `json:"synced"`
Mem *memmetrics `json:"mem"`
}
type memmetrics struct {
Sys uint64 `json:"sys"`
HeapSys uint64 `json:"heap_sys"`
HeapInuse uint64 `json:"heap_inuse"`
HeapIdle uint64 `json:"heap_idle"`
HeapReleased uint64 `json:"heap_released"`
HeapObjects uint64 `json:"heap_objects"`
MSpanInuse uint64 `json:"m_span_inuse"`
MCacheInuse uint64 `json:"m_cache_inuse"`
StackSys uint64 `json:"stack_sys"`
NumGC uint32 `json:"num_gc"`
LastPauseGC uint64 `json:"last_pause_gc"`
}
type neighbormetric struct {
ID string `json:"id"`
Address string `json:"address"`
ConnectionOrigin string `json:"connection_origin"`
BytesRead uint32 `json:"bytes_read"`
BytesWritten uint32 `json:"bytes_written"`
}
func neighborMetrics() []neighbormetric {
var stats []neighbormetric
// gossip plugin might be disabled
neighbors := gossip.Manager().AllNeighbors()
if neighbors == nil {
return stats
}
for _, neighbor := range neighbors {
// unfortunately the neighbor manager doesn't keep track of the origin of the connection
origin := "Inbound"
for _, peer := range autopeering.Selection().GetOutgoingNeighbors() {
if neighbor.Peer == peer {
origin = "Outbound"
break
}
}
host := neighbor.Peer.IP().String()
port := neighbor.Peer.Services().Get(service.GossipKey).Port()
stats = append(stats, neighbormetric{
ID: neighbor.Peer.ID().String(),
Address: net.JoinHostPort(host, strconv.Itoa(port)),
BytesRead: neighbor.BytesRead(),
BytesWritten: neighbor.BytesWritten(),
ConnectionOrigin: origin,
})
}
return stats
}
func currentNodeStatus() *nodestatus {
var m runtime.MemStats
runtime.ReadMemStats(&m)
status := &nodestatus{}
status.ID = local.GetInstance().ID().String()
// node status
status.Version = banner.AppVersion
status.Uptime = time.Since(nodeStartAt).Milliseconds()
status.Synced = metrics.Synced()
// memory metrics
status.Mem = &memmetrics{
Sys: m.Sys,
HeapSys: m.HeapSys,
HeapInuse: m.HeapInuse,
HeapIdle: m.HeapIdle,
HeapReleased: m.HeapReleased,
HeapObjects: m.HeapObjects,
MSpanInuse: m.MSpanInuse,
MCacheInuse: m.MCacheInuse,
StackSys: m.StackSys,
NumGC: m.NumGC,
LastPauseGC: m.PauseNs[(m.NumGC+255)%256],
}
return status
}