Skip to content
Snippets Groups Projects
  • Hans Moog's avatar
    b9404478
    Feat: started reworking output model (#311) · b9404478
    Hans Moog authored
    * Feat: started reworking output model
    
    * Refactor: refactored some of the model
    
    * Refactor: started to refactor some additional models
    
    * Refactor: started to refactor message layer
    
    * Refactor: still refactoring :/
    
    * Refactor: refactored some more
    
    * Refactor: some error messages are gone YAY
    
    * Refactor: refactor complete
    Feat: started reworking output model (#311)
    Hans Moog authored
    * Feat: started reworking output model
    
    * Refactor: refactored some of the model
    
    * Refactor: started to refactor some additional models
    
    * Refactor: started to refactor message layer
    
    * Refactor: still refactoring :/
    
    * Refactor: refactored some more
    
    * Refactor: some error messages are gone YAY
    
    * Refactor: refactor complete
plugin.go 3.89 KiB
package graph

import (
	"errors"
	"net/http"
	"time"

	"github.com/iotaledger/goshimmer/plugins/config"
	"github.com/iotaledger/goshimmer/plugins/messagelayer"

	"golang.org/x/net/context"

	"github.com/iotaledger/goshimmer/packages/model/value_transaction"
	"github.com/iotaledger/goshimmer/packages/shutdown"

	engineio "github.com/googollee/go-engine.io"
	"github.com/googollee/go-engine.io/transport"
	"github.com/googollee/go-engine.io/transport/polling"
	"github.com/googollee/go-engine.io/transport/websocket"
	socketio "github.com/googollee/go-socket.io"

	"github.com/iotaledger/hive.go/daemon"
	"github.com/iotaledger/hive.go/events"
	"github.com/iotaledger/hive.go/logger"
	"github.com/iotaledger/hive.go/node"
	"github.com/iotaledger/hive.go/workerpool"
)

var (
	PLUGIN = node.NewPlugin("Graph", node.Disabled, configure, run)

	log *logger.Logger

	newTxWorkerCount     = 1
	newTxWorkerQueueSize = 10000
	newTxWorkerPool      *workerpool.WorkerPool

	server         *http.Server
	router         *http.ServeMux
	socketioServer *socketio.Server
)

func downloadSocketIOHandler(w http.ResponseWriter, r *http.Request) {
	http.ServeFile(w, r, config.Node.GetString(CFG_SOCKET_IO))
}

func configureSocketIOServer() error {
	var err error

	socketioServer, err = socketio.NewServer(&engineio.Options{
		PingTimeout:  time.Second * 20,
		PingInterval: time.Second * 5,
		Transports: []transport.Transport{
			polling.Default,
			websocket.Default,
		},
	})
	if err != nil {
		return err
	}

	socketioServer.OnConnect("/", onConnectHandler)
	socketioServer.OnError("/", onErrorHandler)
	socketioServer.OnDisconnect("/", onDisconnectHandler)

	return nil
}

func configure(plugin *node.Plugin) {
	log = logger.NewLogger("Graph")
	initRingBuffers()

	router = http.NewServeMux()

	// socket.io and web server
	server = &http.Server{
		Addr:    config.Node.GetString(CFG_BIND_ADDRESS),
		Handler: router,
	}

	fs := http.FileServer(http.Dir(config.Node.GetString(CFG_WEBROOT)))

	if err := configureSocketIOServer(); err != nil {
		log.Panicf("Graph: %v", err.Error())
	}

	router.Handle("/", fs)
	router.HandleFunc("/socket.io/socket.io.js", downloadSocketIOHandler)
	router.Handle("/socket.io/", socketioServer)

	newTxWorkerPool = workerpool.New(func(task workerpool.Task) {
		onNewTx(task.Param(0).(*value_transaction.ValueTransaction))
		task.Return(nil)
	}, workerpool.WorkerCount(newTxWorkerCount), workerpool.QueueSize(newTxWorkerQueueSize))
}

func run(*node.Plugin) {

	notifyNewTx := events.NewClosure(func(transaction *value_transaction.ValueTransaction) {
		newTxWorkerPool.TrySubmit(transaction)
	})

	daemon.BackgroundWorker("Graph[NewTxWorker]", func(shutdownSignal <-chan struct{}) {
		log.Info("Starting Graph[NewTxWorker] ... done")
		messagelayer.Tangle.Events.TransactionAttached.Attach(notifyNewTx)
		newTxWorkerPool.Start()
		<-shutdownSignal
		messagelayer.Tangle.Events.TransactionAttached.Detach(notifyNewTx)
		newTxWorkerPool.Stop()
		log.Info("Stopping Graph[NewTxWorker] ... done")
	}, shutdown.ShutdownPriorityGraph)

	daemon.BackgroundWorker("Graph Webserver", func(shutdownSignal <-chan struct{}) {
		go socketioServer.Serve()

		stopped := make(chan struct{})
		go func() {
			log.Infof("You can now access IOTA Tangle Visualiser using: http://%s", config.Node.GetString(CFG_BIND_ADDRESS))
			if err := server.ListenAndServe(); err != nil {
				if !errors.Is(err, http.ErrServerClosed) {
					log.Errorf("Error serving: %s", err)
				}
			}
			close(stopped)
		}()

		select {
		case <-shutdownSignal:
		case <-stopped:
		}

		log.Info("Stopping Graph Webserver ...")
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		if err := server.Shutdown(ctx); err != nil {
			log.Errorf("Error stopping: %s", err)
		}

		if err := socketioServer.Close(); err != nil {
			log.Errorf("Error closing Socket.IO server: %s", err)
		}
		log.Info("Stopping Graph Webserver ... done")
	}, shutdown.ShutdownPriorityGraph)
}