plugin.go 6.11 KiB
package spa
import (
"net"
"net/http"
"runtime"
"strconv"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/banner"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/metrics"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/workerpool"
)
var (
PLUGIN = node.NewPlugin("SPA", node.Enabled, configure, run)
log *logger.Logger
nodeStartAt = time.Now()
clientsMu sync.Mutex
clients = make(map[uint64]chan interface{})
nextClientID uint64 = 0
wsSendWorkerCount = 1
wsSendWorkerQueueSize = 250
wsSendWorkerPool *workerpool.WorkerPool
)
func configure(plugin *node.Plugin) {
log = logger.NewLogger(plugin.Name)
wsSendWorkerPool = workerpool.New(func(task workerpool.Task) {
sendToAllWSClient(&msg{MsgTypeTPSMetric, task.Param(0).(uint64)})
sendToAllWSClient(&msg{MsgTypeNodeStatus, currentNodeStatus()})
sendToAllWSClient(&msg{MsgTypeNeighborMetric, neighborMetrics()})
task.Return(nil)
}, workerpool.WorkerCount(wsSendWorkerCount), workerpool.QueueSize(wsSendWorkerQueueSize))
configureLiveFeed()
configureDrngLiveFeed()
}
func run(plugin *node.Plugin) {
notifyStatus := events.NewClosure(func(tps uint64) {
wsSendWorkerPool.TrySubmit(tps)
})
daemon.BackgroundWorker("SPA[WSSend]", func(shutdownSignal <-chan struct{}) {
metrics.Events.ReceivedTPSUpdated.Attach(notifyStatus)
wsSendWorkerPool.Start()
<-shutdownSignal
log.Info("Stopping SPA[WSSend] ...")
metrics.Events.ReceivedTPSUpdated.Detach(notifyStatus)
wsSendWorkerPool.Stop()
log.Info("Stopping SPA[WSSend] ... done")
}, shutdown.ShutdownPrioritySPA)
runLiveFeed()
runDrngLiveFeed()
// allow any origin for websocket connections
upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
e := echo.New()
e.HideBanner = true
e.Use(middleware.Recover())
if config.Node.GetBool(CFG_BASIC_AUTH_ENABLED) {
e.Use(middleware.BasicAuth(func(username, password string, c echo.Context) (bool, error) {
if username == config.Node.GetString(CFG_BASIC_AUTH_USERNAME) &&
password == config.Node.GetString(CFG_BASIC_AUTH_PASSWORD) {
return true, nil
}
return false, nil
}))
}
setupRoutes(e)
addr := config.Node.GetString(CFG_BIND_ADDRESS)
log.Infof("You can now access the dashboard using: http://%s", addr)
go e.Start(addr)
}
// sends the given message to all connected websocket clients
func sendToAllWSClient(msg interface{}) {
clientsMu.Lock()
defer clientsMu.Unlock()
for _, channel := range clients {
select {
case channel <- msg:
default:
// drop if buffer not drained
}
}
}
var webSocketWriteTimeout = time.Duration(3) * time.Second
var (
upgrader = websocket.Upgrader{
HandshakeTimeout: webSocketWriteTimeout,
EnableCompression: true,
}
)
const (
MsgTypeNodeStatus byte = iota
MsgTypeTPSMetric
MsgTypeTx
MsgTypeNeighborMetric
MsgTypeDrng
)
type msg struct {
Type byte `json:"type"`
Data interface{} `json:"data"`
}
type tx struct {
Hash string `json:"hash"`
Value int64 `json:"value"`
}
type drngMsg struct {
Instance uint32 `json:"instance"`
DistributedPK string `json:"dpk"`
Round uint64 `json:"round"`
Randomness string `json:"randomness"`
Timestamp string `json:"timestamp"`
}
type nodestatus struct {
ID string `json:"id"`
Version string `json:"version"`
Uptime int64 `json:"uptime"`
Mem *memmetrics `json:"mem"`
}
type memmetrics struct {
Sys uint64 `json:"sys"`
HeapSys uint64 `json:"heap_sys"`
HeapInuse uint64 `json:"heap_inuse"`
HeapIdle uint64 `json:"heap_idle"`
HeapReleased uint64 `json:"heap_released"`
HeapObjects uint64 `json:"heap_objects"`
MSpanInuse uint64 `json:"m_span_inuse"`
MCacheInuse uint64 `json:"m_cache_inuse"`
StackSys uint64 `json:"stack_sys"`
NumGC uint32 `json:"num_gc"`
LastPauseGC uint64 `json:"last_pause_gc"`
}
type neighbormetric struct {
ID string `json:"id"`
Address string `json:"address"`
ConnectionOrigin string `json:"connection_origin"`
BytesRead uint32 `json:"bytes_read"`
BytesWritten uint32 `json:"bytes_written"`
}
func neighborMetrics() []neighbormetric {
stats := []neighbormetric{}
// gossip plugin might be disabled
neighbors := gossip.GetAllNeighbors()
if neighbors == nil {
return stats
}
for _, neighbor := range neighbors {
// unfortunately the neighbor manager doesn't keep track of the origin of the connection
origin := "Inbound"
for _, peer := range autopeering.Selection.GetOutgoingNeighbors() {
if neighbor.Peer == peer {
origin = "Outbound"
break
}
}
host := neighbor.Peer.IP().String()
port := neighbor.Peer.Services().Get(service.GossipKey).Port()
stats = append(stats, neighbormetric{
ID: neighbor.Peer.ID().String(),
Address: net.JoinHostPort(host, strconv.Itoa(port)),
BytesRead: neighbor.BytesRead(),
BytesWritten: neighbor.BytesWritten(),
ConnectionOrigin: origin,
})
}
return stats
}
func currentNodeStatus() *nodestatus {
var m runtime.MemStats
runtime.ReadMemStats(&m)
status := &nodestatus{}
status.ID = local.GetInstance().ID().String()
// node status
status.Version = banner.AppVersion
status.Uptime = time.Since(nodeStartAt).Milliseconds()
// memory metrics
status.Mem = &memmetrics{
Sys: m.Sys,
HeapSys: m.HeapSys,
HeapInuse: m.HeapInuse,
HeapIdle: m.HeapIdle,
HeapReleased: m.HeapReleased,
HeapObjects: m.HeapObjects,
MSpanInuse: m.MSpanInuse,
MCacheInuse: m.MCacheInuse,
StackSys: m.StackSys,
NumGC: m.NumGC,
LastPauseGC: m.PauseNs[(m.NumGC+255)%256],
}
return status
}