Skip to content
Snippets Groups Projects
Unverified Commit 6026d479 authored by Levente Pap's avatar Levente Pap Committed by GitHub
Browse files

Use TLV protocol for Analysis Plugin (#424)

* Refactor heartbeat packet into message for TLV

* Refactor Analysis-Server for TLV protocol

* Adjust Heartbeat packet tests for TLV

* Remove obsolete ping packet

* Add comments

* Add message registry for analysis packet types

* Update hive.go

* Refactor analysis message registry initialization

* go mod tidy for integration tests

* format files with gofmt

* Linter warning fix

* Small fix
parent 0a6095d0
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,7 @@ require (
github.com/gobuffalo/packr/v2 v2.7.1
github.com/golang/protobuf v1.3.5
github.com/gorilla/websocket v1.4.1
github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8
github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754
github.com/iotaledger/iota.go v1.0.0-beta.14
github.com/labstack/echo v3.3.10+incompatible
github.com/labstack/gommon v0.3.0
......
package packet
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"github.com/iotaledger/hive.go/protocol/message"
"github.com/iotaledger/hive.go/protocol/tlv"
)
var (
......@@ -16,21 +21,34 @@ const (
HeartbeatMaxOutboundPeersCount = 4
// HeartbeatMaxInboundPeersCount is the maximum amount of inbound peer IDs a heartbeat packet can contain.
HeartbeatMaxInboundPeersCount = 4
// HeartbeatPacketHeader is the byte value denoting a heartbeat packet.
HeartbeatPacketHeader = 0x01
// HeartbeatPacketHeaderSize is the byte size of the heartbeat header.
HeartbeatPacketHeaderSize = 1
// HeartbeatPacketPeerIDSize is the byte size of peer IDs within the heartbeat packet.
HeartbeatPacketPeerIDSize = sha256.Size
// HeartbeatPacketOutboundIDCountSize is the byte size of the counter indicating the amount of outbound IDs.
HeartbeatPacketOutboundIDCountSize = 1
)
var (
// HeartbeatPacketMinSize is the minimum byte size of a heartbeat packet.
HeartbeatPacketMinSize = HeartbeatPacketHeaderSize + HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize
HeartbeatPacketMinSize = HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize
// HeartbeatPacketMaxSize is the maximum size a heartbeat packet can have.
HeartbeatPacketMaxSize = HeartbeatPacketHeaderSize + HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize +
HeartbeatPacketMaxSize = HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize +
HeartbeatMaxOutboundPeersCount*sha256.Size + HeartbeatMaxInboundPeersCount*sha256.Size
)
const (
// MessageTypeHeartbeat is the unique id of a heartbeat message for analysis.
MessageTypeHeartbeat message.Type = 1
)
var (
// HeartbeatMessageDefinition defines a heartbeat message's format.
HeartbeatMessageDefinition = &message.Definition{
ID: MessageTypeHeartbeat,
MaxBytesLength: uint16(HeartbeatPacketMaxSize),
VariableLength: true,
}
)
// Heartbeat represents a heartbeat packet.
type Heartbeat struct {
// The ID of the node who sent the heartbeat.
......@@ -44,7 +62,7 @@ type Heartbeat struct {
InboundIDs [][]byte
}
// ParseHeartbeat parses a slice of bytes into a heartbeat.
// ParseHeartbeat parses a slice of bytes (serialized packet) into a heartbeat.
func ParseHeartbeat(data []byte) (*Heartbeat, error) {
// check minimum size
if len(data) < HeartbeatPacketMinSize {
......@@ -55,11 +73,6 @@ func ParseHeartbeat(data []byte) (*Heartbeat, error) {
return nil, fmt.Errorf("%w: packet exceeds maximum heartbeat packet size of %d", ErrMalformedPacket, HeartbeatPacketMaxSize)
}
// check whether we're actually dealing with a heartbeat packet
if data[0] != HeartbeatPacketHeader {
return nil, fmt.Errorf("%w: packet isn't of type heartbeat", ErrMalformedPacket)
}
// sanity check: packet len - min packet % id size = 0,
// since we're only dealing with IDs from that offset
if (len(data)-HeartbeatPacketMinSize)%HeartbeatPacketPeerIDSize != 0 {
......@@ -68,7 +81,7 @@ func ParseHeartbeat(data []byte) (*Heartbeat, error) {
// copy own ID
ownID := make([]byte, HeartbeatPacketPeerIDSize)
copy(ownID, data[HeartbeatPacketHeaderSize:HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize])
copy(ownID, data[:HeartbeatPacketPeerIDSize])
// read outbound IDs count
outboundIDCount := int(data[HeartbeatPacketMinSize-1])
......@@ -100,7 +113,7 @@ func ParseHeartbeat(data []byte) (*Heartbeat, error) {
// inbound IDs can be zero
inboundIDs := make([][]byte, inboundIDCount)
offset := HeartbeatPacketHeaderSize + HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize + outboundIDCount*HeartbeatPacketPeerIDSize
offset := HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize + outboundIDCount*HeartbeatPacketPeerIDSize
for i := range inboundIDs {
inboundIDs[i] = make([]byte, HeartbeatPacketPeerIDSize)
copy(inboundIDs[i], data[offset+i*HeartbeatPacketPeerIDSize:offset+(i+1)*HeartbeatPacketPeerIDSize])
......@@ -109,7 +122,8 @@ func ParseHeartbeat(data []byte) (*Heartbeat, error) {
return &Heartbeat{OwnID: ownID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs}, nil
}
// NewHeartbeatMessage serializes the given heartbeat into a byte slice.
// NewHeartbeatMessage serializes the given heartbeat into a byte slice and adds a tlv header to the packet.
// message = tlv header + serialized packet
func NewHeartbeatMessage(hb *Heartbeat) ([]byte, error) {
if len(hb.InboundIDs) > HeartbeatMaxInboundPeersCount {
return nil, fmt.Errorf("%w: heartbeat exceeds maximum inbound IDs of %d", ErrInvalidHeartbeat, HeartbeatMaxInboundPeersCount)
......@@ -126,14 +140,11 @@ func NewHeartbeatMessage(hb *Heartbeat) ([]byte, error) {
packetSize := HeartbeatPacketMinSize + len(hb.OutboundIDs)*HeartbeatPacketPeerIDSize + len(hb.InboundIDs)*HeartbeatPacketPeerIDSize
packet := make([]byte, packetSize)
// header byte
packet[0] = HeartbeatPacketHeader
// own nodeId
copy(packet[HeartbeatPacketHeaderSize:HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize], hb.OwnID[:])
copy(packet[:HeartbeatPacketPeerIDSize], hb.OwnID[:])
// outbound id count
packet[HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize] = byte(len(hb.OutboundIDs))
packet[HeartbeatPacketPeerIDSize] = byte(len(hb.OutboundIDs))
// copy contents of hb.OutboundIDs
offset := HeartbeatPacketMinSize
......@@ -149,5 +160,16 @@ func NewHeartbeatMessage(hb *Heartbeat) ([]byte, error) {
copy(packet[offset+i*HeartbeatPacketPeerIDSize:offset+(i+1)*HeartbeatPacketPeerIDSize], inboundID[:HeartbeatPacketPeerIDSize])
}
return packet, nil
// create a buffer for tlv header plus the packet
buf := bytes.NewBuffer(make([]byte, 0, tlv.HeaderMessageDefinition.MaxBytesLength+uint16(packetSize)))
// write tlv header into buffer
if err := tlv.WriteHeader(buf, MessageTypeHeartbeat, uint16(packetSize)); err != nil {
return nil, err
}
// write serialized packet bytes into the buffer
if err := binary.Write(buf, binary.LittleEndian, packet); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
......@@ -6,6 +6,7 @@ import (
"testing"
. "github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/iotaledger/hive.go/protocol/tlv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
......@@ -91,18 +92,19 @@ func TestNewHeartbeatMessage(t *testing.T) {
for _, testCase := range testCases {
hb := testCase.hb
serializedHb, err := NewHeartbeatMessage(hb)
tlvHeaderLength := int(tlv.HeaderMessageDefinition.MaxBytesLength)
if testCase.err != nil {
require.True(t, errors.Is(err, testCase.err))
continue
}
require.NoError(t, err, "heartbeat should have been serialized successfully")
assert.EqualValues(t, HeartbeatPacketHeader, serializedHb[0], "expected heartbeat header value as first byte")
assert.EqualValues(t, hb.OwnID[:], serializedHb[HeartbeatPacketHeaderSize:HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize], "expected own peer id to be within range of %d:%d", HeartbeatPacketHeaderSize, HeartbeatPacketPeerIDSize)
assert.EqualValues(t, len(hb.OutboundIDs), serializedHb[HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize], "expected outbound IDs count of %d", len(hb.OutboundIDs))
assert.EqualValues(t, MessageTypeHeartbeat, serializedHb[0], "expected heartbeat message type tlv header value as first byte")
assert.EqualValues(t, hb.OwnID[:], serializedHb[tlvHeaderLength:tlvHeaderLength+HeartbeatPacketPeerIDSize], "expected own peer id to be within range of %d:%d", tlvHeaderLength, tlvHeaderLength+HeartbeatPacketPeerIDSize)
assert.EqualValues(t, len(hb.OutboundIDs), serializedHb[tlvHeaderLength+HeartbeatPacketPeerIDSize], "expected outbound IDs count of %d", len(hb.OutboundIDs))
// after the outbound IDs count, the outbound IDs are serialized
offset := HeartbeatPacketMinSize
offset := int(tlvHeaderLength) + HeartbeatPacketMinSize
for i := 0; i < len(hb.OutboundIDs); i++ {
assert.EqualValues(t, hb.OutboundIDs[i], serializedHb[offset+i*HeartbeatPacketPeerIDSize:offset+(i+1)*HeartbeatPacketPeerIDSize], "outbound ID at the given position doesn't match")
}
......@@ -117,6 +119,7 @@ func TestNewHeartbeatMessage(t *testing.T) {
}
func TestParseHeartbeat(t *testing.T) {
tlvHeaderLength := int(tlv.HeaderMessageDefinition.MaxBytesLength)
type testcase struct {
source []byte
expected *Heartbeat
......@@ -126,8 +129,10 @@ func TestParseHeartbeat(t *testing.T) {
// ok
func() testcase {
hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: make([][]byte, 0), InboundIDs: make([][]byte, 0)}
// message = tlv header + packet
// ParseHeartbeat() expects only the packet, hence serializedHb[tlvHeaderLength:]
serializedHb, _ := NewHeartbeatMessage(hb)
return testcase{source: serializedHb, expected: hb, err: nil}
return testcase{source: serializedHb[tlvHeaderLength:], expected: hb, err: nil}
}(),
// ok
func() testcase {
......@@ -140,15 +145,10 @@ func TestParseHeartbeat(t *testing.T) {
inboundIDs[i] = inboundID[:]
}
hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: outboundIDs, InboundIDs: inboundIDs}
// message = tlv header + packet
// ParseHeartbeat() expects only the packet, hence serializedHb[tlvHeaderLength:]
serializedHb, _ := NewHeartbeatMessage(hb)
return testcase{source: serializedHb, expected: hb, err: nil}
}(),
// err, modify wrong packet header
func() testcase {
hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: make([][]byte, 0), InboundIDs: make([][]byte, 0)}
serializedHb, _ := NewHeartbeatMessage(hb)
serializedHb[0] = 0xff
return testcase{source: serializedHb, expected: nil, err: ErrMalformedPacket}
return testcase{source: serializedHb[tlvHeaderLength:], expected: hb, err: nil}
}(),
// err, exceeds max inbound peer IDs
func() testcase {
......@@ -164,21 +164,29 @@ func TestParseHeartbeat(t *testing.T) {
inboundIDs[i] = inboundID[:]
}
hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: outboundIDs, InboundIDs: inboundIDs}
// message = tlv header + packet
// ParseHeartbeat() expects only the packet, hence serializedHb[tlvHeaderLength:]
serializedHb, _ := NewHeartbeatMessage(hb)
// add an additional peer id
serializedHb = append(serializedHb, ownID[:]...)
return testcase{source: serializedHb, expected: hb, err: ErrMalformedPacket}
return testcase{source: serializedHb[tlvHeaderLength:], expected: hb, err: ErrMalformedPacket}
}(),
// err, exceeds max outbound peer IDs
func() testcase {
outboundIDs := make([][]byte, HeartbeatMaxOutboundPeersCount+1)
for i := 0; i < HeartbeatMaxInboundPeersCount+1; i++ {
outboundIDs := make([][]byte, HeartbeatMaxOutboundPeersCount)
for i := 0; i < HeartbeatMaxInboundPeersCount; i++ {
outboundID := sha256.Sum256([]byte{byte(i)})
outboundIDs[i] = outboundID[:]
}
hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: outboundIDs, InboundIDs: make([][]byte, 0)}
// NewHeartbeatMessage would return nil and and error for a malformed packet (too many outbound peer IDs)
// so we create a correct message(tlv header + packet)
serializedHb, _ := NewHeartbeatMessage(hb)
return testcase{source: serializedHb, expected: hb, err: ErrMalformedPacket}
// and add an extra outbound ID (inbound IDs are zero)
serializedHb = append(serializedHb, ownID[:]...)
// manually overwrite outboundIDCount
serializedHb[tlvHeaderLength+HeartbeatPacketMinSize-1] = HeartbeatMaxOutboundPeersCount + 1
return testcase{source: serializedHb[tlvHeaderLength:], expected: hb, err: ErrMalformedPacket}
}(),
// err, advertised outbound ID count is bigger than remaining data
func() testcase {
......@@ -188,11 +196,13 @@ func TestParseHeartbeat(t *testing.T) {
outboundIDs[i] = outboundID[:]
}
hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: outboundIDs, InboundIDs: make([][]byte, 0)}
// message = tlv header + packet
// ParseHeartbeat() expects only the packet, hence serializedHb[tlvHeaderLength:]
serializedHb, _ := NewHeartbeatMessage(hb)
// we set the count to HeartbeatMaxOutboundPeersCount but we only have HeartbeatMaxOutboundPeersCount - 1
// actually serialized in the packet
serializedHb[HeartbeatPacketMinSize-1] = HeartbeatMaxOutboundPeersCount
return testcase{source: serializedHb, expected: nil, err: ErrMalformedPacket}
serializedHb[tlvHeaderLength+HeartbeatPacketMinSize-1] = HeartbeatMaxOutboundPeersCount
return testcase{source: serializedHb[tlvHeaderLength:], expected: nil, err: ErrMalformedPacket}
}(),
// err, doesn't reach minimum packet size
func() testcase {
......
package packet
import "errors"
import (
"errors"
"github.com/iotaledger/hive.go/protocol/message"
"github.com/iotaledger/hive.go/protocol/tlv"
)
var (
// ErrMalformedPacket is returned when malformed packets are tried to be parsed.
ErrMalformedPacket = errors.New("malformed packet")
)
// AnalysisMsgRegistry holds all message definitions for analysis server related messages
var AnalysisMsgRegistry *message.Registry
func init() {
// message definitions to be registered in registry
definitions := []*message.Definition{
tlv.HeaderMessageDefinition,
HeartbeatMessageDefinition,
}
AnalysisMsgRegistry = message.NewRegistry(definitions)
}
package packet
import (
"fmt"
)
const (
// PingPacketHeader is the byte value denoting a ping packet.
PingPacketHeader = 0x00
// PingPacketSize is the size of a ping packet.
PingPacketSize = 1
)
// Ping defines a ping as an empty struct.
type Ping struct{}
// ParsePing parses a slice of bytes into a ping.
func ParsePing(data []byte) (*Ping, error) {
if len(data) != PingPacketSize {
return nil, fmt.Errorf("%w: packet doesn't match ping packet size of %d", ErrMalformedPacket, PingPacketSize)
}
if data[0] != PingPacketHeader {
return nil, fmt.Errorf("%w: packet isn't of type ping", ErrMalformedPacket)
}
return &Ping{}, nil
}
// NewPingMessage creates a new ping message.
func NewPingMessage() []byte {
return []byte{PingPacketHeader}
}
package server
import (
"errors"
"net"
"strconv"
"time"
......@@ -15,6 +14,7 @@ import (
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/network/tcp"
"github.com/iotaledger/hive.go/node"
"github.com/iotaledger/hive.go/protocol"
flag "github.com/spf13/pflag"
)
......@@ -27,8 +27,6 @@ const (
// IdleTimeout defines the idle timeout.
IdleTimeout = 10 * time.Second
// StateHeartbeat defines the state of the heartbeat.
StateHeartbeat = packet.HeartbeatPacketHeader
)
func init() {
......@@ -36,8 +34,6 @@ func init() {
}
var (
// ErrInvalidPacketHeader defines an invalid packet header error.
ErrInvalidPacketHeader = errors.New("invalid packet header")
// Plugin is the plugin instance of the analysis server plugin.
Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run)
server *tcp.TCPServer
......@@ -77,9 +73,6 @@ func run(_ *node.Plugin) {
}
}
// ConnectionState defines the type of a connection state as a byte
type ConnectionState = byte
// HandleConnection handles the given connection.
func HandleConnection(conn *network.ManagedConnection) {
err := conn.SetTimeout(IdleTimeout)
......@@ -87,14 +80,15 @@ func HandleConnection(conn *network.ManagedConnection) {
log.Errorf(err.Error())
}
var connectionState byte
var receiveBuffer []byte
var onDisconnect *events.Closure
// create new protocol instance
p := protocol.New(conn, packet.AnalysisMsgRegistry)
onReceiveData := events.NewClosure(func(data []byte) {
processIncomingPacket(&connectionState, &receiveBuffer, conn, data)
// process incoming data in protocol.Receive()
p.Receive(data)
})
var onDisconnect *events.Closure
onDisconnect = events.NewClosure(func() {
conn.Events.ReceiveData.Detach(onReceiveData)
conn.Events.Close.Detach(onDisconnect)
......@@ -103,57 +97,26 @@ func HandleConnection(conn *network.ManagedConnection) {
conn.Events.ReceiveData.Attach(onReceiveData)
conn.Events.Close.Attach(onDisconnect)
maxPacketsSize := getMaxPacketSize(packet.HeartbeatPacketMaxSize)
go conn.Read(make([]byte, maxPacketsSize))
}
func getMaxPacketSize(packetSizes ...int) int {
maxPacketSize := 0
for _, packetSize := range packetSizes {
if packetSize > maxPacketSize {
maxPacketSize = packetSize
}
}
return maxPacketSize
}
// connect protocol events to processors
wireUp(p)
func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) {
var err error
if *connectionState, *receiveBuffer, err = parsePackageHeader(data); err != nil {
Events.Error.Trigger(err)
conn.Close()
return
// starts the protocol and reads from its connection
go p.Start()
}
switch *connectionState {
case StateHeartbeat:
processHeartbeatPacket(connectionState, receiveBuffer, conn, data)
}
}
func parsePackageHeader(data []byte) (ConnectionState, []byte, error) {
var connectionState ConnectionState
var receiveBuffer []byte
switch data[0] {
case packet.HeartbeatPacketHeader:
receiveBuffer = make([]byte, packet.HeartbeatPacketMaxSize)
connectionState = StateHeartbeat
default:
return 0, nil, ErrInvalidPacketHeader
}
return connectionState, receiveBuffer, nil
// wireUp connects the Received events of the protocol to the packet specific processor
func wireUp(p *protocol.Protocol) {
p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) {
processHeartbeatPacket(data, p)
}))
}
func processHeartbeatPacket(_ *byte, _ *[]byte, conn *network.ManagedConnection, data []byte) {
// processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event
func processHeartbeatPacket(data []byte, p *protocol.Protocol) {
heartbeatPacket, err := packet.ParseHeartbeat(data)
if err != nil {
Events.Error.Trigger(err)
conn.Close()
p.CloseConnection()
return
}
Events.Heartbeat.Trigger(heartbeatPacket)
......
......@@ -10,7 +10,7 @@ require (
github.com/docker/go-units v0.4.0 // indirect
github.com/drand/drand v0.8.1
github.com/iotaledger/goshimmer v0.1.3
github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8
github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/stretchr/testify v1.5.1
)
......
......@@ -138,8 +138,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8 h1:b2CAofhKmJDLkPC9ul88KXa3BU5zRHTZp3TOPbIzSsg=
github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8/go.mod h1:HgYsLMzyQV+eaiUrxa1c7qvH9Jwi2ncycqtlw+Lczhs=
github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754 h1:UCyAisLvAuKIWf2bMz+iYYgjGdHS7H4W2wMTpWg9yl8=
github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754/go.mod h1:HgYsLMzyQV+eaiUrxa1c7qvH9Jwi2ncycqtlw+Lczhs=
github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment