Skip to content
Snippets Groups Projects
Commit de15eae0 authored by capossele's avatar capossele
Browse files

:construction: WIP

parent 7b7b3cc3
No related branches found
No related tags found
No related merge requests found
...@@ -11,7 +11,7 @@ require ( ...@@ -11,7 +11,7 @@ require (
github.com/gobuffalo/packr/v2 v2.7.1 github.com/gobuffalo/packr/v2 v2.7.1
github.com/golang/protobuf v1.3.5 github.com/golang/protobuf v1.3.5
github.com/gorilla/websocket v1.4.1 github.com/gorilla/websocket v1.4.1
github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8 github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754
github.com/iotaledger/iota.go v1.0.0-beta.14 github.com/iotaledger/iota.go v1.0.0-beta.14
github.com/labstack/echo v3.3.10+incompatible github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0 github.com/labstack/gommon v0.3.0
......
...@@ -141,6 +141,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO ...@@ -141,6 +141,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8 h1:b2CAofhKmJDLkPC9ul88KXa3BU5zRHTZp3TOPbIzSsg= github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8 h1:b2CAofhKmJDLkPC9ul88KXa3BU5zRHTZp3TOPbIzSsg=
github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8/go.mod h1:HgYsLMzyQV+eaiUrxa1c7qvH9Jwi2ncycqtlw+Lczhs= github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8/go.mod h1:HgYsLMzyQV+eaiUrxa1c7qvH9Jwi2ncycqtlw+Lczhs=
github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754 h1:UCyAisLvAuKIWf2bMz+iYYgjGdHS7H4W2wMTpWg9yl8=
github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754/go.mod h1:HgYsLMzyQV+eaiUrxa1c7qvH9Jwi2ncycqtlw+Lczhs=
github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A= github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A=
github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
......
...@@ -7,12 +7,15 @@ import ( ...@@ -7,12 +7,15 @@ import (
"sync" "sync"
"time" "time"
"github.com/iotaledger/goshimmer/dapps/fpctest"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/goshimmer/plugins/analysis/packet" "github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local" "github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config" "github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/network" "github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
...@@ -42,6 +45,10 @@ var ( ...@@ -42,6 +45,10 @@ var (
func run(_ *node.Plugin) { func run(_ *node.Plugin) {
log = logger.NewLogger(PluginName) log = logger.NewLogger(PluginName)
if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) {
fpctest.Voter().Events().RoundExecuted.Attach(events.NewClosure(onRoundExecuted))
defer fpctest.Voter().Events().RoundExecuted.Detach(events.NewClosure(onRoundExecuted))
ticker := time.NewTicker(reportIntervalSec * time.Second) ticker := time.NewTicker(reportIntervalSec * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
...@@ -134,3 +141,38 @@ func reportHeartbeat(dispatchers *EventDispatchers) { ...@@ -134,3 +141,38 @@ func reportHeartbeat(dispatchers *EventDispatchers) {
hb := &packet.Heartbeat{OwnID: nodeID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs} hb := &packet.Heartbeat{OwnID: nodeID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs}
dispatchers.Heartbeat(hb) dispatchers.Heartbeat(hb)
} }
func onRoundExecuted(roundStats *vote.RoundStats) {
// get own ID
var nodeID []byte
if local.GetInstance() != nil {
// doesn't copy the ID, take care not to modify underlying bytearray!
nodeID = local.GetInstance().ID().Bytes()
}
hb := &packet.FPCHeartbeat{
OwnID: nodeID,
RoundStats: *roundStats,
}
data, err := packet.NewFPCHeartbeatMessage(hb)
if err != nil {
log.Info(err, " - FPC heartbeat message skipped")
return
}
conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress))
if err != nil {
log.Debugf("Could not connect to reporting server: %s", err.Error())
return
}
managedConn := network.NewManagedConnection(conn)
connLock.Lock()
defer connLock.Unlock()
if _, err = managedConn.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
return
}
}
package packet
import (
"bytes"
"encoding/binary"
"encoding/gob"
"errors"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/hive.go/protocol/message"
"github.com/iotaledger/hive.go/protocol/tlv"
)
var (
// ErrInvalidFPCHeartbeat is returned for invalid FPC heartbeats.
ErrInvalidFPCHeartbeat = errors.New("invalid FPC heartbeat")
)
var (
// HeartbeatMessageDefinition defines a heartbeat message's format.
FPCHeartbeatMessageDefinition = &message.Definition{
ID: MessageTypeFPCHeartbeat,
MaxBytesLength: 65535,
VariableLength: true,
}
)
// Heartbeat represents a heartbeat packet.
type FPCHeartbeat struct {
// The ID of the node who sent the heartbeat.
// Must be contained when a heartbeat is serialized.
OwnID []byte
// RoundStats contains stats about an FPC round.
RoundStats vote.RoundStats
// Finalized contains the finalized conflicts within the last FPC round.
Finalized map[string]vote.Opinion
}
// ParseFPCHeartbeat parses a slice of bytes (serialized packet) into a FPC heartbeat.
func ParseFPCHeartbeat(data []byte) (*FPCHeartbeat, error) {
hb := &FPCHeartbeat{}
buf := new(bytes.Buffer)
_, err := buf.Write(data)
if err != nil {
return nil, err
}
decoder := gob.NewDecoder(buf)
err = decoder.Decode(hb)
if err != nil {
return nil, err
}
return hb, nil
}
func (hb FPCHeartbeat) Bytes() ([]byte, error) {
buf := new(bytes.Buffer)
encoder := gob.NewEncoder(buf)
err := encoder.Encode(hb)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// NewHeartbeatMessage serializes the given heartbeat into a byte slice and adds a tlv header to the packet.
// message = tlv header + serialized packet
func NewFPCHeartbeatMessage(hb *FPCHeartbeat) ([]byte, error) {
packet, err := hb.Bytes()
if err != nil {
return nil, err
}
// calculate total needed bytes based on packet
packetSize := len(packet)
// create a buffer for tlv header plus the packet
buf := bytes.NewBuffer(make([]byte, 0, tlv.HeaderMessageDefinition.MaxBytesLength+uint16(packetSize)))
// write tlv header into buffer
if err := tlv.WriteHeader(buf, MessageTypeFPCHeartbeat, uint16(packetSize)); err != nil {
return nil, err
}
// write serialized packet bytes into the buffer
if err := binary.Write(buf, binary.BigEndian, packet); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
package packet
import (
"crypto/sha256"
"testing"
"time"
"github.com/iotaledger/goshimmer/packages/vote"
"github.com/iotaledger/hive.go/protocol/message"
"github.com/iotaledger/hive.go/protocol/tlv"
"github.com/stretchr/testify/require"
)
var ownID = sha256.Sum256([]byte{'A'})
func dummyFPCHeartbeat() *FPCHeartbeat {
return &FPCHeartbeat{
OwnID: ownID[:],
RoundStats: vote.RoundStats{
Duration: time.Second,
RandUsed: 0.5,
ActiveVoteContexts: map[string]*vote.Context{
"one": {
ID: "one",
Liked: 1.,
Rounds: 3,
Opinions: []vote.Opinion{vote.Dislike, vote.Like, vote.Dislike},
}},
QueriedOpinions: []vote.QueriedOpinions{{
OpinionGiverID: "nodeA",
Opinions: map[string]vote.Opinion{"one": vote.Like, "two": vote.Dislike},
TimesCounted: 2,
}},
},
Finalized: map[string]vote.Opinion{"one": vote.Like, "two": vote.Dislike},
}
}
func TestFPCHeartbeat(t *testing.T) {
hb := dummyFPCHeartbeat()
packet, err := hb.Bytes()
require.NoError(t, err)
hbParsed, err := ParseFPCHeartbeat(packet)
require.NoError(t, err)
require.Equal(t, hb, hbParsed)
tlvHeaderLength := int(tlv.HeaderMessageDefinition.MaxBytesLength)
msg, err := NewFPCHeartbeatMessage(hb)
require.NoError(t, err)
require.Equal(t, MessageTypeFPCHeartbeat, message.Type(msg[0]))
hbParsed, err = ParseFPCHeartbeat(msg[tlvHeaderLength:])
require.NoError(t, err)
require.Equal(t, hb, hbParsed)
}
...@@ -35,10 +35,6 @@ var ( ...@@ -35,10 +35,6 @@ var (
HeartbeatMaxOutboundPeersCount*sha256.Size + HeartbeatMaxInboundPeersCount*sha256.Size HeartbeatMaxOutboundPeersCount*sha256.Size + HeartbeatMaxInboundPeersCount*sha256.Size
) )
const (
MessageTypeHeartbeat message.Type = 1
)
var ( var (
// HeartbeatMessageDefinition defines a heartbeat message's format. // HeartbeatMessageDefinition defines a heartbeat message's format.
HeartbeatMessageDefinition = &message.Definition{ HeartbeatMessageDefinition = &message.Definition{
...@@ -48,7 +44,6 @@ var ( ...@@ -48,7 +44,6 @@ var (
} }
) )
// Heartbeat represents a heartbeat packet. // Heartbeat represents a heartbeat packet.
type Heartbeat struct { type Heartbeat struct {
// The ID of the node who sent the heartbeat. // The ID of the node who sent the heartbeat.
......
...@@ -16,14 +16,22 @@ var ( ...@@ -16,14 +16,22 @@ var (
var AnalysisMsgRegistry *message.Registry var AnalysisMsgRegistry *message.Registry
func init() { func init() {
AnalysisMsgRegistry = message.NewRegistry() AnalysisMsgRegistry = message.NewRegistry([]*message.Definition{
// register tlv header type tlv.HeaderMessageDefinition,
if err := AnalysisMsgRegistry.RegisterType(tlv.MessageTypeHeader, tlv.HeaderMessageDefinition); err != nil { HeartbeatMessageDefinition,
panic(err) FPCHeartbeatMessageDefinition,
} })
// // register tlv header type
// if err := AnalysisMsgRegistry.RegisterType(tlv.MessageTypeHeader, tlv.HeaderMessageDefinition); err != nil {
// panic(err)
// }
// analysis plugin specific types (msgType > 0) // // analysis plugin specific types (msgType > 0)
if err := AnalysisMsgRegistry.RegisterType(MessageTypeHeartbeat, HeartbeatMessageDefinition); err != nil { // if err := AnalysisMsgRegistry.RegisterType(MessageTypeHeartbeat, HeartbeatMessageDefinition); err != nil {
panic(err) // panic(err)
} // }
// if err := AnalysisMsgRegistry.RegisterType(MessageTypeFPCHeartbeat, FPCHeartbeatMessageDefinition); err != nil {
// panic(err)
// }
} }
package packet
import "github.com/iotaledger/hive.go/protocol/message"
const (
MessageTypeHeartbeat message.Type = iota + 1
MessageTypeFPCHeartbeat
)
...@@ -19,6 +19,8 @@ var Events = struct { ...@@ -19,6 +19,8 @@ var Events = struct {
Error *events.Event Error *events.Event
// Heartbeat triggers when an heartbeat has been received. // Heartbeat triggers when an heartbeat has been received.
Heartbeat *events.Event Heartbeat *events.Event
// FPCHeartbeat triggers when an FPC heartbeat has been received.
FPCHeartbeat *events.Event
}{ }{
events.NewEvent(stringCaller), events.NewEvent(stringCaller),
events.NewEvent(stringCaller), events.NewEvent(stringCaller),
...@@ -26,6 +28,7 @@ var Events = struct { ...@@ -26,6 +28,7 @@ var Events = struct {
events.NewEvent(stringStringCaller), events.NewEvent(stringStringCaller),
events.NewEvent(errorCaller), events.NewEvent(errorCaller),
events.NewEvent(heartbeatPacketCaller), events.NewEvent(heartbeatPacketCaller),
events.NewEvent(fpcHeartbeatPacketCaller),
} }
func stringCaller(handler interface{}, params ...interface{}) { func stringCaller(handler interface{}, params ...interface{}) {
...@@ -43,3 +46,7 @@ func errorCaller(handler interface{}, params ...interface{}) { ...@@ -43,3 +46,7 @@ func errorCaller(handler interface{}, params ...interface{}) {
func heartbeatPacketCaller(handler interface{}, params ...interface{}) { func heartbeatPacketCaller(handler interface{}, params ...interface{}) {
handler.(func(heartbeat *packet.Heartbeat))(params[0].(*packet.Heartbeat)) handler.(func(heartbeat *packet.Heartbeat))(params[0].(*packet.Heartbeat))
} }
func fpcHeartbeatPacketCaller(handler interface{}, params ...interface{}) {
handler.(func(hb *packet.FPCHeartbeat))(params[0].(*packet.FPCHeartbeat))
}
...@@ -109,6 +109,9 @@ func wireUp(p *protocol.Protocol) { ...@@ -109,6 +109,9 @@ func wireUp(p *protocol.Protocol) {
p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) { p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) {
processHeartbeatPacket(data, p) processHeartbeatPacket(data, p)
})) }))
p.Events.Received[packet.MessageTypeFPCHeartbeat].Attach(events.NewClosure(func(data []byte) {
processFPCHeartbeatPacket(data, p)
}))
} }
// processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event // processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event
...@@ -121,3 +124,14 @@ func processHeartbeatPacket(data []byte, p *protocol.Protocol) { ...@@ -121,3 +124,14 @@ func processHeartbeatPacket(data []byte, p *protocol.Protocol) {
} }
Events.Heartbeat.Trigger(heartbeatPacket) Events.Heartbeat.Trigger(heartbeatPacket)
} }
// processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event
func processFPCHeartbeatPacket(data []byte, p *protocol.Protocol) {
hb, err := packet.ParseFPCHeartbeat(data)
if err != nil {
Events.Error.Trigger(err)
p.CloseConnection()
return
}
Events.FPCHeartbeat.Trigger(hb)
}
...@@ -47,7 +47,7 @@ func configure(plugin *node.Plugin) { ...@@ -47,7 +47,7 @@ func configure(plugin *node.Plugin) {
} }
engine.GET("/datastream", echo.WrapHandler(websocket.Handler(dataStream))) engine.GET("/datastream", echo.WrapHandler(websocket.Handler(dataStream)))
configureEventsRecording(plugin) configureEventsRecording()
} }
func run(_ *node.Plugin) { func run(_ *node.Plugin) {
......
...@@ -11,7 +11,6 @@ import ( ...@@ -11,7 +11,6 @@ import (
"github.com/iotaledger/goshimmer/plugins/analysis/server" "github.com/iotaledger/goshimmer/plugins/analysis/server"
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/node"
) )
// the period in which we scan and delete old data. // the period in which we scan and delete old data.
...@@ -26,8 +25,12 @@ var ( ...@@ -26,8 +25,12 @@ var (
) )
// configures the event recording by attaching to the analysis server's heartbeat event. // configures the event recording by attaching to the analysis server's heartbeat event.
func configureEventsRecording(plugin *node.Plugin) { func configureEventsRecording() {
server.Events.Heartbeat.Attach(events.NewClosure(func(hb *packet.Heartbeat) { server.Events.Heartbeat.Attach(events.NewClosure(onHeartbeatReceived))
server.Events.FPCHeartbeat.Attach(events.NewClosure(onFPCHeartbeatReceived))
}
func onHeartbeatReceived(hb *packet.Heartbeat) {
var out strings.Builder var out strings.Builder
for _, value := range hb.OutboundIDs { for _, value := range hb.OutboundIDs {
out.WriteString(hex.EncodeToString(value)) out.WriteString(hex.EncodeToString(value))
...@@ -36,7 +39,7 @@ func configureEventsRecording(plugin *node.Plugin) { ...@@ -36,7 +39,7 @@ func configureEventsRecording(plugin *node.Plugin) {
for _, value := range hb.InboundIDs { for _, value := range hb.InboundIDs {
in.WriteString(hex.EncodeToString(value)) in.WriteString(hex.EncodeToString(value))
} }
plugin.Node.Logger.Debugw( log.Debugw(
"Heartbeat", "Heartbeat",
"nodeId", hex.EncodeToString(hb.OwnID), "nodeId", hex.EncodeToString(hb.OwnID),
"outboundIds", out.String(), "outboundIds", out.String(),
...@@ -106,7 +109,14 @@ func configureEventsRecording(plugin *node.Plugin) { ...@@ -106,7 +109,14 @@ func configureEventsRecording(plugin *node.Plugin) {
// update links map // update links map
links[incomingNeighborString][nodeIDString] = timestamp links[incomingNeighborString][nodeIDString] = timestamp
} }
})) }
func onFPCHeartbeatReceived(hb *packet.FPCHeartbeat) {
log.Infow(
"FPCHeartbeat",
"nodeId", hex.EncodeToString(hb.OwnID),
"ActiveVoteContext", hb.RoundStats.ActiveVoteContexts,
)
} }
func runEventsRecordManager() { func runEventsRecordManager() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment