Skip to content
Snippets Groups Projects
Unverified Commit 257a37c3 authored by Luca Moser's avatar Luca Moser Committed by GitHub
Browse files

Adds graph visualizer to dashboard (#365)

* visualizer-skeleton

* adds limit, metrics, pause/resume, approvers/approvees coloring

* prevent ws msg drop on certain msg types

* build n' pack

* adds search, background data collection stop button

* :bone::bone::bone::bone::bone::bone::bone::bone::bone::bone:

* cleanup

* defer detach visualizer events
parent 660a3c8a
No related branches found
No related tags found
No related merge requests found
Showing
with 1066 additions and 134 deletions
...@@ -16,11 +16,19 @@ var drngLiveFeedWorkerCount = 1 ...@@ -16,11 +16,19 @@ var drngLiveFeedWorkerCount = 1
var drngLiveFeedWorkerQueueSize = 50 var drngLiveFeedWorkerQueueSize = 50
var drngLiveFeedWorkerPool *workerpool.WorkerPool var drngLiveFeedWorkerPool *workerpool.WorkerPool
type drngMsg struct {
Instance uint32 `json:"instance"`
DistributedPK string `json:"dpk"`
Round uint64 `json:"round"`
Randomness string `json:"randomness"`
Timestamp string `json:"timestamp"`
}
func configureDrngLiveFeed() { func configureDrngLiveFeed() {
drngLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) { drngLiveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
newRandomness := task.Param(0).(state.Randomness) newRandomness := task.Param(0).(state.Randomness)
sendToAllWSClient(&wsmsg{MsgTypeDrng, &drngMsg{ broadcastWsMessage(&wsmsg{MsgTypeDrng, &drngMsg{
Instance: drng.Instance().State.Committee().InstanceID, Instance: drng.Instance().State.Committee().InstanceID,
DistributedPK: hex.EncodeToString(drng.Instance().State.Committee().DistributedPK), DistributedPK: hex.EncodeToString(drng.Instance().State.Committee().DistributedPK),
Round: newRandomness.Round, Round: newRandomness.Round,
......
...@@ -77,6 +77,7 @@ ...@@ -77,6 +77,7 @@
"react-dom": "^16.7.0", "react-dom": "^16.7.0",
"react-router": "^4.3.1", "react-router": "^4.3.1",
"react-router-bootstrap": "^0.25.0", "react-router-bootstrap": "^0.25.0",
"react-router-dom": "^5.1.2" "react-router-dom": "^5.1.2",
"vivagraphjs": "^0.12.0"
} }
} }
...@@ -15,6 +15,7 @@ import {ExplorerMessageQueryResult} from "app/components/ExplorerMessageQueryRes ...@@ -15,6 +15,7 @@ import {ExplorerMessageQueryResult} from "app/components/ExplorerMessageQueryRes
import {ExplorerAddressQueryResult} from "app/components/ExplorerAddressResult"; import {ExplorerAddressQueryResult} from "app/components/ExplorerAddressResult";
import {Explorer404} from "app/components/Explorer404"; import {Explorer404} from "app/components/Explorer404";
import {Neighbors} from "app/components/Neighbors"; import {Neighbors} from "app/components/Neighbors";
import {Visualizer} from "app/components/Visualizer";
interface Props { interface Props {
history: any; history: any;
...@@ -51,7 +52,12 @@ export class Root extends React.Component<Props, any> { ...@@ -51,7 +52,12 @@ export class Root extends React.Component<Props, any> {
</LinkContainer> </LinkContainer>
<LinkContainer to="/explorer"> <LinkContainer to="/explorer">
<Nav.Link> <Nav.Link>
Tangle Explorer Explorer
</Nav.Link>
</LinkContainer>
<LinkContainer to="/visualizer">
<Nav.Link>
Visualizer
</Nav.Link> </Nav.Link>
</LinkContainer> </LinkContainer>
<LinkContainer to="/drng"> <LinkContainer to="/drng">
...@@ -76,6 +82,7 @@ export class Root extends React.Component<Props, any> { ...@@ -76,6 +82,7 @@ export class Root extends React.Component<Props, any> {
<Route exact path="/explorer/address/:id" component={ExplorerAddressQueryResult}/> <Route exact path="/explorer/address/:id" component={ExplorerAddressQueryResult}/>
<Route exact path="/explorer/404/:search" component={Explorer404}/> <Route exact path="/explorer/404/:search" component={Explorer404}/>
<Route exact path="/explorer" component={Explorer}/> <Route exact path="/explorer" component={Explorer}/>
<Route exact path="/visualizer" component={Visualizer}/>
<Route exact path="/drng" component={Drng}/> <Route exact path="/drng" component={Drng}/>
<Redirect to="/dashboard"/> <Redirect to="/dashboard"/>
</Switch> </Switch>
......
import * as React from 'react';
import {KeyboardEvent} from 'react';
import Container from "react-bootstrap/Container";
import {inject, observer} from "mobx-react";
import {Link} from 'react-router-dom';
import VisualizerStore from "app/stores/VisualizerStore";
import NodeStore from "app/stores/NodeStore";
import Badge from "react-bootstrap/Badge";
import FormControl from "react-bootstrap/FormControl";
import InputGroup from "react-bootstrap/InputGroup";
import Row from "react-bootstrap/Row";
import Col from "react-bootstrap/Col";
import Button from "react-bootstrap/Button";
import Popover from "react-bootstrap/Popover";
import OverlayTrigger from "react-bootstrap/OverlayTrigger";
interface Props {
visualizerStore?: VisualizerStore;
nodeStore?: NodeStore;
}
@inject("visualizerStore")
@inject("nodeStore")
@observer
export class Visualizer extends React.Component<Props, any> {
componentDidMount(): void {
this.props.visualizerStore.start();
}
componentWillUnmount(): void {
this.props.visualizerStore.stop();
this.props.nodeStore.registerHandlers();
}
updateVerticesLimit = (e) => {
this.props.visualizerStore.updateVerticesLimit(e.target.value);
}
pauseResumeVisualizer = (e) => {
this.props.visualizerStore.pauseResume();
}
updateSearch = (e) => {
this.props.visualizerStore.updateSearch(e.target.value);
}
searchAndHighlight = (e: KeyboardEvent) => {
if (e.key !== 'Enter') return;
this.props.visualizerStore.searchAndHighlight();
}
toggleBackgroundDataCollection = () => {
if (this.props.nodeStore.collecting) {
this.props.nodeStore.unregisterHandlers();
return;
}
this.props.nodeStore.registerHandlers();
}
render() {
let {
vertices, solid_count, selected,
selected_approvers_count, selected_approvees_count,
verticesLimit, tips_count, paused, search
} = this.props.visualizerStore;
let {last_mps_metric, collecting} = this.props.nodeStore;
return (
<Container>
<h3>Visualizer</h3>
<Row className={"mb-1"}>
<Col xs={5}>
<InputGroup className="mb-1" size="sm">
<InputGroup.Prepend>
<InputGroup.Text id="vertices-limit">Vertices Limit</InputGroup.Text>
</InputGroup.Prepend>
<FormControl
placeholder="limit"
type="number" value={verticesLimit.toString()} onChange={this.updateVerticesLimit}
aria-label="vertices-limit"
aria-describedby="vertices-limit"
/>
</InputGroup>
<InputGroup className="mb-1" size="sm">
<InputGroup.Prepend>
<InputGroup.Text id="vertices-limit">
Search Vertex
</InputGroup.Text>
</InputGroup.Prepend>
<FormControl
placeholder="search"
type="text" value={search} onChange={this.updateSearch}
aria-label="vertices-search" onKeyUp={this.searchAndHighlight}
aria-describedby="vertices-search"
/>
</InputGroup>
<InputGroup className="mb-1" size="sm">
<OverlayTrigger
trigger={['hover', 'focus']} placement="right" overlay={
<Popover id="popover-basic">
<Popover.Content>
Ensures that only data needed for the visualizer is collected.
</Popover.Content>
</Popover>}
>
<Button variant="outline-secondary" onClick={this.toggleBackgroundDataCollection}
size="sm">
{collecting ? "Stop Background Data Collection" : "Collect Background data"}
</Button>
</OverlayTrigger>
<br/>
</InputGroup>
<InputGroup className="mb-1" size="sm">
<OverlayTrigger
trigger={['hover', 'focus']} placement="right" overlay={
<Popover id="popover-basic">
<Popover.Content>
Pauses/resumes rendering the graph.
</Popover.Content>
</Popover>}
>
<Button onClick={this.pauseResumeVisualizer} size="sm" variant="outline-secondary">
{paused ? "Resume Rendering" : "Pause Rendering"}
</Button>
</OverlayTrigger>
</InputGroup>
</Col>
<Col xs={{span: 5, offset: 2}}>
<p>
<Badge pill style={{background: "#6c71c4", color: "white"}}>
Solid
</Badge>
{' '}
<Badge pill style={{background: "#2aa198", color: "white"}}>
Unsolid
</Badge>
{' '}
<Badge pill style={{background: "#cb4b16", color: "white"}}>
Tip
</Badge>
{' '}
<Badge pill style={{background: "#b58900", color: "white"}}>
Unknown
</Badge>
<br/>
Vertices: {vertices.size}, Tips: {tips_count},
Solid/Unsolid: {solid_count}/{vertices.size - solid_count},{' '}
MPS: {last_mps_metric.mps}
<br/>
Selected: {selected ?
<Link to={`/explorer/message/${selected.id}`}>
{selected.id.substr(0, 10)}
</Link>
: "-"}
<br/>
Approvers/Approvees: {selected ?
<span>{selected_approvers_count}/{selected_approvees_count}</span>
: '-/-'}
<br/>
Trunk/Branch:{' '}
{
selected && selected.trunk_id && selected.branch_id ?
<span>
<Link to={` / explorer / message /${selected.trunk_id}`}>
{selected.trunk_id.substr(0, 10)}
</Link>
/
<Link to={` / explorer / message /${selected.branch_id}`}>
{selected.branch_id.substr(0, 10)}
</Link>
</span>
: "-"}
</p>
</Col>
</Row>
<div className={"visualizer"} style={{
zIndex: -1, position: "absolute",
top: 0, left: 0,
width: "100%",
height: "100%",
background: "#ededed"
}} id={"visualizer"}/>
</Container>
);
}
}
...@@ -5,6 +5,8 @@ export enum WSMsgType { ...@@ -5,6 +5,8 @@ export enum WSMsgType {
NeighborStats, NeighborStats,
Drng, Drng,
TipsMetrics, TipsMetrics,
Vertex,
TipInfo,
} }
export interface WSMessage { export interface WSMessage {
...@@ -20,6 +22,10 @@ export function registerHandler(msgTypeID: number, handler: DataHandler) { ...@@ -20,6 +22,10 @@ export function registerHandler(msgTypeID: number, handler: DataHandler) {
handlers[msgTypeID] = handler; handlers[msgTypeID] = handler;
} }
export function unregisterHandler(msgTypeID: number) {
delete handlers[msgTypeID];
}
export function connectWebSocket(path: string, onOpen, onClose, onError) { export function connectWebSocket(path: string, onOpen, onClose, onError) {
let loc = window.location; let loc = window.location;
let uri = 'ws:'; let uri = 'ws:';
......
import {action, computed, observable, ObservableMap} from 'mobx'; import {action, computed, observable, ObservableMap} from 'mobx';
import * as dateformat from 'dateformat'; import * as dateformat from 'dateformat';
import {connectWebSocket, registerHandler, WSMsgType} from "app/misc/WS"; import {connectWebSocket, registerHandler, unregisterHandler, WSMsgType} from "app/misc/WS";
class MPSMetric { class MPSMetric {
mps: number; mps: number;
...@@ -161,12 +161,41 @@ export class NodeStore { ...@@ -161,12 +161,41 @@ export class NodeStore {
@observable neighbor_metrics = new ObservableMap<string, NeighborMetrics>(); @observable neighbor_metrics = new ObservableMap<string, NeighborMetrics>();
@observable last_tips_metric: TipsMetric = new TipsMetric(); @observable last_tips_metric: TipsMetric = new TipsMetric();
@observable collected_tips_metrics: Array<TipsMetric> = []; @observable collected_tips_metrics: Array<TipsMetric> = [];
@observable collecting: boolean = true;
constructor() { constructor() {
this.registerHandlers();
}
registerHandlers = () => {
registerHandler(WSMsgType.Status, this.updateStatus); registerHandler(WSMsgType.Status, this.updateStatus);
registerHandler(WSMsgType.MPSMetrics, this.updateLastMPSMetric); registerHandler(WSMsgType.MPSMetrics, (mps: number) => {
this.addMPSMetric(this.updateLastMPSMetric(mps));
});
registerHandler(WSMsgType.NeighborStats, this.updateNeighborMetrics); registerHandler(WSMsgType.NeighborStats, this.updateNeighborMetrics);
registerHandler(WSMsgType.TipsMetrics, this.updateLastTipsMetric); registerHandler(WSMsgType.TipsMetrics, this.updateLastTipsMetric);
this.updateCollecting(true);
}
unregisterHandlers = () => {
unregisterHandler(WSMsgType.Status);
registerHandler(WSMsgType.MPSMetrics, this.updateLastMPSMetric);
unregisterHandler(WSMsgType.NeighborStats);
unregisterHandler(WSMsgType.TipsMetrics);
this.updateCollecting(false);
}
@action
updateCollecting = (collecting: boolean) => {
this.collecting = collecting;
}
@action
reset() {
this.collected_mps_metrics = [];
this.collected_mem_metrics = [];
this.neighbor_metrics = new ObservableMap<string, NeighborMetrics>();
this.collected_tips_metrics = [];
} }
connect() { connect() {
...@@ -216,11 +245,16 @@ export class NodeStore { ...@@ -216,11 +245,16 @@ export class NodeStore {
mpsMetric.mps = mps; mpsMetric.mps = mps;
mpsMetric.ts = dateformat(Date.now(), "HH:MM:ss"); mpsMetric.ts = dateformat(Date.now(), "HH:MM:ss");
this.last_mps_metric = mpsMetric; this.last_mps_metric = mpsMetric;
return mpsMetric;
};
@action
addMPSMetric = (metric: MPSMetric) => {
if (this.collected_mps_metrics.length > maxMetricsDataPoints) { if (this.collected_mps_metrics.length > maxMetricsDataPoints) {
this.collected_mps_metrics.shift(); this.collected_mps_metrics.shift();
} }
this.collected_mps_metrics.push(mpsMetric); this.collected_mps_metrics.push(metric);
}; }
@action @action
updateLastTipsMetric = (tips: number) => { updateLastTipsMetric = (tips: number) => {
......
import {action, observable, ObservableMap} from 'mobx';
import {registerHandler, WSMsgType} from "app/misc/WS";
import {RouterStore} from "mobx-react-router";
import {default as Viva} from 'vivagraphjs';
export class Vertex {
id: string;
trunk_id: string;
branch_id: string;
is_solid: boolean;
is_tip: boolean;
}
export class TipInfo {
id: string;
is_tip: boolean;
}
const vertexSize = 20;
export class VisualizerStore {
@observable vertices = new ObservableMap<string, Vertex>();
@observable verticesLimit = 1500;
@observable solid_count = 0;
@observable tips_count = 0;
verticesIncomingOrder = [];
collect: boolean = false;
routerStore: RouterStore;
// the currently selected vertex via hover
@observable selected: Vertex;
@observable selected_approvers_count = 0;
@observable selected_approvees_count = 0;
selected_via_click: boolean = false;
selected_origin_color: number = 0;
// search
@observable search: string = "";
// viva graph objs
graph;
graphics;
renderer;
@observable paused: boolean = false;
constructor(routerStore: RouterStore) {
this.routerStore = routerStore;
registerHandler(WSMsgType.Vertex, this.addVertex);
registerHandler(WSMsgType.TipInfo, this.addTipInfo);
}
@action
updateSearch = (search: string) => {
this.search = search.trim();
}
@action
searchAndHighlight = () => {
this.clearSelected();
if (!this.search) return;
let iter: IterableIterator<string> = this.vertices.keys();
let found = null;
for (const key of iter) {
if (key.indexOf(this.search) >= 0) {
found = key;
break;
}
}
if (!found) return;
this.updateSelected(this.vertices.get(found), false);
}
@action
pauseResume = () => {
if (this.paused) {
this.renderer.resume();
this.paused = false;
return;
}
this.renderer.pause();
this.paused = true;
}
@action
updateVerticesLimit = (num: number) => {
this.verticesLimit = num;
}
@action
addVertex = (vert: Vertex) => {
if (!this.collect) return;
let existing = this.vertices.get(vert.id);
if (existing) {
// can only go from unsolid to solid
if (!existing.is_solid && vert.is_solid) {
existing.is_solid = true;
this.solid_count++;
}
// update trunk and branch ids since we might be dealing
// with a vertex obj only created from a tip info
existing.trunk_id = vert.trunk_id;
existing.branch_id = vert.branch_id;
vert = existing
} else {
if (vert.is_solid) {
this.solid_count++;
}
this.verticesIncomingOrder.push(vert.id);
this.checkLimit();
}
this.vertices.set(vert.id, vert);
this.drawVertex(vert);
};
@action
addTipInfo = (tipInfo: TipInfo) => {
if (!this.collect) return;
let vert = this.vertices.get(tipInfo.id);
if (!vert) {
// create a new empty one for now
vert = new Vertex();
vert.id = tipInfo.id;
this.verticesIncomingOrder.push(vert.id);
this.checkLimit();
}
this.tips_count += tipInfo.is_tip ? 1 : vert.is_tip ? -1 : 0;
vert.is_tip = tipInfo.is_tip;
this.vertices.set(vert.id, vert);
this.drawVertex(vert);
};
@action
checkLimit = () => {
while (this.verticesIncomingOrder.length > this.verticesLimit) {
let deleteId = this.verticesIncomingOrder.shift();
let vert = this.vertices.get(deleteId);
// make sure we remove any markings if the vertex gets deleted
if (this.selected && deleteId === this.selected.id) {
this.clearSelected();
}
this.vertices.delete(deleteId);
this.graph.removeNode(deleteId);
if (!vert) {
continue;
}
if (vert.is_solid) {
this.solid_count--;
}
if (vert.is_tip) {
this.tips_count--;
}
this.deleteApproveeLink(vert.trunk_id);
this.deleteApproveeLink(vert.branch_id);
}
}
@action
deleteApproveeLink = (approveeId: string) => {
if (!approveeId) {
return;
}
let approvee = this.vertices.get(approveeId);
if (approvee) {
if (this.selected && approveeId === this.selected.id) {
this.clearSelected();
}
if (approvee.is_solid) {
this.solid_count--;
}
if (approvee.is_tip) {
this.tips_count--;
}
this.vertices.delete(approveeId);
}
this.graph.removeNode(approveeId);
}
drawVertex = (vert: Vertex) => {
let node;
let existing = this.graph.getNode(vert.id);
if (existing) {
// update coloring
let nodeUI = this.graphics.getNodeUI(vert.id);
nodeUI.color = parseColor(this.colorForVertexState(vert));
node = existing
} else {
node = this.graph.addNode(vert.id, vert);
}
if (vert.trunk_id && (!node.links || !node.links.some(link => link.fromId === vert.trunk_id))) {
this.graph.addLink(vert.trunk_id, vert.id);
}
if (vert.trunk_id === vert.branch_id) {
return;
}
if (vert.branch_id && (!node.links || !node.links.some(link => link.fromId === vert.branch_id))) {
this.graph.addLink(vert.branch_id, vert.id);
}
}
colorForVertexState = (vert: Vertex) => {
if (!vert || (!vert.trunk_id && !vert.branch_id)) return "#b58900";
if (vert.is_tip) {
return "#cb4b16";
}
if (vert.is_solid) {
return "#6c71c4";
}
return "#2aa198";
}
start = () => {
this.collect = true;
this.graph = Viva.Graph.graph();
let graphics: any = Viva.Graph.View.webglGraphics();
const layout = Viva.Graph.Layout.forceDirected(this.graph, {
springLength: 10,
springCoeff: 0.0001,
stableThreshold: 0.15,
gravity: -2,
dragCoeff: 0.02,
timeStep: 20,
theta: 0.8,
});
graphics.node((node) => {
if (!node.data) {
return Viva.Graph.View.webglSquare(10, this.colorForVertexState(node.data));
}
return Viva.Graph.View.webglSquare(vertexSize, this.colorForVertexState(node.data));
})
graphics.link(() => Viva.Graph.View.webglLine("#586e75"));
let ele = document.getElementById('visualizer');
this.renderer = Viva.Graph.View.renderer(this.graph, {
container: ele, graphics, layout,
});
let events = Viva.Graph.webglInputEvents(graphics, this.graph);
events.mouseEnter((node) => {
this.clearSelected();
this.updateSelected(node.data);
}).mouseLeave((node) => {
this.clearSelected();
});
this.graphics = graphics;
this.renderer.run();
}
stop = () => {
this.collect = false;
this.renderer.dispose();
this.graph = null;
this.paused = false;
this.selected = null;
this.solid_count = 0;
this.tips_count = 0;
this.vertices.clear();
}
@action
updateSelected = (vert: Vertex, viaClick?: boolean) => {
if (!vert) return;
this.selected = vert;
this.selected_via_click = !!viaClick;
// mutate links
let node = this.graph.getNode(vert.id);
let nodeUI = this.graphics.getNodeUI(vert.id);
this.selected_origin_color = nodeUI.color
nodeUI.color = parseColor("#859900");
nodeUI.size = vertexSize * 1.5;
const seenForward = [];
const seenBackwards = [];
dfsIterator(this.graph,
node,
node => {
this.selected_approvers_count++;
},
true,
link => {
const linkUI = this.graphics.getLinkUI(link.id);
linkUI.color = parseColor("#d33682");
},
seenForward
);
dfsIterator(this.graph, node, node => {
this.selected_approvees_count++;
}, false, link => {
const linkUI = this.graphics.getLinkUI(link.id);
linkUI.color = parseColor("#b58900");
},
seenBackwards
);
}
resetLinks = () => {
this.graph.forEachLink(function (link) {
const linkUI = this.graphics.getLinkUI(link.id);
linkUI.color = parseColor("#586e75");
});
}
@action
clearSelected = () => {
this.selected_approvers_count = 0;
this.selected_approvees_count = 0;
if (this.selected_via_click || !this.selected) {
return;
}
// clear link highlight
let node = this.graph.getNode(this.selected.id);
if (!node) {
// clear links
this.resetLinks();
return;
}
let nodeUI = this.graphics.getNodeUI(this.selected.id);
nodeUI.color = this.selected_origin_color;
nodeUI.size = vertexSize;
const seenForward = [];
const seenBackwards = [];
dfsIterator(this.graph, node, node => {
}, true,
link => {
const linkUI = this.graphics.getLinkUI(link.id);
linkUI.color = parseColor("#586e75");
},
seenBackwards
);
dfsIterator(this.graph, node, node => {
}, false,
link => {
const linkUI = this.graphics.getLinkUI(link.id);
linkUI.color = parseColor("#586e75");
},
seenForward
);
this.selected = null;
}
}
export default VisualizerStore;
// copied over and refactored from https://github.com/glumb/IOTAtangle
function dfsIterator(graph, node, cb, up, cbLinks: any = false, seenNodes = []) {
seenNodes.push(node);
let pointer = 0;
while (seenNodes.length > pointer) {
const node = seenNodes[pointer++];
if (cb(node)) return true;
for (const link of node.links) {
if (cbLinks) cbLinks(link);
if (!up && link.toId === node.id && !seenNodes.includes(graph.getNode(link.fromId))) {
seenNodes.push(graph.getNode(link.fromId));
continue;
}
if (up && link.fromId === node.id && !seenNodes.includes(graph.getNode(link.toId))) {
seenNodes.push(graph.getNode(link.toId));
}
}
}
}
function parseColor(color): any {
let parsedColor = 0x009ee8ff;
if (typeof color === 'number') {
return color;
}
if (typeof color === 'string' && color) {
if (color.length === 4) {
// #rgb, duplicate each letter except first #.
color = color.replace(/([^#])/g, '$1$1');
}
if (color.length === 9) {
// #rrggbbaa
parsedColor = parseInt(color.substr(1), 16);
} else if (color.length === 7) {
// or #rrggbb.
parsedColor = (parseInt(color.substr(1), 16) << 8) | 0xff;
} else {
throw 'Color expected in hex format with preceding "#". E.g. #00ff00. Got value: ' + color;
}
}
return parsedColor;
}
\ No newline at end of file
...@@ -11,3 +11,9 @@ ...@@ -11,3 +11,9 @@
.hidden { .hidden {
visibility: hidden !important; visibility: hidden !important;
} }
.visualizer {
position: absolute;
height: 100%;
width: 100%;
}
\ No newline at end of file
...@@ -9,17 +9,20 @@ import {Router} from 'react-router-dom'; ...@@ -9,17 +9,20 @@ import {Router} from 'react-router-dom';
import NodeStore from "app/stores/NodeStore"; import NodeStore from "app/stores/NodeStore";
import ExplorerStore from "app/stores/ExplorerStore"; import ExplorerStore from "app/stores/ExplorerStore";
import DrngStore from "app/stores/DrngStore"; import DrngStore from "app/stores/DrngStore";
import VisualizerStore from "app/stores/VisualizerStore";
// prepare MobX stores // prepare MobX stores
const routerStore = new RouterStore(); const routerStore = new RouterStore();
const nodeStore = new NodeStore(); const nodeStore = new NodeStore();
const explorerStore = new ExplorerStore(routerStore); const explorerStore = new ExplorerStore(routerStore);
const drngStore = new DrngStore(routerStore); const drngStore = new DrngStore(routerStore);
const visualizerStore = new VisualizerStore(routerStore);
const stores = { const stores = {
"routerStore": routerStore, "routerStore": routerStore,
"nodeStore": nodeStore, "nodeStore": nodeStore,
"explorerStore": explorerStore, "explorerStore": explorerStore,
"drngStore": drngStore, "drngStore": drngStore,
"visualizerStore": visualizerStore,
}; };
const browserHistory = createBrowserHistory(); const browserHistory = createBrowserHistory();
......
...@@ -775,6 +775,11 @@ acorn@^6.2.1: ...@@ -775,6 +775,11 @@ acorn@^6.2.1:
resolved "https://registry.yarnpkg.com/acorn/-/acorn-6.4.1.tgz#531e58ba3f51b9dacb9a6646ca4debf5b14ca474" resolved "https://registry.yarnpkg.com/acorn/-/acorn-6.4.1.tgz#531e58ba3f51b9dacb9a6646ca4debf5b14ca474"
integrity sha512-ZVA9k326Nwrj3Cj9jlh3wGFutC2ZornPNARZwsNYqQYgN0EsV2d53w5RN/co65Ohn4sUAUtb1rSUAOD6XN9idA== integrity sha512-ZVA9k326Nwrj3Cj9jlh3wGFutC2ZornPNARZwsNYqQYgN0EsV2d53w5RN/co65Ohn4sUAUtb1rSUAOD6XN9idA==
add-event-listener@0.0.1:
version "0.0.1"
resolved "https://registry.yarnpkg.com/add-event-listener/-/add-event-listener-0.0.1.tgz#a76229ebc64c8aefae204a16273a2f255abea2d0"
integrity sha1-p2Ip68ZMiu+uIEoWJzovJVq+otA=
ajv-errors@^1.0.0: ajv-errors@^1.0.0:
version "1.0.1" version "1.0.1"
resolved "https://registry.yarnpkg.com/ajv-errors/-/ajv-errors-1.0.1.tgz#f35986aceb91afadec4102fbd85014950cefa64d" resolved "https://registry.yarnpkg.com/ajv-errors/-/ajv-errors-1.0.1.tgz#f35986aceb91afadec4102fbd85014950cefa64d"
...@@ -3025,6 +3030,11 @@ getpass@^0.1.1: ...@@ -3025,6 +3030,11 @@ getpass@^0.1.1:
dependencies: dependencies:
assert-plus "^1.0.0" assert-plus "^1.0.0"
gintersect@0.1.0:
version "0.1.0"
resolved "https://registry.yarnpkg.com/gintersect/-/gintersect-0.1.0.tgz#9a8cb6a80b7d6e955ac33515495b1212627b1816"
integrity sha1-moy2qAt9bpVawzUVSVsSEmJ7GBY=
github-from-package@0.0.0: github-from-package@0.0.0:
version "0.0.0" version "0.0.0"
resolved "https://registry.yarnpkg.com/github-from-package/-/github-from-package-0.0.0.tgz#97fb5d96bfde8973313f20e8288ef9a167fa64ce" resolved "https://registry.yarnpkg.com/github-from-package/-/github-from-package-0.0.0.tgz#97fb5d96bfde8973313f20e8288ef9a167fa64ce"
...@@ -4526,6 +4536,100 @@ neo-async@^2.5.0, neo-async@^2.6.1: ...@@ -4526,6 +4536,100 @@ neo-async@^2.5.0, neo-async@^2.6.1:
resolved "https://registry.yarnpkg.com/neo-async/-/neo-async-2.6.1.tgz#ac27ada66167fa8849a6addd837f6b189ad2081c" resolved "https://registry.yarnpkg.com/neo-async/-/neo-async-2.6.1.tgz#ac27ada66167fa8849a6addd837f6b189ad2081c"
integrity sha512-iyam8fBuCUpWeKPGpaNMetEocMt364qkCsfL9JuhjXX6dRnguRVOfk2GZaDpPjcOKiiXCPINZC1GczQ7iTq3Zw== integrity sha512-iyam8fBuCUpWeKPGpaNMetEocMt364qkCsfL9JuhjXX6dRnguRVOfk2GZaDpPjcOKiiXCPINZC1GczQ7iTq3Zw==
ngraph.centrality@0.3.0:
version "0.3.0"
resolved "https://registry.yarnpkg.com/ngraph.centrality/-/ngraph.centrality-0.3.0.tgz#8cc0ec0319ef0a374357fc1044c16975b179d09d"
integrity sha1-jMDsAxnvCjdDV/wQRMFpdbF50J0=
ngraph.events@0.0.3:
version "0.0.3"
resolved "https://registry.yarnpkg.com/ngraph.events/-/ngraph.events-0.0.3.tgz#38f55316f3d207ad631ff94f6622ca8f2c0e87d0"
integrity sha1-OPVTFvPSB61jH/lPZiLKjywOh9A=
ngraph.events@0.0.4:
version "0.0.4"
resolved "https://registry.yarnpkg.com/ngraph.events/-/ngraph.events-0.0.4.tgz#72cb364488dd0fd7f057458449f6a3b17a722d9a"
integrity sha1-css2RIjdD9fwV0WESfajsXpyLZo=
ngraph.expose@0.0.0:
version "0.0.0"
resolved "https://registry.yarnpkg.com/ngraph.expose/-/ngraph.expose-0.0.0.tgz#746c34903a3848c45d033b14bc64619ea85fe5aa"
integrity sha1-dGw0kDo4SMRdAzsUvGRhnqhf5ao=
ngraph.forcelayout@0.5.0:
version "0.5.0"
resolved "https://registry.yarnpkg.com/ngraph.forcelayout/-/ngraph.forcelayout-0.5.0.tgz#51511c3e1db45d3d5436da75dfb1d6af097916f5"
integrity sha1-UVEcPh20XT1UNtp137HWrwl5FvU=
dependencies:
ngraph.events "0.0.4"
ngraph.physics.simulator "^0.3.0"
ngraph.fromjson@0.1.9:
version "0.1.9"
resolved "https://registry.yarnpkg.com/ngraph.fromjson/-/ngraph.fromjson-0.1.9.tgz#66910b664c69fa3c50a1ce79dd1dfdd5bab46f6e"
integrity sha1-ZpELZkxp+jxQoc553R391bq0b24=
dependencies:
ngraph.graph "0.0.14"
ngraph.generators@0.0.19:
version "0.0.19"
resolved "https://registry.yarnpkg.com/ngraph.generators/-/ngraph.generators-0.0.19.tgz#552c0d087f942b9d0d2b0c6ca9aac436befa7659"
integrity sha1-VSwNCH+UK50NKwxsqarENr76dlk=
dependencies:
ngraph.graph "0.0.14"
ngraph.random "0.1.0"
ngraph.graph@0.0.14:
version "0.0.14"
resolved "https://registry.yarnpkg.com/ngraph.graph/-/ngraph.graph-0.0.14.tgz#d47ac94967c920aaf76952d8a4e73346e1df2db7"
integrity sha1-1HrJSWfJIKr3aVLYpOczRuHfLbc=
dependencies:
ngraph.events "0.0.3"
ngraph.merge@0.0.1:
version "0.0.1"
resolved "https://registry.yarnpkg.com/ngraph.merge/-/ngraph.merge-0.0.1.tgz#e4e80ce37581a3c96b17d545e3a43c85434b9025"
integrity sha1-5OgM43WBo8lrF9VF46Q8hUNLkCU=
ngraph.physics.primitives@0.0.7:
version "0.0.7"
resolved "https://registry.yarnpkg.com/ngraph.physics.primitives/-/ngraph.physics.primitives-0.0.7.tgz#5dc9e179ba1f92e6dec774b01cd68914120b795b"
integrity sha1-Xcnhebofkubex3SwHNaJFBILeVs=
ngraph.physics.simulator@^0.3.0:
version "0.3.0"
resolved "https://registry.yarnpkg.com/ngraph.physics.simulator/-/ngraph.physics.simulator-0.3.0.tgz#7ca6fc3e3617c73e1080572eaa8e04dbb77e0102"
integrity sha1-fKb8PjYXxz4QgFcuqo4E27d+AQI=
dependencies:
ngraph.events "0.0.3"
ngraph.expose "0.0.0"
ngraph.merge "0.0.1"
ngraph.physics.primitives "0.0.7"
ngraph.quadtreebh "0.0.4"
ngraph.random "0.0.1"
ngraph.quadtreebh@0.0.4:
version "0.0.4"
resolved "https://registry.yarnpkg.com/ngraph.quadtreebh/-/ngraph.quadtreebh-0.0.4.tgz#c700d44e6e4af07b6d05001ba3987ff5e2dcd62c"
integrity sha1-xwDUTm5K8HttBQAbo5h/9eLc1iw=
dependencies:
ngraph.random "0.0.1"
ngraph.random@0.0.1:
version "0.0.1"
resolved "https://registry.yarnpkg.com/ngraph.random/-/ngraph.random-0.0.1.tgz#c008e2ebbfdffaf17ed10e4bbc913e567166bcf8"
integrity sha1-wAji67/f+vF+0Q5LvJE+VnFmvPg=
ngraph.random@0.1.0:
version "0.1.0"
resolved "https://registry.yarnpkg.com/ngraph.random/-/ngraph.random-0.1.0.tgz#1b6e9573529e080677da6ffa098790d76a0948a9"
integrity sha1-G26Vc1KeCAZ32m/6CYeQ12oJSKk=
ngraph.tojson@0.1.4:
version "0.1.4"
resolved "https://registry.yarnpkg.com/ngraph.tojson/-/ngraph.tojson-0.1.4.tgz#39f0046588440ade622d58734d589d7974a0b3bc"
integrity sha1-OfAEZYhECt5iLVhzTVideXSgs7w=
nice-try@^1.0.4: nice-try@^1.0.4:
version "1.0.5" version "1.0.5"
resolved "https://registry.yarnpkg.com/nice-try/-/nice-try-1.0.5.tgz#a3378a7696ce7d223e88fc9b764bd7ef1089e366" resolved "https://registry.yarnpkg.com/nice-try/-/nice-try-1.0.5.tgz#a3378a7696ce7d223e88fc9b764bd7ef1089e366"
...@@ -6585,6 +6689,13 @@ simple-swizzle@^0.2.2: ...@@ -6585,6 +6689,13 @@ simple-swizzle@^0.2.2:
dependencies: dependencies:
is-arrayish "^0.3.1" is-arrayish "^0.3.1"
simplesvg@0.0.10:
version "0.0.10"
resolved "https://registry.yarnpkg.com/simplesvg/-/simplesvg-0.0.10.tgz#37d2ec18de2c154dd9b69f79e8ad20bf1e1e5fdd"
integrity sha1-N9LsGN4sFU3Ztp956K0gvx4eX90=
dependencies:
add-event-listener "0.0.1"
snapdragon-node@^2.0.1: snapdragon-node@^2.0.1:
version "2.1.1" version "2.1.1"
resolved "https://registry.yarnpkg.com/snapdragon-node/-/snapdragon-node-2.1.1.tgz#6c175f86ff14bdb0724563e8f3c1b021a286853b" resolved "https://registry.yarnpkg.com/snapdragon-node/-/snapdragon-node-2.1.1.tgz#6c175f86ff14bdb0724563e8f3c1b021a286853b"
...@@ -7485,6 +7596,23 @@ vinyl@^2.2.0: ...@@ -7485,6 +7596,23 @@ vinyl@^2.2.0:
remove-trailing-separator "^1.0.1" remove-trailing-separator "^1.0.1"
replace-ext "^1.0.0" replace-ext "^1.0.0"
vivagraphjs@^0.12.0:
version "0.12.0"
resolved "https://registry.yarnpkg.com/vivagraphjs/-/vivagraphjs-0.12.0.tgz#6fd06ef6136aaeca5cffea86d6d6f8bfaff7f52b"
integrity sha512-Air+vUHXAWj8NTWUnbU800yKC7SiHpCVwpKIPfDtr5436YoMd7cpg8blt6Fn9xarx+sz1osRxGHBHTaHvcsR6Q==
dependencies:
gintersect "0.1.0"
ngraph.centrality "0.3.0"
ngraph.events "0.0.3"
ngraph.forcelayout "0.5.0"
ngraph.fromjson "0.1.9"
ngraph.generators "0.0.19"
ngraph.graph "0.0.14"
ngraph.merge "0.0.1"
ngraph.random "0.0.1"
ngraph.tojson "0.1.4"
simplesvg "0.0.10"
vm-browserify@^1.0.1: vm-browserify@^1.0.1:
version "1.1.2" version "1.1.2"
resolved "https://registry.yarnpkg.com/vm-browserify/-/vm-browserify-1.1.2.tgz#78641c488b8e6ca91a75f511e7a3b32a86e5dda0" resolved "https://registry.yarnpkg.com/vm-browserify/-/vm-browserify-1.1.2.tgz#78641c488b8e6ca91a75f511e7a3b32a86e5dda0"
......
...@@ -19,7 +19,7 @@ var liveFeedWorkerPool *workerpool.WorkerPool ...@@ -19,7 +19,7 @@ var liveFeedWorkerPool *workerpool.WorkerPool
func configureLiveFeed() { func configureLiveFeed() {
liveFeedWorkerPool = workerpool.New(func(task workerpool.Task) { liveFeedWorkerPool = workerpool.New(func(task workerpool.Task) {
task.Param(0).(*message.CachedMessage).Consume(func(message *message.Message) { task.Param(0).(*message.CachedMessage).Consume(func(message *message.Message) {
sendToAllWSClient(&wsmsg{MsgTypeMessage, &msg{message.Id().String(), 0}}) broadcastWsMessage(&wsmsg{MsgTypeMessage, &msg{message.Id().String(), 0}})
}) })
task.Return(nil) task.Return(nil)
......
Source diff could not be displayed: it is too large. Options to address this: view the blob.
...@@ -7,10 +7,8 @@ import ( ...@@ -7,10 +7,8 @@ import (
"net/http" "net/http"
"runtime" "runtime"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/gorilla/websocket"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/autopeering/local"
...@@ -18,14 +16,12 @@ import ( ...@@ -18,14 +16,12 @@ import (
"github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/drng" "github.com/iotaledger/goshimmer/plugins/drng"
"github.com/iotaledger/goshimmer/plugins/gossip" "github.com/iotaledger/goshimmer/plugins/gossip"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/metrics" "github.com/iotaledger/goshimmer/plugins/metrics"
"github.com/iotaledger/hive.go/autopeering/peer/service" "github.com/iotaledger/hive.go/autopeering/peer/service"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/workerpool"
"github.com/labstack/echo" "github.com/labstack/echo"
"github.com/labstack/echo/middleware" "github.com/labstack/echo/middleware"
) )
...@@ -41,30 +37,14 @@ var ( ...@@ -41,30 +37,14 @@ var (
server *echo.Echo server *echo.Echo
nodeStartAt = time.Now() nodeStartAt = time.Now()
clientsMu sync.Mutex
clients = make(map[uint64]chan interface{})
nextClientID uint64
wsSendWorkerCount = 1
wsSendWorkerQueueSize = 250
wsSendWorkerPool *workerpool.WorkerPool
) )
func configure(plugin *node.Plugin) { func configure(plugin *node.Plugin) {
log = logger.NewLogger(plugin.Name) log = logger.NewLogger(plugin.Name)
configureWebSocketWorkerPool()
wsSendWorkerPool = workerpool.New(func(task workerpool.Task) {
sendToAllWSClient(&wsmsg{MsgTypeMPSMetric, task.Param(0).(uint64)})
sendToAllWSClient(&wsmsg{MsgTypeNodeStatus, currentNodeStatus()})
sendToAllWSClient(&wsmsg{MsgTypeNeighborMetric, neighborMetrics()})
sendToAllWSClient(&wsmsg{MsgTypeTipsMetric, messagelayer.TipSelector.TipCount()})
task.Return(nil)
}, workerpool.WorkerCount(wsSendWorkerCount), workerpool.QueueSize(wsSendWorkerQueueSize))
configureLiveFeed() configureLiveFeed()
configureDrngLiveFeed() configureDrngLiveFeed()
configureVisualizer()
configureServer() configureServer()
} }
...@@ -88,8 +68,12 @@ func configureServer() { ...@@ -88,8 +68,12 @@ func configureServer() {
} }
func run(*node.Plugin) { func run(*node.Plugin) {
// rune the message live feed // run message broker
runWebSocketStreams()
// run the message live feed
runLiveFeed() runLiveFeed()
// run the visualizer vertex feed
runVisualizer()
// run dRNG live feed if dRNG plugin is enabled // run dRNG live feed if dRNG plugin is enabled
if !node.IsSkipped(drng.Plugin) { if !node.IsSkipped(drng.Plugin) {
runDrngLiveFeed() runDrngLiveFeed()
...@@ -139,29 +123,6 @@ func worker(shutdownSignal <-chan struct{}) { ...@@ -139,29 +123,6 @@ func worker(shutdownSignal <-chan struct{}) {
} }
} }
// sends the given message to all connected websocket clients
func sendToAllWSClient(msg interface{}) {
clientsMu.Lock()
defer clientsMu.Unlock()
for _, channel := range clients {
select {
case channel <- msg:
default:
// drop if buffer not drained
}
}
}
var webSocketWriteTimeout = time.Duration(3) * time.Second
var (
upgrader = websocket.Upgrader{
HandshakeTimeout: webSocketWriteTimeout,
CheckOrigin: func(r *http.Request) bool { return true },
EnableCompression: true,
}
)
const ( const (
// MsgTypeNodeStatus is the type of the NodeStatus message. // MsgTypeNodeStatus is the type of the NodeStatus message.
MsgTypeNodeStatus byte = iota MsgTypeNodeStatus byte = iota
...@@ -175,6 +136,10 @@ const ( ...@@ -175,6 +136,10 @@ const (
MsgTypeDrng MsgTypeDrng
// MsgTypeTipsMetric is the type of the TipsMetric message. // MsgTypeTipsMetric is the type of the TipsMetric message.
MsgTypeTipsMetric MsgTypeTipsMetric
// MsgTypeVertex defines a vertex message.
MsgTypeVertex
// MsgTypeTipInfo defines a tip info message.
MsgTypeTipInfo
) )
type wsmsg struct { type wsmsg struct {
...@@ -187,14 +152,6 @@ type msg struct { ...@@ -187,14 +152,6 @@ type msg struct {
Value int64 `json:"value"` Value int64 `json:"value"`
} }
type drngMsg struct {
Instance uint32 `json:"instance"`
DistributedPK string `json:"dpk"`
Round uint64 `json:"round"`
Randomness string `json:"randomness"`
Timestamp string `json:"timestamp"`
}
type nodestatus struct { type nodestatus struct {
ID string `json:"id"` ID string `json:"id"`
Version string `json:"version"` Version string `json:"version"`
......
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"time"
"github.com/gobuffalo/packr/v2" "github.com/gobuffalo/packr/v2"
"github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/config"
...@@ -114,54 +113,3 @@ func setupRoutes(e *echo.Echo) { ...@@ -114,54 +113,3 @@ func setupRoutes(e *echo.Echo) {
c.String(statusCode, message) c.String(statusCode, message)
} }
} }
func registerWSClient() (uint64, chan interface{}) {
// allocate new client id
clientsMu.Lock()
defer clientsMu.Unlock()
clientID := nextClientID
channel := make(chan interface{}, 100)
clients[clientID] = channel
nextClientID++
return clientID, channel
}
func websocketRoute(c echo.Context) error {
defer func() {
if r := recover(); r != nil {
log.Errorf("recovered from panic within WS handle func: %s", r)
}
}()
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
return err
}
defer ws.Close()
ws.EnableWriteCompression(true)
// cleanup client websocket
clientID, channel := registerWSClient()
defer func() {
clientsMu.Lock()
delete(clients, clientID)
close(channel)
clientsMu.Unlock()
}()
msgRateLimiter := time.NewTicker(time.Second / 20)
defer msgRateLimiter.Stop()
for {
<-msgRateLimiter.C
msg := <-channel
if err := ws.WriteJSON(msg); err != nil {
log.Warnf("error while writing to web socket client %s: %s", c.RealIP(), err.Error())
break
}
if err := ws.SetWriteDeadline(time.Now().Add(webSocketWriteTimeout)); err != nil {
log.Warnf("error while setting write deadline on web socket client %s: %s", c.RealIP(), err.Error())
break
}
}
return nil
}
package dashboard
import (
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
)
var (
visualizerWorkerCount = 1
visualizerWorkerQueueSize = 500
visualizerWorkerPool *workerpool.WorkerPool
)
// vertex defines a vertex in a DAG.
type vertex struct {
ID string `json:"id"`
TrunkID string `json:"trunk_id"`
BranchID string `json:"branch_id"`
IsSolid bool `json:"is_solid"`
}
// tipinfo holds information about whether a given message is a tip or not.
type tipinfo struct {
ID string `json:"id"`
IsTip bool `json:"is_tip"`
}
func configureVisualizer() {
visualizerWorkerPool = workerpool.New(func(task workerpool.Task) {
switch x := task.Param(0).(type) {
case *message.CachedMessage:
sendVertex(x, task.Param(1).(*tangle.CachedMessageMetadata))
case message.Id:
sendTipInfo(x, task.Param(1).(bool))
}
task.Return(nil)
}, workerpool.WorkerCount(visualizerWorkerCount), workerpool.QueueSize(visualizerWorkerQueueSize))
}
func sendVertex(cachedMessage *message.CachedMessage, cachedMessageMetadata *tangle.CachedMessageMetadata) {
defer cachedMessage.Release()
defer cachedMessageMetadata.Release()
msg := cachedMessage.Unwrap()
broadcastWsMessage(&wsmsg{MsgTypeVertex, &vertex{
ID: msg.Id().String(),
TrunkID: msg.TrunkId().String(),
BranchID: msg.BranchId().String(),
IsSolid: cachedMessageMetadata.Unwrap().IsSolid(),
}}, true)
}
func sendTipInfo(messageID message.Id, isTip bool) {
broadcastWsMessage(&wsmsg{MsgTypeTipInfo, &tipinfo{
ID: messageID.String(),
IsTip: isTip,
}}, true)
}
func runVisualizer() {
notifyNewMsg := events.NewClosure(func(message *message.CachedMessage, metadata *tangle.CachedMessageMetadata) {
defer message.Release()
defer metadata.Release()
visualizerWorkerPool.TrySubmit(message.Retain(), metadata.Retain())
})
notifyNewTip := events.NewClosure(func(messageId message.Id) {
visualizerWorkerPool.TrySubmit(messageId, true)
})
notifyDeletedTip := events.NewClosure(func(messageId message.Id) {
visualizerWorkerPool.TrySubmit(messageId, false)
})
daemon.BackgroundWorker("Dashboard[Visualizer]", func(shutdownSignal <-chan struct{}) {
messagelayer.Tangle.Events.MessageAttached.Attach(notifyNewMsg)
defer messagelayer.Tangle.Events.MessageAttached.Detach(notifyNewMsg)
messagelayer.Tangle.Events.MessageSolid.Attach(notifyNewMsg)
defer messagelayer.Tangle.Events.MessageSolid.Detach(notifyNewMsg)
messagelayer.TipSelector.Events.TipAdded.Attach(notifyNewTip)
defer messagelayer.TipSelector.Events.TipAdded.Detach(notifyNewTip)
messagelayer.TipSelector.Events.TipRemoved.Attach(notifyDeletedTip)
defer messagelayer.TipSelector.Events.TipRemoved.Detach(notifyDeletedTip)
visualizerWorkerPool.Start()
<-shutdownSignal
log.Info("Stopping Dashboard[Visualizer] ...")
visualizerWorkerPool.Stop()
log.Info("Stopping Dashboard[Visualizer] ... done")
}, shutdown.PriorityDashboard)
}
package dashboard
import (
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/metrics"
"github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/workerpool"
"github.com/labstack/echo"
)
var (
// settings
wsSendWorkerCount = 1
wsSendWorkerQueueSize = 250
wsSendWorkerPool *workerpool.WorkerPool
webSocketWriteTimeout = time.Duration(3) * time.Second
// clients
wsClientsMu sync.Mutex
wsClients = make(map[uint64]*wsclient)
nextWsClientID uint64
// gorilla websocket layer
upgrader = websocket.Upgrader{
HandshakeTimeout: webSocketWriteTimeout,
CheckOrigin: func(r *http.Request) bool { return true },
EnableCompression: true,
}
)
// a websocket client with a channel for downstream messages.
type wsclient struct {
// downstream message channel.
channel chan interface{}
// a channel which is closed when the websocket client is disconnected.
exit chan struct{}
}
func configureWebSocketWorkerPool() {
wsSendWorkerPool = workerpool.New(func(task workerpool.Task) {
broadcastWsMessage(&wsmsg{MsgTypeMPSMetric, task.Param(0).(uint64)})
broadcastWsMessage(&wsmsg{MsgTypeNodeStatus, currentNodeStatus()})
broadcastWsMessage(&wsmsg{MsgTypeNeighborMetric, neighborMetrics()})
broadcastWsMessage(&wsmsg{MsgTypeTipsMetric, messagelayer.TipSelector.TipCount()})
task.Return(nil)
}, workerpool.WorkerCount(wsSendWorkerCount), workerpool.QueueSize(wsSendWorkerQueueSize))
}
func runWebSocketStreams() {
updateStatus := events.NewClosure(func(mps uint64) {
wsSendWorkerPool.TrySubmit(mps)
})
daemon.BackgroundWorker("Dashboard[StatusUpdate]", func(shutdownSignal <-chan struct{}) {
metrics.Events.ReceivedMPSUpdated.Attach(updateStatus)
wsSendWorkerPool.Start()
<-shutdownSignal
log.Info("Stopping Dashboard[StatusUpdate] ...")
metrics.Events.ReceivedMPSUpdated.Detach(updateStatus)
wsSendWorkerPool.Stop()
log.Info("Stopping Dashboard[StatusUpdate] ... done")
}, shutdown.PriorityDashboard)
}
// reigsters and creates a new websocket client.
func registerWSClient() (uint64, *wsclient) {
wsClientsMu.Lock()
defer wsClientsMu.Unlock()
clientID := nextWsClientID
wsClient := &wsclient{
channel: make(chan interface{}, 500),
exit: make(chan struct{}),
}
wsClients[clientID] = wsClient
nextWsClientID++
return clientID, wsClient
}
// removes the websocket client with the given id.
func removeWsClient(clientID uint64) {
wsClientsMu.Lock()
defer wsClientsMu.Unlock()
wsClient := wsClients[clientID]
close(wsClient.exit)
close(wsClient.channel)
delete(wsClients, clientID)
}
// broadcasts the given message to all connected websocket clients.
func broadcastWsMessage(msg interface{}, dontDrop ...bool) {
wsClientsMu.Lock()
defer wsClientsMu.Unlock()
for _, wsClient := range wsClients {
if len(dontDrop) > 0 {
select {
case wsClient.channel <- msg:
case <-wsClient.exit:
// get unblocked if the websocket connection just got closed
}
continue
}
select {
case wsClient.channel <- msg:
default:
// potentially drop if slow consumer
}
}
}
func websocketRoute(c echo.Context) error {
defer func() {
if r := recover(); r != nil {
log.Errorf("recovered from websocket handle func: %s", r)
}
}()
// upgrade to websocket connection
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
return err
}
defer ws.Close()
ws.EnableWriteCompression(true)
// cleanup client websocket
clientID, wsClient := registerWSClient()
defer removeWsClient(clientID)
for {
msg := <-wsClient.channel
if err := ws.WriteJSON(msg); err != nil {
break
}
if err := ws.SetWriteDeadline(time.Now().Add(webSocketWriteTimeout)); err != nil {
break
}
}
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment