Skip to content
Snippets Groups Projects
Unverified Commit 56213be0 authored by Hans Moog's avatar Hans Moog Committed by GitHub
Browse files

Fixes analysis server event replay issues (#234)

* Fix: fixed long locks when replaying events

* Feat: fixed locks in replay method

* Refactor: undone accidental rename

* Feat: WebSocketChannel can be shutdown now

* Fix: fixed return variables overriding access to global vars
parent 148a7d23
No related branches found
No related tags found
No related merge requests found
package httpserver
import (
"fmt"
"sync"
"github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/goshimmer/plugins/analysis/webinterface/recordedevents"
......@@ -11,47 +11,87 @@ import (
)
func dataStream(ws *websocket.Conn) {
func() {
eventHandlers := &types.EventHandlers{
AddNode: func(nodeId string) { fmt.Fprint(ws, "A"+nodeId) },
RemoveNode: func(nodeId string) { fmt.Fprint(ws, "a"+nodeId) },
ConnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "C"+sourceId+targetId) },
DisconnectNodes: func(sourceId string, targetId string) { fmt.Fprint(ws, "c"+sourceId+targetId) },
NodeOnline: func(nodeId string) { fmt.Fprint(ws, "O"+nodeId) },
NodeOffline: func(nodeId string) { fmt.Fprint(ws, "o"+nodeId) },
// create a wrapper for the websocket
wsChan := NewWebSocketChannel(ws)
defer wsChan.Close()
// variables and factory methods for the async calls after the initial replay
var replayMutex sync.RWMutex
createAsyncNodeCallback := func(wsChan *WebSocketChannel, messagePrefix string) func(string) {
return func(nodeId string) {
go func() {
replayMutex.RLock()
defer replayMutex.RUnlock()
wsChan.TryWrite(messagePrefix + nodeId)
}()
}
}
createAsyncLinkCallback := func(wsChan *WebSocketChannel, messagePrefix string) func(string, string) {
return func(sourceId string, targetId string) {
go func() {
replayMutex.RLock()
defer replayMutex.RUnlock()
addNodeClosure := events.NewClosure(eventHandlers.AddNode)
removeNodeClosure := events.NewClosure(eventHandlers.RemoveNode)
connectNodesClosure := events.NewClosure(eventHandlers.ConnectNodes)
disconnectNodesClosure := events.NewClosure(eventHandlers.DisconnectNodes)
nodeOnlineClosure := events.NewClosure(eventHandlers.NodeOnline)
nodeOfflineClosure := events.NewClosure(eventHandlers.NodeOffline)
server.Events.AddNode.Attach(addNodeClosure)
server.Events.RemoveNode.Attach(removeNodeClosure)
server.Events.ConnectNodes.Attach(connectNodesClosure)
server.Events.DisconnectNodes.Attach(disconnectNodesClosure)
server.Events.NodeOnline.Attach(nodeOnlineClosure)
server.Events.NodeOffline.Attach(nodeOfflineClosure)
go recordedevents.Replay(eventHandlers)
buf := make([]byte, 1)
readFromWebsocket:
for {
if _, err := ws.Read(buf); err != nil {
break readFromWebsocket
}
fmt.Fprint(ws, "_")
wsChan.TryWrite(messagePrefix + sourceId + targetId)
}()
}
}
// wait with firing the callbacks until the replay is complete
replayMutex.Lock()
// create and register the dynamic callbacks
addNodeClosure := events.NewClosure(createAsyncNodeCallback(wsChan, "A"))
removeNodeClosure := events.NewClosure(createAsyncNodeCallback(wsChan, "a"))
connectNodesClosure := events.NewClosure(createAsyncLinkCallback(wsChan, "C"))
disconnectNodesClosure := events.NewClosure(createAsyncLinkCallback(wsChan, "c"))
nodeOnlineClosure := events.NewClosure(createAsyncNodeCallback(wsChan, "O"))
nodeOfflineClosure := events.NewClosure(createAsyncNodeCallback(wsChan, "o"))
server.Events.AddNode.Attach(addNodeClosure)
server.Events.RemoveNode.Attach(removeNodeClosure)
server.Events.ConnectNodes.Attach(connectNodesClosure)
server.Events.DisconnectNodes.Attach(disconnectNodesClosure)
server.Events.NodeOnline.Attach(nodeOnlineClosure)
server.Events.NodeOffline.Attach(nodeOfflineClosure)
// replay old events
recordedevents.Replay(createEventHandlers(wsChan, createSyncNodeCallback, createSyncLinkCallback))
// mark replay as complete
replayMutex.Unlock()
// wait until the connection breaks and keep it alive
wsChan.KeepAlive()
// unregister the callbacks
server.Events.AddNode.Detach(addNodeClosure)
server.Events.RemoveNode.Detach(removeNodeClosure)
server.Events.ConnectNodes.Detach(connectNodesClosure)
server.Events.DisconnectNodes.Detach(disconnectNodesClosure)
server.Events.NodeOnline.Detach(nodeOnlineClosure)
server.Events.NodeOffline.Detach(nodeOfflineClosure)
}
func createEventHandlers(wsChan *WebSocketChannel, nodeCallbackFactory func(*WebSocketChannel, string) func(string), linkCallbackFactory func(*WebSocketChannel, string) func(string, string)) *types.EventHandlers {
return &types.EventHandlers{
AddNode: nodeCallbackFactory(wsChan, "A"),
RemoveNode: nodeCallbackFactory(wsChan, "a"),
ConnectNodes: linkCallbackFactory(wsChan, "C"),
DisconnectNodes: linkCallbackFactory(wsChan, "c"),
NodeOnline: nodeCallbackFactory(wsChan, "O"),
NodeOffline: nodeCallbackFactory(wsChan, "o"),
}
}
func createSyncNodeCallback(wsChan *WebSocketChannel, messagePrefix string) func(nodeId string) {
return func(nodeId string) {
wsChan.Write(messagePrefix + nodeId)
}
}
server.Events.AddNode.Detach(addNodeClosure)
server.Events.RemoveNode.Detach(removeNodeClosure)
server.Events.ConnectNodes.Detach(connectNodesClosure)
server.Events.DisconnectNodes.Detach(disconnectNodesClosure)
server.Events.NodeOnline.Detach(nodeOnlineClosure)
server.Events.NodeOffline.Detach(nodeOfflineClosure)
}()
func createSyncLinkCallback(wsChan *WebSocketChannel, messagePrefix string) func(sourceId string, targetId string) {
return func(sourceId string, targetId string) {
wsChan.Write(messagePrefix + sourceId + targetId)
}
}
package httpserver
import (
"fmt"
"golang.org/x/net/websocket"
)
type WebSocketChannel struct {
ws *websocket.Conn
send chan string
}
func NewWebSocketChannel(ws *websocket.Conn) *WebSocketChannel {
wsChan := &WebSocketChannel{
ws: ws,
send: make(chan string, 1024),
}
go wsChan.writer()
return wsChan
}
func (c *WebSocketChannel) Write(update string) {
c.send <- update
}
func (c *WebSocketChannel) TryWrite(update string) {
select {
case c.send <- update:
default:
}
}
func (c *WebSocketChannel) KeepAlive() {
buf := make([]byte, 1)
for {
if _, err := c.ws.Read(buf); err != nil {
break
}
_, _ = fmt.Fprint(c.ws, "_")
}
}
func (c *WebSocketChannel) Close() {
close(c.send)
_ = c.ws.Close()
}
func (c *WebSocketChannel) writer() {
for pkt := range c.send {
if _, err := fmt.Fprint(c.ws, pkt); err != nil {
break
}
}
}
......@@ -31,7 +31,6 @@ func Configure(plugin *node.Plugin) {
defer lock.Unlock()
delete(nodes, nodeId)
//nodes[nodeId] = false
}))
server.Events.NodeOnline.Attach(events.NewClosure(func(nodeId string) {
......@@ -76,11 +75,30 @@ func Configure(plugin *node.Plugin) {
}))
}
func Replay(handlers *types.EventHandlers) {
func getEventsToReplay() (map[string]bool, map[string]map[string]bool) {
lock.Lock()
defer lock.Unlock()
copiedNodes := make(map[string]bool)
for nodeId, online := range nodes {
copiedNodes[nodeId] = online
}
copiedLinks := make(map[string]map[string]bool)
for sourceId, targetMap := range links {
copiedLinks[sourceId] = make(map[string]bool)
for targetId := range targetMap {
copiedLinks[sourceId][targetId] = true
}
}
return copiedNodes, copiedLinks
}
func Replay(handlers *types.EventHandlers) {
copiedNodes, copiedLinks := getEventsToReplay()
for nodeId, online := range copiedNodes {
handlers.AddNode(nodeId)
if online {
handlers.NodeOnline(nodeId)
......@@ -89,7 +107,7 @@ func Replay(handlers *types.EventHandlers) {
}
}
for sourceId, targetMap := range links {
for sourceId, targetMap := range copiedLinks {
for targetId := range targetMap {
handlers.ConnectNodes(sourceId, targetId)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment