Skip to content
Snippets Groups Projects
Select Git revision
  • 324cf7858dfe23970a9f4f59e882832522215b52
  • develop default protected
  • congestioncontrol
  • merge-v-data-collection-spammer-0.8.2
  • WIP-merge-v-data-collection-spammer-0.8.2
  • merge-v-data-collection-spammer-0.7.7
  • tmp
  • test-masterpow-fixing
  • test-masterpow
  • test-echo
  • v-data-collection
  • v-data-collection-spammer
  • tmp-dump-spam-info
  • dump-msg-info-0.3.1
  • test-dump-message-info
  • spammer-exprandom
  • extra/tutorial
  • without_tipselection
  • hacking-docker-network
  • hacking-docker-network-0.2.3
  • master
  • v0.2.3
22 results

runSimulation.bat

Blame
  • ws.go NaN GiB
    package dashboard
    
    import (
    	"net/http"
    	"sync"
    	"time"
    
    	"github.com/gorilla/websocket"
    	"github.com/iotaledger/goshimmer/packages/shutdown"
    	"github.com/iotaledger/goshimmer/plugins/messagelayer"
    	"github.com/iotaledger/goshimmer/plugins/metrics"
    	"github.com/iotaledger/hive.go/daemon"
    	"github.com/iotaledger/hive.go/events"
    	"github.com/iotaledger/hive.go/workerpool"
    	"github.com/labstack/echo"
    )
    
    var (
    	// settings
    	wsSendWorkerCount     = 1
    	wsSendWorkerQueueSize = 250
    	wsSendWorkerPool      *workerpool.WorkerPool
    	webSocketWriteTimeout = time.Duration(3) * time.Second
    
    	// clients
    	wsClientsMu    sync.Mutex
    	wsClients      = make(map[uint64]*wsclient)
    	nextWsClientID uint64
    
    	// gorilla websocket layer
    	upgrader = websocket.Upgrader{
    		HandshakeTimeout:  webSocketWriteTimeout,
    		CheckOrigin:       func(r *http.Request) bool { return true },
    		EnableCompression: true,
    	}
    )
    
    // a websocket client with a channel for downstream messages.
    type wsclient struct {
    	// downstream message channel.
    	channel chan interface{}
    	// a channel which is closed when the websocket client is disconnected.
    	exit chan struct{}
    }
    
    func configureWebSocketWorkerPool() {
    	wsSendWorkerPool = workerpool.New(func(task workerpool.Task) {
    		broadcastWsMessage(&wsmsg{MsgTypeMPSMetric, task.Param(0).(uint64)})
    		broadcastWsMessage(&wsmsg{MsgTypeNodeStatus, currentNodeStatus()})
    		broadcastWsMessage(&wsmsg{MsgTypeNeighborMetric, neighborMetrics()})
    		broadcastWsMessage(&wsmsg{MsgTypeTipsMetric, messagelayer.TipSelector.TipCount()})
    		task.Return(nil)
    	}, workerpool.WorkerCount(wsSendWorkerCount), workerpool.QueueSize(wsSendWorkerQueueSize))
    }
    
    func runWebSocketStreams() {
    	updateStatus := events.NewClosure(func(mps uint64) {
    		wsSendWorkerPool.TrySubmit(mps)
    	})
    
    	daemon.BackgroundWorker("Dashboard[StatusUpdate]", func(shutdownSignal <-chan struct{}) {
    		metrics.Events.ReceivedMPSUpdated.Attach(updateStatus)
    		wsSendWorkerPool.Start()
    		<-shutdownSignal
    		log.Info("Stopping Dashboard[StatusUpdate] ...")
    		metrics.Events.ReceivedMPSUpdated.Detach(updateStatus)
    		wsSendWorkerPool.Stop()
    		log.Info("Stopping Dashboard[StatusUpdate] ... done")
    	}, shutdown.PriorityDashboard)
    }
    
    // reigsters and creates a new websocket client.
    func registerWSClient() (uint64, *wsclient) {
    	wsClientsMu.Lock()
    	defer wsClientsMu.Unlock()
    	clientID := nextWsClientID
    	wsClient := &wsclient{
    		channel: make(chan interface{}, 500),
    		exit:    make(chan struct{}),
    	}
    	wsClients[clientID] = wsClient
    	nextWsClientID++
    	return clientID, wsClient
    }
    
    // removes the websocket client with the given id.
    func removeWsClient(clientID uint64) {
    	wsClientsMu.Lock()
    	defer wsClientsMu.Unlock()
    	wsClient := wsClients[clientID]
    	close(wsClient.exit)
    	close(wsClient.channel)
    	delete(wsClients, clientID)
    }
    
    // broadcasts the given message to all connected websocket clients.
    func broadcastWsMessage(msg interface{}, dontDrop ...bool) {
    	wsClientsMu.Lock()
    	defer wsClientsMu.Unlock()
    	for _, wsClient := range wsClients {
    		if len(dontDrop) > 0 {
    			select {
    			case wsClient.channel <- msg:
    			case <-wsClient.exit:
    				// get unblocked if the websocket connection just got closed
    			}
    			continue
    		}
    		select {
    		case wsClient.channel <- msg:
    		default:
    			// potentially drop if slow consumer
    		}
    	}
    }
    
    func websocketRoute(c echo.Context) error {
    	defer func() {
    		if r := recover(); r != nil {
    			log.Errorf("recovered from websocket handle func: %s", r)
    		}
    	}()
    
    	// upgrade to websocket connection
    	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
    	if err != nil {
    		return err
    	}
    	defer ws.Close()
    	ws.EnableWriteCompression(true)
    
    	// cleanup client websocket
    	clientID, wsClient := registerWSClient()
    	defer removeWsClient(clientID)
    
    	for {
    		msg := <-wsClient.channel
    		if err := ws.WriteJSON(msg); err != nil {
    			break
    		}
    		if err := ws.SetWriteDeadline(time.Now().Add(webSocketWriteTimeout)); err != nil {
    			break
    		}
    	}
    	return nil
    }