Skip to content
Snippets Groups Projects
Unverified Commit f4acc2bd authored by Jonas Theis's avatar Jonas Theis Committed by GitHub
Browse files

dApp network delay (#472)

* Create base for little dapp to measure actual network delay

* Adjust remote log to support additional messages

* Send network delay objects via remote log functionality

* Create wrapper for remote logger connection for easier use

* Add check for issuer public key

* Disable debug log in Logstash

* Disable debug log in Logstash

* Bind host time into containers
parent 7ba00254
No related branches found
No related tags found
No related merge requests found
...@@ -28,4 +28,5 @@ objectsdb/ ...@@ -28,4 +28,5 @@ objectsdb/
shimmer shimmer
goshimmer goshimmer
config.json config.json
\ No newline at end of file .env
\ No newline at end of file
...@@ -67,5 +67,8 @@ ...@@ -67,5 +67,8 @@
"username": "goshimmer" "username": "goshimmer"
}, },
"bindAddress": "127.0.0.1:8080" "bindAddress": "127.0.0.1:8080"
},
"networkdelay": {
"originPublicKey": "9DB3j9cWYSuEEtkvanrzqkzCQMdH1FGv3TawJdVbDxkd"
} }
} }
package networkdelay
import (
"fmt"
"time"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/message"
messageTangle "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/goshimmer/plugins/config"
"github.com/iotaledger/goshimmer/plugins/messagelayer"
"github.com/iotaledger/goshimmer/plugins/remotelog"
"github.com/iotaledger/hive.go/crypto/ed25519"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/node"
"github.com/mr-tron/base58"
)
const (
// PluginName contains the human readable name of the plugin.
PluginName = "NetworkDelay"
// CfgNetworkDelayOriginPublicKey defines the config flag of the issuer node public key.
CfgNetworkDelayOriginPublicKey = "networkdelay.originPublicKey"
remoteLogType = "networkdelay"
)
var (
// App is the "plugin" instance of the network delay application.
App = node.NewPlugin(PluginName, node.Disabled, configure)
// log holds a reference to the logger used by this app.
log *logger.Logger
remoteLogger *remotelog.RemoteLoggerConn
myID string
myPublicKey ed25519.PublicKey
originPublicKey ed25519.PublicKey
)
func configure(_ *node.Plugin) {
// configure logger
log = logger.NewLogger(PluginName)
remoteLogger = remotelog.RemoteLogger()
if local.GetInstance() != nil {
myID = local.GetInstance().ID().String()
myPublicKey = local.GetInstance().PublicKey()
}
// get origin public key from config
bytes, err := base58.Decode(config.Node.GetString(CfgNetworkDelayOriginPublicKey))
if err != nil {
log.Fatalf("could not parse %s config entry as base58. %v", CfgNetworkDelayOriginPublicKey, err)
}
originPublicKey, _, err = ed25519.PublicKeyFromBytes(bytes)
if err != nil {
log.Fatalf("could not parse %s config entry as public key. %v", CfgNetworkDelayOriginPublicKey, err)
}
configureWebAPI()
// subscribe to message-layer
messagelayer.Tangle.Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer))
}
func onReceiveMessageFromMessageLayer(cachedMessage *message.CachedMessage, cachedMessageMetadata *messageTangle.CachedMessageMetadata) {
defer cachedMessage.Release()
defer cachedMessageMetadata.Release()
solidMessage := cachedMessage.Unwrap()
if solidMessage == nil {
log.Debug("failed to unpack solid message from message layer")
return
}
messagePayload := solidMessage.Payload()
if messagePayload.Type() != Type {
return
}
// check for node identity
issuerPubKey := solidMessage.IssuerPublicKey()
if issuerPubKey != originPublicKey || issuerPubKey == myPublicKey {
return
}
networkDelayObject, ok := messagePayload.(*Object)
if !ok {
log.Info("could not cast payload to network delay object")
return
}
now := time.Now().UnixNano()
// abort if message was sent more than 1min ago
// this should only happen due to a node resyncing
fmt.Println(time.Duration(now-networkDelayObject.sentTime), time.Minute)
if time.Duration(now-networkDelayObject.sentTime) > time.Minute {
log.Debugf("Received network delay message with >1min delay\n%s", networkDelayObject)
return
}
sendToRemoteLog(networkDelayObject, now)
}
func sendToRemoteLog(networkDelayObject *Object, receiveTime int64) {
m := networkDelay{
NodeID: myID,
ID: networkDelayObject.id.String(),
SentTime: networkDelayObject.sentTime,
ReceiveTime: receiveTime,
Delta: receiveTime - networkDelayObject.sentTime,
Type: remoteLogType,
}
_ = remoteLogger.Send(m)
}
type networkDelay struct {
NodeID string `json:"nodeId"`
ID string `json:"id"`
SentTime int64 `json:"sentTime"`
ReceiveTime int64 `json:"receiveTime"`
Delta int64 `json:"delta"`
Type string `json:"type"`
}
package networkdelay
import (
"sync"
"github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload"
"github.com/iotaledger/hive.go/marshalutil"
"github.com/iotaledger/hive.go/stringify"
"github.com/mr-tron/base58"
)
// ID represents a 32 byte ID of a network delay object.
type ID [32]byte
// String returns a human-friendly representation of the ID.
func (id ID) String() string {
return base58.Encode(id[:])
}
// Object represents the network delay object type.
type Object struct {
id ID
sentTime int64
bytes []byte
bytesMutex sync.RWMutex
}
// NewObject creates a new network delay object.
func NewObject(id ID, sentTime int64) *Object {
return &Object{
id: id,
sentTime: sentTime,
}
}
// FromBytes parses the marshaled version of an Object into a Go object.
// It either returns a new Object or fills an optionally provided Object with the parsed information.
func FromBytes(bytes []byte, optionalTargetObject ...*Object) (result *Object, consumedBytes int, err error) {
marshalUtil := marshalutil.New(bytes)
result, err = Parse(marshalUtil, optionalTargetObject...)
consumedBytes = marshalUtil.ReadOffset()
return
}
// Parse unmarshals an Object using the given marshalUtil (for easier marshaling/unmarshaling).
func Parse(marshalUtil *marshalutil.MarshalUtil, optionalTarget ...*Object) (result *Object, err error) {
// determine the target that will hold the unmarshaled information
switch len(optionalTarget) {
case 0:
result = &Object{}
case 1:
result = optionalTarget[0]
default:
panic("too many arguments in call to FromBytes")
}
// read information that are required to identify the object from the outside
if _, err = marshalUtil.ReadUint32(); err != nil {
return
}
if _, err = marshalUtil.ReadUint32(); err != nil {
return
}
// parse id
id, err := marshalUtil.ReadBytes(32)
if err != nil {
return
}
copy(result.id[:], id)
// parse sent time
if result.sentTime, err = marshalUtil.ReadInt64(); err != nil {
return
}
// store bytes, so we don't have to marshal manually
consumedBytes := marshalUtil.ReadOffset()
copy(result.bytes, marshalUtil.Bytes()[:consumedBytes])
return
}
// Bytes returns a marshaled version of this Object.
func (o *Object) Bytes() (bytes []byte) {
// acquire lock for reading bytes
o.bytesMutex.RLock()
// return if bytes have been determined already
if bytes = o.bytes; bytes != nil {
o.bytesMutex.RUnlock()
return
}
// switch to write lock
o.bytesMutex.RUnlock()
o.bytesMutex.Lock()
defer o.bytesMutex.Unlock()
// return if bytes have been determined in the mean time
if bytes = o.bytes; bytes != nil {
return
}
objectLength := len(o.id) + marshalutil.INT64_SIZE
// initialize helper
marshalUtil := marshalutil.New(marshalutil.UINT32_SIZE + marshalutil.UINT32_SIZE + objectLength)
// marshal the payload specific information
marshalUtil.WriteUint32(Type)
marshalUtil.WriteUint32(uint32(objectLength))
marshalUtil.WriteBytes(o.id[:])
marshalUtil.WriteInt64(o.sentTime)
bytes = marshalUtil.Bytes()
return
}
// String returns a human-friendly representation of the Object.
func (o *Object) String() string {
return stringify.Struct("NetworkDelayObject",
stringify.StructField("id", o.id),
stringify.StructField("sentTime", uint64(o.sentTime)),
)
}
// region Payload implementation ///////////////////////////////////////////////////////////////////////////////////////
// Type represents the identifier which addresses the network delay Object type.
const Type = payload.Type(189)
// Type returns the type of the Object.
func (o *Object) Type() payload.Type {
return Type
}
// Unmarshal unmarshals the payload from the given bytes.
func (o *Object) Unmarshal(data []byte) (err error) {
_, _, err = FromBytes(data, o)
return
}
func init() {
payload.RegisterType(Type, func(data []byte) (payload payload.Payload, err error) {
payload = &Object{}
err = payload.Unmarshal(data)
return
})
}
// // endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
package networkdelay
import (
"math/rand"
"net/http"
"time"
"github.com/iotaledger/goshimmer/plugins/issuer"
"github.com/iotaledger/goshimmer/plugins/webapi"
"github.com/labstack/echo"
)
func configureWebAPI() {
webapi.Server.POST("networkdelay", broadcastNetworkDelayObject)
}
// broadcastNetworkDelayObject creates a message with a network delay object and
// broadcasts it to the node's neighbors. It returns the message ID if successful.
func broadcastNetworkDelayObject(c echo.Context) error {
// generate random id
rand.Seed(time.Now().UnixNano())
var id [32]byte
if _, err := rand.Read(id[:]); err != nil {
return c.JSON(http.StatusInternalServerError, Response{Error: err.Error()})
}
msg, err := issuer.IssuePayload(NewObject(id, time.Now().UnixNano()))
if err != nil {
return c.JSON(http.StatusBadRequest, Response{Error: err.Error()})
}
return c.JSON(http.StatusOK, Response{ID: msg.Id().String()})
}
// Response contains the ID of the message sent.
type Response struct {
ID string `json:"id,omitempty"`
Error string `json:"error,omitempty"`
}
package research package research
import ( import (
"github.com/iotaledger/goshimmer/dapps/networkdelay"
analysisclient "github.com/iotaledger/goshimmer/plugins/analysis/client" analysisclient "github.com/iotaledger/goshimmer/plugins/analysis/client"
analysisdashboard "github.com/iotaledger/goshimmer/plugins/analysis/dashboard" analysisdashboard "github.com/iotaledger/goshimmer/plugins/analysis/dashboard"
analysisserver "github.com/iotaledger/goshimmer/plugins/analysis/server" analysisserver "github.com/iotaledger/goshimmer/plugins/analysis/server"
...@@ -13,4 +14,5 @@ var PLUGINS = node.Plugins( ...@@ -13,4 +14,5 @@ var PLUGINS = node.Plugins(
analysisserver.Plugin, analysisserver.Plugin,
analysisclient.Plugin, analysisclient.Plugin,
analysisdashboard.Plugin, analysisdashboard.Plugin,
networkdelay.App,
) )
...@@ -5,12 +5,10 @@ ...@@ -5,12 +5,10 @@
package remotelog package remotelog
import ( import (
"encoding/json"
"fmt"
"net"
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
"sync"
"time" "time"
"github.com/iotaledger/goshimmer/packages/shutdown" "github.com/iotaledger/goshimmer/packages/shutdown"
...@@ -26,17 +24,6 @@ import ( ...@@ -26,17 +24,6 @@ import (
"gopkg.in/src-d/go-git.v4" "gopkg.in/src-d/go-git.v4"
) )
type logMessage struct {
Version string `json:"version"`
GitHead string `json:"gitHead,omitempty"`
GitBranch string `json:"gitBranch,omitempty"`
NodeID string `json:"nodeId"`
Level string `json:"level"`
Name string `json:"name"`
Msg string `json:"msg"`
Timestamp time.Time `json:"timestamp"`
}
const ( const (
// CfgLoggerRemotelogServerAddress defines the config flag of the server address. // CfgLoggerRemotelogServerAddress defines the config flag of the server address.
CfgLoggerRemotelogServerAddress = "logger.remotelog.serverAddress" CfgLoggerRemotelogServerAddress = "logger.remotelog.serverAddress"
...@@ -44,17 +31,21 @@ const ( ...@@ -44,17 +31,21 @@ const (
CfgDisableEvents = "logger.disableEvents" CfgDisableEvents = "logger.disableEvents"
// PluginName is the name of the remote log plugin. // PluginName is the name of the remote log plugin.
PluginName = "RemoteLog" PluginName = "RemoteLog"
remoteLogType = "log"
) )
var ( var (
// Plugin is the plugin instance of the remote plugin instance. // Plugin is the plugin instance of the remote plugin instance.
Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
log *logger.Logger log *logger.Logger
conn net.Conn
myID string myID string
myGitHead string myGitHead string
myGitBranch string myGitBranch string
workerPool *workerpool.WorkerPool workerPool *workerpool.WorkerPool
remoteLogger *RemoteLoggerConn
remoteLoggerOnce sync.Once
) )
func init() { func init() {
...@@ -69,12 +60,8 @@ func configure(plugin *node.Plugin) { ...@@ -69,12 +60,8 @@ func configure(plugin *node.Plugin) {
return return
} }
c, err := net.Dial("udp", config.Node.GetString(CfgLoggerRemotelogServerAddress)) // initialize remote logger connection
if err != nil { RemoteLogger()
log.Fatalf("Could not create UDP socket to '%s'. %v", config.Node.GetString(CfgLoggerRemotelogServerAddress), err)
return
}
conn = c
if local.GetInstance() != nil { if local.GetInstance() != nil {
myID = local.GetInstance().ID().String() myID = local.GetInstance().ID().String()
...@@ -117,9 +104,10 @@ func sendLogMsg(level logger.Level, name string, msg string) { ...@@ -117,9 +104,10 @@ func sendLogMsg(level logger.Level, name string, msg string) {
name, name,
msg, msg,
time.Now(), time.Now(),
remoteLogType,
} }
b, _ := json.Marshal(m)
fmt.Fprint(conn, string(b)) _ = RemoteLogger().Send(m)
} }
func getGitInfo() { func getGitInfo() {
...@@ -159,3 +147,30 @@ func getGitDir() string { ...@@ -159,3 +147,30 @@ func getGitDir() string {
return gitDir return gitDir
} }
// RemoteLogger represents a connection to our remote log server.
func RemoteLogger() *RemoteLoggerConn {
remoteLoggerOnce.Do(func() {
r, err := newRemoteLoggerConn(config.Node.GetString(CfgLoggerRemotelogServerAddress))
if err != nil {
log.Fatal(err)
return
}
remoteLogger = r
})
return remoteLogger
}
type logMessage struct {
Version string `json:"version"`
GitHead string `json:"gitHead,omitempty"`
GitBranch string `json:"gitBranch,omitempty"`
NodeID string `json:"nodeId"`
Level string `json:"level"`
Name string `json:"name"`
Msg string `json:"msg"`
Timestamp time.Time `json:"timestamp"`
Type string `json:"type"`
}
package remotelog
import (
"encoding/json"
"fmt"
"net"
)
// RemoteLoggerConn is a wrapper for a connection to our RemoteLog server.
type RemoteLoggerConn struct {
conn net.Conn
}
func newRemoteLoggerConn(address string) (*RemoteLoggerConn, error) {
c, err := net.Dial("udp", address)
if err != nil {
return nil, fmt.Errorf("could not create UDP socket to '%s'. %v", address, err)
}
return &RemoteLoggerConn{conn: c}, nil
}
// Send sends a message on the RemoteLoggers connection.
func (r *RemoteLoggerConn) Send(msg interface{}) error {
b, err := json.Marshal(msg)
if err != nil {
return err
}
_, err = r.conn.Write(b)
if err != nil {
return err
}
return nil
}
ELK_VERSION=7.5.2
\ No newline at end of file
ELK_VERSION=7.5.2
VIRTUAL_HOST=
LETSENCRYPT_HOST=
\ No newline at end of file
...@@ -20,7 +20,15 @@ filter { ...@@ -20,7 +20,15 @@ filter {
} }
output { output {
elasticsearch { # stdout {codec => rubydebug}
hosts => "elasticsearch:9200" if [log][type] == "networkdelay" {
} elasticsearch {
hosts => "elasticsearch:9200"
index => "networkdelay"
}
} else {
elasticsearch {
hosts => "elasticsearch:9200"
}
}
} }
\ No newline at end of file
...@@ -4,7 +4,9 @@ services: ...@@ -4,7 +4,9 @@ services:
elasticsearch: elasticsearch:
container_name: elasticsearch container_name: elasticsearch
image: docker.elastic.co/elasticsearch/elasticsearch:${ELK_VERSION} image: docker.elastic.co/elasticsearch/elasticsearch:${ELK_VERSION}
restart: always
volumes: volumes:
- "/etc/localtime:/etc/localtime:ro"
- type: bind - type: bind
source: ./config/elasticsearch.yml source: ./config/elasticsearch.yml
target: /usr/share/elasticsearch/config/elasticsearch.yml target: /usr/share/elasticsearch/config/elasticsearch.yml
...@@ -23,7 +25,9 @@ services: ...@@ -23,7 +25,9 @@ services:
logstash: logstash:
container_name: logstash container_name: logstash
image: docker.elastic.co/logstash/logstash:${ELK_VERSION} image: docker.elastic.co/logstash/logstash:${ELK_VERSION}
restart: always
volumes: volumes:
- "/etc/localtime:/etc/localtime:ro"
- type: bind - type: bind
source: ./config/logstash/logstash.yml source: ./config/logstash/logstash.yml
target: /usr/share/logstash/config/logstash.yml target: /usr/share/logstash/config/logstash.yml
...@@ -44,13 +48,19 @@ services: ...@@ -44,13 +48,19 @@ services:
kibana: kibana:
container_name: kibana container_name: kibana
image: docker.elastic.co/kibana/kibana:${ELK_VERSION} image: docker.elastic.co/kibana/kibana:${ELK_VERSION}
restart: always
volumes: volumes:
- "/etc/localtime:/etc/localtime:ro"
- type: bind - type: bind
source: ./config/kibana.yml source: ./config/kibana.yml
target: /usr/share/kibana/config/kibana.yml target: /usr/share/kibana/config/kibana.yml
read_only: true read_only: true
ports: ports:
- "5601:5601" - "127.0.0.1:5601:5601"
environment:
- "VIRTUAL_HOST=${VIRTUAL_HOST}"
- "LETSENCRYPT_HOST=${LETSENCRYPT_HOST}"
- "VIRTUAL_PORT=5601"
networks: networks:
- elk - elk
depends_on: depends_on:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment