Skip to content
Snippets Groups Projects
Select Git revision
  • 8eb9708cd44f54c2af78bb12c8bbb621b5a3ff29
  • without_tipselection default
  • develop protected
  • fix/grafana-local-dashboard
  • wasp
  • fix/dashboard-explorer-freeze
  • master
  • feat/timerqueue
  • test/sync_debug_and_650
  • feat/sync_revamp_inv
  • wip/sync
  • tool/db-recovery
  • portcheck/fix
  • fix/synchronization
  • feat/new-dashboard-analysis
  • feat/refactored-analysis-dashboard
  • feat/new-analysis-dashboard
  • test/demo-prometheus-fpc
  • prometheus_metrics
  • wip/analysis-server
  • merge/fpc-test-value-transfer
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.3
  • v0.1.2
  • v0.1.1
  • v0.1.0
28 results

parameters.go

Blame
  • plugin.go 5.84 KiB
    package spa
    
    import (
    	"crypto/subtle"
    	"net"
    	"net/http"
    	"runtime"
    	"strconv"
    	"sync"
    	"time"
    
    	"github.com/gorilla/websocket"
    	"github.com/iotaledger/hive.go/autopeering/peer/service"
    	"github.com/labstack/echo"
    	"github.com/labstack/echo/middleware"
    
    	"github.com/iotaledger/goshimmer/packages/shutdown"
    	"github.com/iotaledger/goshimmer/plugins/autopeering"
    	"github.com/iotaledger/goshimmer/plugins/autopeering/local"
    	"github.com/iotaledger/goshimmer/plugins/banner"
    	"github.com/iotaledger/goshimmer/plugins/config"
    	"github.com/iotaledger/goshimmer/plugins/gossip"
    	"github.com/iotaledger/goshimmer/plugins/metrics"
    
    	"github.com/iotaledger/hive.go/daemon"
    	"github.com/iotaledger/hive.go/events"
    	"github.com/iotaledger/hive.go/logger"
    	"github.com/iotaledger/hive.go/node"
    	"github.com/iotaledger/hive.go/workerpool"
    )
    
    var (
    	// PLUGIN is the node plugin named "SPA"; it is created with status
    	// node.Enabled and wired to the configure and run callbacks of this file.
    	PLUGIN = node.NewPlugin("SPA", node.Enabled, configure, run)
    	// log is the plugin logger; it is assigned in configure.
    	log    *logger.Logger
    
    	// nodeStartAt is captured when the package is initialized and is the
    	// reference point for the uptime reported by currentNodeStatus.
    	nodeStartAt = time.Now()
    
    	// clientsMu is held by sendToAllWSClient while it iterates over clients.
    	clientsMu    sync.Mutex
    	// clients maps a client ID to the channel messages for that client are pushed into.
    	clients             = make(map[uint64]chan interface{})
    	// nextClientID is not used in this part of the file; presumably the ID
    	// given to the next registered client - confirm against the websocket handler.
    	nextClientID uint64 = 0
    
    	// worker count and queue size passed to the worker pool built in configure
    	wsSendWorkerCount     = 1
    	wsSendWorkerQueueSize = 250
    	// wsSendWorkerPool is created in configure and started/stopped in run.
    	wsSendWorkerPool      *workerpool.WorkerPool
    )
    
    // configure prepares the plugin before it is run: it creates the plugin
    // logger, builds the worker pool that pushes status messages to the
    // websocket clients and configures the live feed.
    func configure(plugin *node.Plugin) {
    	log = logger.NewLogger(plugin.Name)
    
    	// Each task carries the current TPS value as its first parameter. The
    	// handler sends it out together with a fresh node status and the
    	// current neighbor metrics, in that order.
    	broadcastStatus := func(task workerpool.Task) {
    		tps := task.Param(0).(uint64)
    		sendToAllWSClient(&msg{Type: MsgTypeTPSMetric, Data: tps})
    		sendToAllWSClient(&msg{Type: MsgTypeNodeStatus, Data: currentNodeStatus()})
    		sendToAllWSClient(&msg{Type: MsgTypeNeighborMetric, Data: neighborMetrics()})
    		task.Return(nil)
    	}
    
    	wsSendWorkerPool = workerpool.New(
    		broadcastStatus,
    		workerpool.WorkerCount(wsSendWorkerCount),
    		workerpool.QueueSize(wsSendWorkerQueueSize),
    	)
    
    	configureLiveFeed()
    }
    
    // run starts the plugin. It registers the background worker that forwards
    // TPS updates to the websocket send pool, starts the live feed, and serves
    // the dashboard over HTTP on the address configured under CFG_BIND_ADDRESS.
    // The HTTP server runs in its own goroutine, so run returns immediately.
    func run(plugin *node.Plugin) {
    	// every TPS update is offered to the send pool; the result of TrySubmit
    	// is not inspected here
    	notifyStatus := events.NewClosure(func(tps uint64) {
    		wsSendWorkerPool.TrySubmit(tps)
    	})
    
    	daemon.BackgroundWorker("SPA[WSSend]", func(shutdownSignal <-chan struct{}) {
    		metrics.Events.ReceivedTPSUpdated.Attach(notifyStatus)
    		wsSendWorkerPool.Start()
    		<-shutdownSignal
    		log.Info("Stopping SPA[WSSend] ...")
    		// detach first so no new tasks are submitted while the pool stops
    		metrics.Events.ReceivedTPSUpdated.Detach(notifyStatus)
    		wsSendWorkerPool.Stop()
    		log.Info("Stopping SPA[WSSend] ... done")
    	}, shutdown.ShutdownPrioritySPA)
    
    	runLiveFeed()
    
    	// allow any origin for websocket connections
    	// NOTE(review): this accepts cross-site websocket requests - confirm it is intended.
    	upgrader.CheckOrigin = func(r *http.Request) bool {
    		return true
    	}
    
    	e := echo.New()
    	e.HideBanner = true
    	e.Use(middleware.Recover())
    
    	if config.Node.GetBool(CFG_BASIC_AUTH_ENABLED) {
    		e.Use(middleware.BasicAuth(func(username, password string, c echo.Context) (bool, error) {
    			wantUser := []byte(config.Node.GetString(CFG_BASIC_AUTH_USERNAME))
    			wantPass := []byte(config.Node.GetString(CFG_BASIC_AUTH_PASSWORD))
    
    			// Compare in constant time and always evaluate both checks, so
    			// the response time does not reveal how much of a credential
    			// matched or which of the two was wrong.
    			userOK := subtle.ConstantTimeCompare([]byte(username), wantUser)
    			passOK := subtle.ConstantTimeCompare([]byte(password), wantPass)
    			return userOK&passOK == 1, nil
    		}))
    	}
    
    	setupRoutes(e)
    	addr := config.Node.GetString(CFG_BIND_ADDRESS)
    
    	log.Infof("You can now access the dashboard using: http://%s", addr)
    	go func() {
    		// Start blocks until the server stops. Report anything other than
    		// a regular shutdown, e.g. a bind address that is already in use,
    		// instead of silently dropping it.
    		if err := e.Start(addr); err != nil && err != http.ErrServerClosed {
    			log.Errorf("Error serving the dashboard on %s: %s", addr, err)
    		}
    	}()
    }
    
    // sendToAllWSClient offers msg to the channel of every registered websocket
    // client. The send never blocks: a client whose channel cannot take the
    // message right now does not receive it. clientsMu is held for the whole
    // iteration.
    func sendToAllWSClient(msg interface{}) {
    	clientsMu.Lock()
    	defer clientsMu.Unlock()
    
    	for id := range clients {
    		select {
    		case clients[id] <- msg:
    		default:
    			// channel not ready to receive; skip this client
    		}
    	}
    }
    
    // webSocketWriteTimeout is three seconds. In this part of the file it is
    // only used as the handshake timeout of upgrader; presumably it also bounds
    // websocket writes elsewhere - confirm against the websocket handler.
    var webSocketWriteTimeout = time.Duration(3) * time.Second
    
    var (
    	// upgrader is the websocket.Upgrader for the dashboard, with compression
    	// enabled. Its CheckOrigin callback is installed later, in run.
    	upgrader = websocket.Upgrader{
    		HandshakeTimeout:  webSocketWriteTimeout,
    		EnableCompression: true,
    	}
    )
    
    // Message type identifiers placed in msg.Type. The values are assigned by
    // iota starting at 0, so the order of this list determines the numeric
    // value of every constant.
    const (
    	// MsgTypeNodeStatus (0) is sent with the result of currentNodeStatus.
    	MsgTypeNodeStatus byte = iota
    	// MsgTypeTPSMetric (1) is sent with the TPS value as uint64.
    	MsgTypeTPSMetric
    	// MsgTypeTx (2) is not sent in this part of the file; presumably used
    	// by the live feed with a tx payload - confirm.
    	MsgTypeTx
    	// MsgTypeNeighborMetric (3) is sent with the result of neighborMetrics.
    	MsgTypeNeighborMetric
    )
    
    // msg is the envelope pushed to the websocket clients: Type holds one of
    // the MsgType* constants and Data the payload belonging to that type.
    type msg struct {
    	Type byte        `json:"type"`
    	Data interface{} `json:"data"`
    }
    
    // tx describes a transaction by hash and value. It is not used in this
    // part of the file; presumably the payload for MsgTypeTx - confirm.
    type tx struct {
    	Hash  string `json:"hash"`
    	Value int64  `json:"value"`
    }
    
    // nodestatus is the payload built by currentNodeStatus.
    type nodestatus struct {
    	// ID is the string form of the local node's ID.
    	ID      string      `json:"id"`
    	// Version is taken from banner.AppVersion.
    	Version string      `json:"version"`
    	// Uptime is the time since nodeStartAt in milliseconds.
    	Uptime  int64       `json:"uptime"`
    	// Mem holds the memory metrics read from the Go runtime.
    	Mem     *memmetrics `json:"mem"`
    }
    
    // memmetrics is the selection of runtime.MemStats fields reported in
    // nodestatus. currentNodeStatus copies each field from the MemStats field
    // of the same name, except LastPauseGC, which is taken from the PauseNs
    // buffer.
    type memmetrics struct {
    	Sys          uint64 `json:"sys"`
    	HeapSys      uint64 `json:"heap_sys"`
    	HeapInuse    uint64 `json:"heap_inuse"`
    	HeapIdle     uint64 `json:"heap_idle"`
    	HeapReleased uint64 `json:"heap_released"`
    	HeapObjects  uint64 `json:"heap_objects"`
    	MSpanInuse   uint64 `json:"m_span_inuse"`
    	MCacheInuse  uint64 `json:"m_cache_inuse"`
    	StackSys     uint64 `json:"stack_sys"`
    	NumGC        uint32 `json:"num_gc"`
    	LastPauseGC  uint64 `json:"last_pause_gc"`
    }
    
    // neighbormetric describes a single gossip neighbor as reported by
    // neighborMetrics.
    type neighbormetric struct {
    	// ID is the string form of the neighbor's peer ID.
    	ID               string `json:"id"`
    	// Address is the peer IP joined with the port of its gossip service.
    	Address          string `json:"address"`
    	// ConnectionOrigin is either "Inbound" or "Outbound".
    	ConnectionOrigin string `json:"connection_origin"`
    	// byte counters as returned by the neighbor's BytesRead/BytesWritten
    	BytesRead        uint32 `json:"bytes_read"`
    	BytesWritten     uint32 `json:"bytes_written"`
    }
    
    // neighborMetrics returns one entry per current gossip neighbor, holding its
    // ID, gossip address, traffic counters and whether the connection is
    // inbound or outbound. The result is never nil; without any neighbors it is
    // an empty slice.
    func neighborMetrics() []neighbormetric {
    	// gossip plugin might be disabled
    	neighbors := gossip.GetAllNeighbors()
    
    	// make (even with zero capacity) keeps the slice non-nil
    	stats := make([]neighbormetric, 0, len(neighbors))
    	if len(neighbors) == 0 {
    		return stats
    	}
    
    	// The set of outgoing peers is the same for every neighbor, so fetch it
    	// once instead of once per loop iteration.
    	outgoing := autopeering.Selection.GetOutgoingNeighbors()
    
    	for _, neighbor := range neighbors {
    		// unfortunately the neighbor manager doesn't keep track of the origin of the connection
    		origin := "Inbound"
    		for _, peer := range outgoing {
    			// NOTE(review): this compares the peer values directly, not
    			// their IDs - confirm both sources share the same instances.
    			if neighbor.Peer == peer {
    				origin = "Outbound"
    				break
    			}
    		}
    
    		host := neighbor.Peer.IP().String()
    		port := neighbor.Peer.Services().Get(service.GossipKey).Port()
    		stats = append(stats, neighbormetric{
    			ID:               neighbor.Peer.ID().String(),
    			Address:          net.JoinHostPort(host, strconv.Itoa(port)),
    			BytesRead:        neighbor.BytesRead(),
    			BytesWritten:     neighbor.BytesWritten(),
    			ConnectionOrigin: origin,
    		})
    	}
    	return stats
    }
    
    // currentNodeStatus takes a snapshot of the node: its ID, the application
    // version, the uptime in milliseconds since nodeStartAt and a set of memory
    // metrics read from the Go runtime.
    func currentNodeStatus() *nodestatus {
    	var memStats runtime.MemStats
    	runtime.ReadMemStats(&memStats)
    
    	return &nodestatus{
    		// node status
    		ID:      local.GetInstance().ID().String(),
    		Version: banner.AppVersion,
    		Uptime:  time.Since(nodeStartAt).Milliseconds(),
    
    		// memory metrics
    		Mem: &memmetrics{
    			Sys:          memStats.Sys,
    			HeapSys:      memStats.HeapSys,
    			HeapInuse:    memStats.HeapInuse,
    			HeapIdle:     memStats.HeapIdle,
    			HeapReleased: memStats.HeapReleased,
    			HeapObjects:  memStats.HeapObjects,
    			MSpanInuse:   memStats.MSpanInuse,
    			MCacheInuse:  memStats.MCacheInuse,
    			StackSys:     memStats.StackSys,
    			NumGC:        memStats.NumGC,
    			// PauseNs is a 256-entry circular buffer; this index selects
    			// the slot of the most recent GC pause
    			LastPauseGC: memStats.PauseNs[(memStats.NumGC+255)%256],
    		},
    	}
    }