diff --git a/plugins/analysis/webinterface/httpserver/data_stream.go b/plugins/analysis/webinterface/httpserver/data_stream.go index a2d54db771cab43a5c012516942070bf1db3d29e..c096ffeb631cd9bd7b42808c3b7f9d943d80a217 100644 --- a/plugins/analysis/webinterface/httpserver/data_stream.go +++ b/plugins/analysis/webinterface/httpserver/data_stream.go @@ -1,7 +1,7 @@ package httpserver import ( - "fmt" + "sync" "github.com/iotaledger/goshimmer/plugins/analysis/server" "github.com/iotaledger/goshimmer/plugins/analysis/webinterface/recordedevents" @@ -11,47 +11,87 @@ import ( ) func dataStream(ws *websocket.Conn) { - func() { - eventHandlers := &types.EventHandlers{ - AddNode: func(nodeId string) { fmt.Fprint(ws, "A"+nodeId) }, - RemoveNode: func(nodeId string) { fmt.Fprint(ws, "a"+nodeId) }, - ConnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "C"+sourceId+targetId) }, - DisconnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "c"+sourceId+targetId) }, - NodeOnline: func(nodeId string) { fmt.Fprint(ws, "O"+nodeId) }, - NodeOffline: func(nodeId string) { fmt.Fprint(ws, "o"+nodeId) }, + // create a wrapper for the websocket + wsChan := NewWebSocketChannel(ws) + defer wsChan.Close() + + // variables and factory methods for the async calls after the initial replay + var replayMutex sync.RWMutex + createAsyncNodeCallback := func(wsChan *WebSocketChannel, messagePrefix string) func(string) { + return func(nodeId string) { + go func() { + replayMutex.RLock() + defer replayMutex.RUnlock() + + wsChan.TryWrite(messagePrefix + nodeId) + }() } + } + createAsyncLinkCallback := func(wsChan *WebSocketChannel, messagePrefix string) func(string, string) { + return func(sourceId string, targetId string) { + go func() { + replayMutex.RLock() + defer replayMutex.RUnlock() - addNodeClosure := events.NewClosure(eventHandlers.AddNode) - removeNodeClosure := events.NewClosure(eventHandlers.RemoveNode) - connectNodesClosure := events.NewClosure(eventHandlers.ConnectNodes) - disconnectNodesClosure := events.NewClosure(eventHandlers.DisconnectNodes) - nodeOnlineClosure := events.NewClosure(eventHandlers.NodeOnline) - nodeOfflineClosure := events.NewClosure(eventHandlers.NodeOffline) - - server.Events.AddNode.Attach(addNodeClosure) - server.Events.RemoveNode.Attach(removeNodeClosure) - server.Events.ConnectNodes.Attach(connectNodesClosure) - server.Events.DisconnectNodes.Attach(disconnectNodesClosure) - server.Events.NodeOnline.Attach(nodeOnlineClosure) - server.Events.NodeOffline.Attach(nodeOfflineClosure) - - go recordedevents.Replay(eventHandlers) - - buf := make([]byte, 1) - readFromWebsocket: - for { - if _, err := ws.Read(buf); err != nil { - break readFromWebsocket - } - - fmt.Fprint(ws, "_") + wsChan.TryWrite(messagePrefix + sourceId + targetId) + }() } + } + + // wait with firing the callbacks until the replay is complete + replayMutex.Lock() + + // create and register the dynamic callbacks + addNodeClosure := events.NewClosure(createAsyncNodeCallback(wsChan, "A")) + removeNodeClosure := events.NewClosure(createAsyncNodeCallback(wsChan, "a")) + connectNodesClosure := events.NewClosure(createAsyncLinkCallback(wsChan, "C")) + disconnectNodesClosure := events.NewClosure(createAsyncLinkCallback(wsChan, "c")) + nodeOnlineClosure := events.NewClosure(createAsyncNodeCallback(wsChan, "O")) + nodeOfflineClosure := events.NewClosure(createAsyncNodeCallback(wsChan, "o")) + server.Events.AddNode.Attach(addNodeClosure) + server.Events.RemoveNode.Attach(removeNodeClosure) + server.Events.ConnectNodes.Attach(connectNodesClosure) + server.Events.DisconnectNodes.Attach(disconnectNodesClosure) + server.Events.NodeOnline.Attach(nodeOnlineClosure) + server.Events.NodeOffline.Attach(nodeOfflineClosure) + + // replay old events + recordedevents.Replay(createEventHandlers(wsChan, createSyncNodeCallback, createSyncLinkCallback)) + + // mark replay as complete + replayMutex.Unlock() + + // wait until the connection breaks and keep it alive + wsChan.KeepAlive() + + // unregister the callbacks + server.Events.AddNode.Detach(addNodeClosure) + server.Events.RemoveNode.Detach(removeNodeClosure) + server.Events.ConnectNodes.Detach(connectNodesClosure) + server.Events.DisconnectNodes.Detach(disconnectNodesClosure) + server.Events.NodeOnline.Detach(nodeOnlineClosure) + server.Events.NodeOffline.Detach(nodeOfflineClosure) +} + +func createEventHandlers(wsChan *WebSocketChannel, nodeCallbackFactory func(*WebSocketChannel, string) func(string), linkCallbackFactory func(*WebSocketChannel, string) func(string, string)) *types.EventHandlers { + return &types.EventHandlers{ + AddNode: nodeCallbackFactory(wsChan, "A"), + RemoveNode: nodeCallbackFactory(wsChan, "a"), + ConnectNodes: linkCallbackFactory(wsChan, "C"), + DisconnectNodes: linkCallbackFactory(wsChan, "c"), + NodeOnline: nodeCallbackFactory(wsChan, "O"), + NodeOffline: nodeCallbackFactory(wsChan, "o"), + } +} + +func createSyncNodeCallback(wsChan *WebSocketChannel, messagePrefix string) func(nodeId string) { + return func(nodeId string) { + wsChan.Write(messagePrefix + nodeId) + } +} - server.Events.AddNode.Detach(addNodeClosure) - server.Events.RemoveNode.Detach(removeNodeClosure) - server.Events.ConnectNodes.Detach(connectNodesClosure) - server.Events.DisconnectNodes.Detach(disconnectNodesClosure) - server.Events.NodeOnline.Detach(nodeOnlineClosure) - server.Events.NodeOffline.Detach(nodeOfflineClosure) - }() +func createSyncLinkCallback(wsChan *WebSocketChannel, messagePrefix string) func(sourceId string, targetId string) { + return func(sourceId string, targetId string) { + wsChan.Write(messagePrefix + sourceId + targetId) + } } diff --git a/plugins/analysis/webinterface/httpserver/websocket_channel.go b/plugins/analysis/webinterface/httpserver/websocket_channel.go new file mode 100644 index 0000000000000000000000000000000000000000..0b75fefe284f958ddbdf54be568035b5a1483ac2 --- /dev/null +++ b/plugins/analysis/webinterface/httpserver/websocket_channel.go @@ -0,0 +1,58 @@ +package httpserver + +import ( + "fmt" + + "golang.org/x/net/websocket" +) + +type WebSocketChannel struct { + ws *websocket.Conn + send chan string +} + +func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel { + wsChan := &WebSocketChannel{ + ws: ws, + send: make(chan string, 1024), + } + + go wsChan.writer() + + return wsChan +} + +func (c *WebSocketChannel) Write(update string) { + c.send <- update +} + +func (c *WebSocketChannel) TryWrite(update string) { + select { + case c.send <- update: + default: + } +} + +func (c *WebSocketChannel) KeepAlive() { + buf := make([]byte, 1) + for { + if _, err := c.ws.Read(buf); err != nil { + break + } + + _, _ = fmt.Fprint(c.ws, "_") + } +} + +func (c *WebSocketChannel) Close() { + close(c.send) + _ = c.ws.Close() +} + +func (c *WebSocketChannel) writer() { + for pkt := range c.send { + if _, err := fmt.Fprint(c.ws, pkt); err != nil { + break + } + } +} diff --git a/plugins/analysis/webinterface/recordedevents/recorded_events.go b/plugins/analysis/webinterface/recordedevents/recorded_events.go index 31dba5c72fe285221e16302f30274ff9d090ab74..2e2a9a724b3524f9ce017e6a0fa5e49e40d899ae 100644 --- a/plugins/analysis/webinterface/recordedevents/recorded_events.go +++ b/plugins/analysis/webinterface/recordedevents/recorded_events.go @@ -31,7 +31,6 @@ func Configure(plugin *node.Plugin) { defer lock.Unlock() delete(nodes, nodeId) - //nodes[nodeId] = false })) server.Events.NodeOnline.Attach(events.NewClosure(func(nodeId string) { @@ -76,11 +75,30 @@ func Configure(plugin *node.Plugin) { })) } -func Replay(handlers *types.EventHandlers) { +func getEventsToReplay() (map[string]bool, map[string]map[string]bool) { lock.Lock() defer lock.Unlock() + copiedNodes := make(map[string]bool) for nodeId, online := range nodes { + copiedNodes[nodeId] = online + } + + copiedLinks := make(map[string]map[string]bool) + for sourceId, targetMap := range links { + copiedLinks[sourceId] = make(map[string]bool) + for targetId := range targetMap { + copiedLinks[sourceId][targetId] = true + } + } + + return copiedNodes, copiedLinks +} + +func Replay(handlers *types.EventHandlers) { + copiedNodes, copiedLinks := getEventsToReplay() + + for nodeId, online := range copiedNodes { handlers.AddNode(nodeId) if online { handlers.NodeOnline(nodeId) @@ -89,7 +107,7 @@ func Replay(handlers *types.EventHandlers) { } } - for sourceId, targetMap := range links { + for sourceId, targetMap := range copiedLinks { for targetId := range targetMap { handlers.ConnectNodes(sourceId, targetId) }