diff --git a/.gitignore b/.gitignore index 87725e6da2f14d20435f3bf9504e84bd09dd6903..ed554be4bbe94dca5030e830ec94f18fa5ef4953 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,5 @@ objectsdb/ shimmer goshimmer -config.json \ No newline at end of file +config.json +.env \ No newline at end of file diff --git a/config.default.json b/config.default.json index 3161b48f6ff436ba5113347f95791fef5bc0cf8d..059be5f0ed5343b805a56f32c75fb3d558b5c8f4 100644 --- a/config.default.json +++ b/config.default.json @@ -67,5 +67,8 @@ "username": "goshimmer" }, "bindAddress": "127.0.0.1:8080" + }, + "networkdelay": { + "originPublicKey": "9DB3j9cWYSuEEtkvanrzqkzCQMdH1FGv3TawJdVbDxkd" } } diff --git a/dapps/networkdelay/dapp.go b/dapps/networkdelay/dapp.go new file mode 100644 index 0000000000000000000000000000000000000000..36611d18db9398465168fe0d19358021db196b9f --- /dev/null +++ b/dapps/networkdelay/dapp.go @@ -0,0 +1,132 @@ +package networkdelay + +import ( + "fmt" + "time" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/message" + messageTangle "github.com/iotaledger/goshimmer/packages/binary/messagelayer/tangle" + "github.com/iotaledger/goshimmer/plugins/autopeering/local" + "github.com/iotaledger/goshimmer/plugins/config" + "github.com/iotaledger/goshimmer/plugins/messagelayer" + "github.com/iotaledger/goshimmer/plugins/remotelog" + "github.com/iotaledger/hive.go/crypto/ed25519" + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/logger" + "github.com/iotaledger/hive.go/node" + "github.com/mr-tron/base58" +) + +const ( + // PluginName contains the human readable name of the plugin. + PluginName = "NetworkDelay" + + // CfgNetworkDelayOriginPublicKey defines the config flag of the issuer node public key. + CfgNetworkDelayOriginPublicKey = "networkdelay.originPublicKey" + + remoteLogType = "networkdelay" +) + +var ( + // App is the "plugin" instance of the network delay application. + App = node.NewPlugin(PluginName, node.Disabled, configure) + + // log holds a reference to the logger used by this app. + log *logger.Logger + + remoteLogger *remotelog.RemoteLoggerConn + + myID string + myPublicKey ed25519.PublicKey + originPublicKey ed25519.PublicKey +) + +func configure(_ *node.Plugin) { + // configure logger + log = logger.NewLogger(PluginName) + + remoteLogger = remotelog.RemoteLogger() + + if local.GetInstance() != nil { + myID = local.GetInstance().ID().String() + myPublicKey = local.GetInstance().PublicKey() + } + + // get origin public key from config + bytes, err := base58.Decode(config.Node.GetString(CfgNetworkDelayOriginPublicKey)) + if err != nil { + log.Fatalf("could not parse %s config entry as base58. %v", CfgNetworkDelayOriginPublicKey, err) + } + originPublicKey, _, err = ed25519.PublicKeyFromBytes(bytes) + if err != nil { + log.Fatalf("could not parse %s config entry as public key. %v", CfgNetworkDelayOriginPublicKey, err) + } + + configureWebAPI() + + // subscribe to message-layer + messagelayer.Tangle.Events.MessageSolid.Attach(events.NewClosure(onReceiveMessageFromMessageLayer)) +} + +func onReceiveMessageFromMessageLayer(cachedMessage *message.CachedMessage, cachedMessageMetadata *messageTangle.CachedMessageMetadata) { + defer cachedMessage.Release() + defer cachedMessageMetadata.Release() + + solidMessage := cachedMessage.Unwrap() + if solidMessage == nil { + log.Debug("failed to unpack solid message from message layer") + + return + } + + messagePayload := solidMessage.Payload() + if messagePayload.Type() != Type { + return + } + + // check for node identity + issuerPubKey := solidMessage.IssuerPublicKey() + if issuerPubKey != originPublicKey || issuerPubKey == myPublicKey { + return + } + + networkDelayObject, ok := messagePayload.(*Object) + if !ok { + log.Info("could not cast payload to network delay object") + + return + } + + now := time.Now().UnixNano() + + // abort if message was sent more than 1min ago + // this should only happen due to a node resyncing + fmt.Println(time.Duration(now-networkDelayObject.sentTime), time.Minute) + if time.Duration(now-networkDelayObject.sentTime) > time.Minute { + log.Debugf("Received network delay message with >1min delay\n%s", networkDelayObject) + return + } + + sendToRemoteLog(networkDelayObject, now) +} + +func sendToRemoteLog(networkDelayObject *Object, receiveTime int64) { + m := networkDelay{ + NodeID: myID, + ID: networkDelayObject.id.String(), + SentTime: networkDelayObject.sentTime, + ReceiveTime: receiveTime, + Delta: receiveTime - networkDelayObject.sentTime, + Type: remoteLogType, + } + _ = remoteLogger.Send(m) +} + +type networkDelay struct { + NodeID string `json:"nodeId"` + ID string `json:"id"` + SentTime int64 `json:"sentTime"` + ReceiveTime int64 `json:"receiveTime"` + Delta int64 `json:"delta"` + Type string `json:"type"` +} diff --git a/dapps/networkdelay/object.go b/dapps/networkdelay/object.go new file mode 100644 index 0000000000000000000000000000000000000000..5b41369019c68354bdaaa1fd9a3f5db26c8a3b6a --- /dev/null +++ b/dapps/networkdelay/object.go @@ -0,0 +1,156 @@ +package networkdelay + +import ( + "sync" + + "github.com/iotaledger/goshimmer/packages/binary/messagelayer/payload" + "github.com/iotaledger/hive.go/marshalutil" + "github.com/iotaledger/hive.go/stringify" + "github.com/mr-tron/base58" +) + +// ID represents a 32 byte ID of a network delay object. +type ID [32]byte + +// String returns a human-friendly representation of the ID. +func (id ID) String() string { + return base58.Encode(id[:]) +} + +// Object represents the network delay object type. +type Object struct { + id ID + sentTime int64 + + bytes []byte + bytesMutex sync.RWMutex +} + +// NewObject creates a new network delay object. +func NewObject(id ID, sentTime int64) *Object { + return &Object{ + id: id, + sentTime: sentTime, + } +} + +// FromBytes parses the marshaled version of an Object into a Go object. +// It either returns a new Object or fills an optionally provided Object with the parsed information. +func FromBytes(bytes []byte, optionalTargetObject ...*Object) (result *Object, consumedBytes int, err error) { + marshalUtil := marshalutil.New(bytes) + result, err = Parse(marshalUtil, optionalTargetObject...) + consumedBytes = marshalUtil.ReadOffset() + + return +} + +// Parse unmarshals an Object using the given marshalUtil (for easier marshaling/unmarshaling). +func Parse(marshalUtil *marshalutil.MarshalUtil, optionalTarget ...*Object) (result *Object, err error) { + // determine the target that will hold the unmarshaled information + switch len(optionalTarget) { + case 0: + result = &Object{} + case 1: + result = optionalTarget[0] + default: + panic("too many arguments in call to FromBytes") + } + + // read information that are required to identify the object from the outside + if _, err = marshalUtil.ReadUint32(); err != nil { + return + } + if _, err = marshalUtil.ReadUint32(); err != nil { + return + } + + // parse id + id, err := marshalUtil.ReadBytes(32) + if err != nil { + return + } + copy(result.id[:], id) + + // parse sent time + if result.sentTime, err = marshalUtil.ReadInt64(); err != nil { + return + } + + // store bytes, so we don't have to marshal manually + consumedBytes := marshalUtil.ReadOffset() + copy(result.bytes, marshalUtil.Bytes()[:consumedBytes]) + + return +} + +// Bytes returns a marshaled version of this Object. +func (o *Object) Bytes() (bytes []byte) { + // acquire lock for reading bytes + o.bytesMutex.RLock() + + // return if bytes have been determined already + if bytes = o.bytes; bytes != nil { + o.bytesMutex.RUnlock() + return + } + + // switch to write lock + o.bytesMutex.RUnlock() + o.bytesMutex.Lock() + defer o.bytesMutex.Unlock() + + // return if bytes have been determined in the mean time + if bytes = o.bytes; bytes != nil { + return + } + + objectLength := len(o.id) + marshalutil.INT64_SIZE + // initialize helper + marshalUtil := marshalutil.New(marshalutil.UINT32_SIZE + marshalutil.UINT32_SIZE + objectLength) + + // marshal the payload specific information + marshalUtil.WriteUint32(Type) + marshalUtil.WriteUint32(uint32(objectLength)) + marshalUtil.WriteBytes(o.id[:]) + marshalUtil.WriteInt64(o.sentTime) + + bytes = marshalUtil.Bytes() + + return +} + +// String returns a human-friendly representation of the Object. +func (o *Object) String() string { + return stringify.Struct("NetworkDelayObject", + stringify.StructField("id", o.id), + stringify.StructField("sentTime", uint64(o.sentTime)), + ) +} + +// region Payload implementation /////////////////////////////////////////////////////////////////////////////////////// + +// Type represents the identifier which addresses the network delay Object type. +const Type = payload.Type(189) + +// Type returns the type of the Object. +func (o *Object) Type() payload.Type { + return Type +} + +// Unmarshal unmarshals the payload from the given bytes. +func (o *Object) Unmarshal(data []byte) (err error) { + _, _, err = FromBytes(data, o) + + return +} + +func init() { + payload.RegisterType(Type, func(data []byte) (payload payload.Payload, err error) { + payload = &Object{} + err = payload.Unmarshal(data) + + return + }) +} + +// // endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/dapps/networkdelay/webapi.go b/dapps/networkdelay/webapi.go new file mode 100644 index 0000000000000000000000000000000000000000..b7ecf54f5fbf20c0e81e33c3e7dd0a5473f00727 --- /dev/null +++ b/dapps/networkdelay/webapi.go @@ -0,0 +1,38 @@ +package networkdelay + +import ( + "math/rand" + "net/http" + "time" + + "github.com/iotaledger/goshimmer/plugins/issuer" + "github.com/iotaledger/goshimmer/plugins/webapi" + "github.com/labstack/echo" +) + +func configureWebAPI() { + webapi.Server.POST("networkdelay", broadcastNetworkDelayObject) +} + +// broadcastNetworkDelayObject creates a message with a network delay object and +// broadcasts it to the node's neighbors. It returns the message ID if successful. +func broadcastNetworkDelayObject(c echo.Context) error { + // generate random id + rand.Seed(time.Now().UnixNano()) + var id [32]byte + if _, err := rand.Read(id[:]); err != nil { + return c.JSON(http.StatusInternalServerError, Response{Error: err.Error()}) + } + + msg, err := issuer.IssuePayload(NewObject(id, time.Now().UnixNano())) + if err != nil { + return c.JSON(http.StatusBadRequest, Response{Error: err.Error()}) + } + return c.JSON(http.StatusOK, Response{ID: msg.Id().String()}) +} + +// Response contains the ID of the message sent. +type Response struct { + ID string `json:"id,omitempty"` + Error string `json:"error,omitempty"` +} diff --git a/pluginmgr/research/plugins.go b/pluginmgr/research/plugins.go index 1b2e701623c77e9a2f77c3ca4531d3efd8a00632..6e3b7bd39a3bd1cbed30585d4bd03054088ad35f 100644 --- a/pluginmgr/research/plugins.go +++ b/pluginmgr/research/plugins.go @@ -1,6 +1,7 @@ package research import ( + "github.com/iotaledger/goshimmer/dapps/networkdelay" analysisclient "github.com/iotaledger/goshimmer/plugins/analysis/client" analysisdashboard "github.com/iotaledger/goshimmer/plugins/analysis/dashboard" analysisserver "github.com/iotaledger/goshimmer/plugins/analysis/server" @@ -13,4 +14,5 @@ var PLUGINS = node.Plugins( analysisserver.Plugin, analysisclient.Plugin, analysisdashboard.Plugin, + networkdelay.App, ) diff --git a/plugins/remotelog/plugin.go b/plugins/remotelog/plugin.go index 32a507786ca41e984517e8f103e64030e86b0045..298038d337d303007ce73879fe92d942e3006791 100644 --- a/plugins/remotelog/plugin.go +++ b/plugins/remotelog/plugin.go @@ -5,12 +5,10 @@ package remotelog import ( - "encoding/json" - "fmt" - "net" "os" "path/filepath" "runtime" + "sync" "time" "github.com/iotaledger/goshimmer/packages/shutdown" @@ -26,17 +24,6 @@ import ( "gopkg.in/src-d/go-git.v4" ) -type logMessage struct { - Version string `json:"version"` - GitHead string `json:"gitHead,omitempty"` - GitBranch string `json:"gitBranch,omitempty"` - NodeID string `json:"nodeId"` - Level string `json:"level"` - Name string `json:"name"` - Msg string `json:"msg"` - Timestamp time.Time `json:"timestamp"` -} - const ( // CfgLoggerRemotelogServerAddress defines the config flag of the server address. CfgLoggerRemotelogServerAddress = "logger.remotelog.serverAddress" @@ -44,17 +31,21 @@ const ( CfgDisableEvents = "logger.disableEvents" // PluginName is the name of the remote log plugin. PluginName = "RemoteLog" + + remoteLogType = "log" ) var ( // Plugin is the plugin instance of the remote plugin instance. Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) log *logger.Logger - conn net.Conn myID string myGitHead string myGitBranch string workerPool *workerpool.WorkerPool + + remoteLogger *RemoteLoggerConn + remoteLoggerOnce sync.Once ) func init() { @@ -69,12 +60,8 @@ func configure(plugin *node.Plugin) { return } - c, err := net.Dial("udp", config.Node.GetString(CfgLoggerRemotelogServerAddress)) - if err != nil { - log.Fatalf("Could not create UDP socket to '%s'. %v", config.Node.GetString(CfgLoggerRemotelogServerAddress), err) - return - } - conn = c + // initialize remote logger connection + RemoteLogger() if local.GetInstance() != nil { myID = local.GetInstance().ID().String() @@ -117,9 +104,10 @@ func sendLogMsg(level logger.Level, name string, msg string) { name, msg, time.Now(), + remoteLogType, } - b, _ := json.Marshal(m) - fmt.Fprint(conn, string(b)) + + _ = RemoteLogger().Send(m) } func getGitInfo() { @@ -159,3 +147,30 @@ func getGitDir() string { return gitDir } + +// RemoteLogger represents a connection to our remote log server. +func RemoteLogger() *RemoteLoggerConn { + remoteLoggerOnce.Do(func() { + r, err := newRemoteLoggerConn(config.Node.GetString(CfgLoggerRemotelogServerAddress)) + if err != nil { + log.Fatal(err) + return + } + + remoteLogger = r + }) + + return remoteLogger +} + +type logMessage struct { + Version string `json:"version"` + GitHead string `json:"gitHead,omitempty"` + GitBranch string `json:"gitBranch,omitempty"` + NodeID string `json:"nodeId"` + Level string `json:"level"` + Name string `json:"name"` + Msg string `json:"msg"` + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` +} diff --git a/plugins/remotelog/remotelogger.go b/plugins/remotelog/remotelogger.go new file mode 100644 index 0000000000000000000000000000000000000000..d2fab8432ae3d28c454a4126eeff8cfc7ede8413 --- /dev/null +++ b/plugins/remotelog/remotelogger.go @@ -0,0 +1,35 @@ +package remotelog + +import ( + "encoding/json" + "fmt" + "net" +) + +// RemoteLoggerConn is a wrapper for a connection to our RemoteLog server. +type RemoteLoggerConn struct { + conn net.Conn +} + +func newRemoteLoggerConn(address string) (*RemoteLoggerConn, error) { + c, err := net.Dial("udp", address) + if err != nil { + return nil, fmt.Errorf("could not create UDP socket to '%s'. %v", address, err) + } + + return &RemoteLoggerConn{conn: c}, nil +} + +// Send sends a message on the RemoteLoggers connection. +func (r *RemoteLoggerConn) Send(msg interface{}) error { + b, err := json.Marshal(msg) + if err != nil { + return err + } + _, err = r.conn.Write(b) + if err != nil { + return err + } + + return nil +} diff --git a/plugins/remotelog/server/.env b/plugins/remotelog/server/.env deleted file mode 100644 index 68aaea9a63f638a31870bb929bbc323409a12ebc..0000000000000000000000000000000000000000 --- a/plugins/remotelog/server/.env +++ /dev/null @@ -1 +0,0 @@ -ELK_VERSION=7.5.2 \ No newline at end of file diff --git a/plugins/remotelog/server/.env.default b/plugins/remotelog/server/.env.default new file mode 100644 index 0000000000000000000000000000000000000000..89b017525eecb4a7a0f397ea8bfee33b8bf05ad8 --- /dev/null +++ b/plugins/remotelog/server/.env.default @@ -0,0 +1,3 @@ +ELK_VERSION=7.5.2 +VIRTUAL_HOST= +LETSENCRYPT_HOST= \ No newline at end of file diff --git a/plugins/remotelog/server/config/logstash/pipeline/logstash.conf b/plugins/remotelog/server/config/logstash/pipeline/logstash.conf index fcc2ccbc0a770d42433a8323a5664fa956f20c8c..c5d451334c6e63916d5aa88a49fa292cc327a40d 100644 --- a/plugins/remotelog/server/config/logstash/pipeline/logstash.conf +++ b/plugins/remotelog/server/config/logstash/pipeline/logstash.conf @@ -20,7 +20,15 @@ filter { } output { - elasticsearch { - hosts => "elasticsearch:9200" - } +# stdout {codec => rubydebug} + if [log][type] == "networkdelay" { + elasticsearch { + hosts => "elasticsearch:9200" + index => "networkdelay" + } + } else { + elasticsearch { + hosts => "elasticsearch:9200" + } + } } \ No newline at end of file diff --git a/plugins/remotelog/server/docker-compose.yml b/plugins/remotelog/server/docker-compose.yml index 69ed87d88d86cb6f112359e7de0528274730f6e7..02784c0051b36d206344b8935e06180e801881a0 100644 --- a/plugins/remotelog/server/docker-compose.yml +++ b/plugins/remotelog/server/docker-compose.yml @@ -4,7 +4,9 @@ services: elasticsearch: container_name: elasticsearch image: docker.elastic.co/elasticsearch/elasticsearch:${ELK_VERSION} + restart: always volumes: + - "/etc/localtime:/etc/localtime:ro" - type: bind source: ./config/elasticsearch.yml target: /usr/share/elasticsearch/config/elasticsearch.yml @@ -23,7 +25,9 @@ services: logstash: container_name: logstash image: docker.elastic.co/logstash/logstash:${ELK_VERSION} + restart: always volumes: + - "/etc/localtime:/etc/localtime:ro" - type: bind source: ./config/logstash/logstash.yml target: /usr/share/logstash/config/logstash.yml @@ -44,13 +48,19 @@ services: kibana: container_name: kibana image: docker.elastic.co/kibana/kibana:${ELK_VERSION} + restart: always volumes: + - "/etc/localtime:/etc/localtime:ro" - type: bind source: ./config/kibana.yml target: /usr/share/kibana/config/kibana.yml read_only: true ports: - - "5601:5601" + - "127.0.0.1:5601:5601" + environment: + - "VIRTUAL_HOST=${VIRTUAL_HOST}" + - "LETSENCRYPT_HOST=${LETSENCRYPT_HOST}" + - "VIRTUAL_PORT=5601" networks: - elk depends_on: