diff --git a/go.mod b/go.mod index 8dbbb46d25141f6bda1e7c9181a6c7bebb706de4..d2f4b1558b4aa545b6fbbabb9a3199cab8cbc302 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/gobuffalo/packr/v2 v2.7.1 github.com/golang/protobuf v1.3.5 github.com/gorilla/websocket v1.4.1 - github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8 + github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754 github.com/iotaledger/iota.go v1.0.0-beta.14 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/gommon v0.3.0 diff --git a/go.sum b/go.sum index c39e6d8c0a5f34536257b79ccff6d35658cc0f30..bc4b41436c3c6873e7f1d56641cb4f465c259ef2 100644 --- a/go.sum +++ b/go.sum @@ -139,8 +139,8 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8 h1:b2CAofhKmJDLkPC9ul88KXa3BU5zRHTZp3TOPbIzSsg= -github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8/go.mod h1:HgYsLMzyQV+eaiUrxa1c7qvH9Jwi2ncycqtlw+Lczhs= +github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754 h1:UCyAisLvAuKIWf2bMz+iYYgjGdHS7H4W2wMTpWg9yl8= +github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754/go.mod h1:HgYsLMzyQV+eaiUrxa1c7qvH9Jwi2ncycqtlw+Lczhs= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A= github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= diff --git a/plugins/analysis/packet/heartbeat.go b/plugins/analysis/packet/heartbeat.go index ddfda06ff53b422a4ed9aa2f7a8751a183f39a83..bf6ef3ff9db984c80b303a169031774cc2d04f41 100644 --- a/plugins/analysis/packet/heartbeat.go +++ b/plugins/analysis/packet/heartbeat.go @@ -1,9 +1,14 @@ package packet import ( + "bytes" "crypto/sha256" + "encoding/binary" "errors" "fmt" + + "github.com/iotaledger/hive.go/protocol/message" + "github.com/iotaledger/hive.go/protocol/tlv" ) var ( @@ -16,21 +21,34 @@ const ( HeartbeatMaxOutboundPeersCount = 4 // HeartbeatMaxInboundPeersCount is the maximum amount of inbound peer IDs a heartbeat packet can contain. HeartbeatMaxInboundPeersCount = 4 - // HeartbeatPacketHeader is the byte value denoting a heartbeat packet. - HeartbeatPacketHeader = 0x01 - // HeartbeatPacketHeaderSize is the byte size of the heartbeat header. - HeartbeatPacketHeaderSize = 1 // HeartbeatPacketPeerIDSize is the byte size of peer IDs within the heartbeat packet. HeartbeatPacketPeerIDSize = sha256.Size // HeartbeatPacketOutboundIDCountSize is the byte size of the counter indicating the amount of outbound IDs. HeartbeatPacketOutboundIDCountSize = 1 +) + +var ( // HeartbeatPacketMinSize is the minimum byte size of a heartbeat packet. - HeartbeatPacketMinSize = HeartbeatPacketHeaderSize + HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize + HeartbeatPacketMinSize = HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize // HeartbeatPacketMaxSize is the maximum size a heartbeat packet can have. - HeartbeatPacketMaxSize = HeartbeatPacketHeaderSize + HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize + + HeartbeatPacketMaxSize = HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize + HeartbeatMaxOutboundPeersCount*sha256.Size + HeartbeatMaxInboundPeersCount*sha256.Size ) +const ( + // MessageTypeHeartbeat is the unique id of a heartbeat message for analysis. + MessageTypeHeartbeat message.Type = 1 +) + +var ( + // HeartbeatMessageDefinition defines a heartbeat message's format. + HeartbeatMessageDefinition = &message.Definition{ + ID: MessageTypeHeartbeat, + MaxBytesLength: uint16(HeartbeatPacketMaxSize), + VariableLength: true, + } +) + // Heartbeat represents a heartbeat packet. type Heartbeat struct { // The ID of the node who sent the heartbeat. @@ -44,7 +62,7 @@ type Heartbeat struct { InboundIDs [][]byte } -// ParseHeartbeat parses a slice of bytes into a heartbeat. +// ParseHeartbeat parses a slice of bytes (serialized packet) into a heartbeat. func ParseHeartbeat(data []byte) (*Heartbeat, error) { // check minimum size if len(data) < HeartbeatPacketMinSize { @@ -55,11 +73,6 @@ func ParseHeartbeat(data []byte) (*Heartbeat, error) { return nil, fmt.Errorf("%w: packet exceeds maximum heartbeat packet size of %d", ErrMalformedPacket, HeartbeatPacketMaxSize) } - // check whether we're actually dealing with a heartbeat packet - if data[0] != HeartbeatPacketHeader { - return nil, fmt.Errorf("%w: packet isn't of type heartbeat", ErrMalformedPacket) - } - // sanity check: packet len - min packet % id size = 0, // since we're only dealing with IDs from that offset if (len(data)-HeartbeatPacketMinSize)%HeartbeatPacketPeerIDSize != 0 { @@ -68,7 +81,7 @@ func ParseHeartbeat(data []byte) (*Heartbeat, error) { // copy own ID ownID := make([]byte, HeartbeatPacketPeerIDSize) - copy(ownID, data[HeartbeatPacketHeaderSize:HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize]) + copy(ownID, data[:HeartbeatPacketPeerIDSize]) // read outbound IDs count outboundIDCount := int(data[HeartbeatPacketMinSize-1]) @@ -100,7 +113,7 @@ func ParseHeartbeat(data []byte) (*Heartbeat, error) { // inbound IDs can be zero inboundIDs := make([][]byte, inboundIDCount) - offset := HeartbeatPacketHeaderSize + HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize + outboundIDCount*HeartbeatPacketPeerIDSize + offset := HeartbeatPacketPeerIDSize + HeartbeatPacketOutboundIDCountSize + outboundIDCount*HeartbeatPacketPeerIDSize for i := range inboundIDs { inboundIDs[i] = make([]byte, HeartbeatPacketPeerIDSize) copy(inboundIDs[i], data[offset+i*HeartbeatPacketPeerIDSize:offset+(i+1)*HeartbeatPacketPeerIDSize]) @@ -109,7 +122,8 @@ func ParseHeartbeat(data []byte) (*Heartbeat, error) { return &Heartbeat{OwnID: ownID, OutboundIDs: outboundIDs, InboundIDs: inboundIDs}, nil } -// NewHeartbeatMessage serializes the given heartbeat into a byte slice. +// NewHeartbeatMessage serializes the given heartbeat into a byte slice and adds a tlv header to the packet. +// message = tlv header + serialized packet func NewHeartbeatMessage(hb *Heartbeat) ([]byte, error) { if len(hb.InboundIDs) > HeartbeatMaxInboundPeersCount { return nil, fmt.Errorf("%w: heartbeat exceeds maximum inbound IDs of %d", ErrInvalidHeartbeat, HeartbeatMaxInboundPeersCount) @@ -126,14 +140,11 @@ func NewHeartbeatMessage(hb *Heartbeat) ([]byte, error) { packetSize := HeartbeatPacketMinSize + len(hb.OutboundIDs)*HeartbeatPacketPeerIDSize + len(hb.InboundIDs)*HeartbeatPacketPeerIDSize packet := make([]byte, packetSize) - // header byte - packet[0] = HeartbeatPacketHeader - // own nodeId - copy(packet[HeartbeatPacketHeaderSize:HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize], hb.OwnID[:]) + copy(packet[:HeartbeatPacketPeerIDSize], hb.OwnID[:]) // outbound id count - packet[HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize] = byte(len(hb.OutboundIDs)) + packet[HeartbeatPacketPeerIDSize] = byte(len(hb.OutboundIDs)) // copy contents of hb.OutboundIDs offset := HeartbeatPacketMinSize @@ -149,5 +160,16 @@ func NewHeartbeatMessage(hb *Heartbeat) ([]byte, error) { copy(packet[offset+i*HeartbeatPacketPeerIDSize:offset+(i+1)*HeartbeatPacketPeerIDSize], inboundID[:HeartbeatPacketPeerIDSize]) } - return packet, nil + // create a buffer for tlv header plus the packet + buf := bytes.NewBuffer(make([]byte, 0, tlv.HeaderMessageDefinition.MaxBytesLength+uint16(packetSize))) + // write tlv header into buffer + if err := tlv.WriteHeader(buf, MessageTypeHeartbeat, uint16(packetSize)); err != nil { + return nil, err + } + // write serialized packet bytes into the buffer + if err := binary.Write(buf, binary.LittleEndian, packet); err != nil { + return nil, err + } + + return buf.Bytes(), nil } diff --git a/plugins/analysis/packet/heartbeat_test.go b/plugins/analysis/packet/heartbeat_test.go index 134eba4c13a2e15163ff53528174277e4d60a37e..53d02b7d49d255048d67392f32314b9ef0bf5ce1 100644 --- a/plugins/analysis/packet/heartbeat_test.go +++ b/plugins/analysis/packet/heartbeat_test.go @@ -6,6 +6,7 @@ import ( "testing" . "github.com/iotaledger/goshimmer/plugins/analysis/packet" + "github.com/iotaledger/hive.go/protocol/tlv" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -91,18 +92,19 @@ func TestNewHeartbeatMessage(t *testing.T) { for _, testCase := range testCases { hb := testCase.hb serializedHb, err := NewHeartbeatMessage(hb) + tlvHeaderLength := int(tlv.HeaderMessageDefinition.MaxBytesLength) if testCase.err != nil { require.True(t, errors.Is(err, testCase.err)) continue } require.NoError(t, err, "heartbeat should have been serialized successfully") - assert.EqualValues(t, HeartbeatPacketHeader, serializedHb[0], "expected heartbeat header value as first byte") - assert.EqualValues(t, hb.OwnID[:], serializedHb[HeartbeatPacketHeaderSize:HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize], "expected own peer id to be within range of %d:%d", HeartbeatPacketHeaderSize, HeartbeatPacketPeerIDSize) - assert.EqualValues(t, len(hb.OutboundIDs), serializedHb[HeartbeatPacketHeaderSize+HeartbeatPacketPeerIDSize], "expected outbound IDs count of %d", len(hb.OutboundIDs)) + assert.EqualValues(t, MessageTypeHeartbeat, serializedHb[0], "expected heartbeat message type tlv header value as first byte") + assert.EqualValues(t, hb.OwnID[:], serializedHb[tlvHeaderLength:tlvHeaderLength+HeartbeatPacketPeerIDSize], "expected own peer id to be within range of %d:%d", tlvHeaderLength, tlvHeaderLength+HeartbeatPacketPeerIDSize) + assert.EqualValues(t, len(hb.OutboundIDs), serializedHb[tlvHeaderLength+HeartbeatPacketPeerIDSize], "expected outbound IDs count of %d", len(hb.OutboundIDs)) // after the outbound IDs count, the outbound IDs are serialized - offset := HeartbeatPacketMinSize + offset := int(tlvHeaderLength) + HeartbeatPacketMinSize for i := 0; i < len(hb.OutboundIDs); i++ { assert.EqualValues(t, hb.OutboundIDs[i], serializedHb[offset+i*HeartbeatPacketPeerIDSize:offset+(i+1)*HeartbeatPacketPeerIDSize], "outbound ID at the given position doesn't match") } @@ -117,6 +119,7 @@ func TestNewHeartbeatMessage(t *testing.T) { } func TestParseHeartbeat(t *testing.T) { + tlvHeaderLength := int(tlv.HeaderMessageDefinition.MaxBytesLength) type testcase struct { source []byte expected *Heartbeat @@ -126,8 +129,10 @@ func TestParseHeartbeat(t *testing.T) { // ok func() testcase { hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: make([][]byte, 0), InboundIDs: make([][]byte, 0)} + // message = tlv header + packet + // ParseHeartbeat() expects only the packet, hence serializedHb[tlvHeaderLength:] serializedHb, _ := NewHeartbeatMessage(hb) - return testcase{source: serializedHb, expected: hb, err: nil} + return testcase{source: serializedHb[tlvHeaderLength:], expected: hb, err: nil} }(), // ok func() testcase { @@ -140,15 +145,10 @@ func TestParseHeartbeat(t *testing.T) { inboundIDs[i] = inboundID[:] } hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: outboundIDs, InboundIDs: inboundIDs} + // message = tlv header + packet + // ParseHeartbeat() expects only the packet, hence serializedHb[tlvHeaderLength:] serializedHb, _ := NewHeartbeatMessage(hb) - return testcase{source: serializedHb, expected: hb, err: nil} - }(), - // err, modify wrong packet header - func() testcase { - hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: make([][]byte, 0), InboundIDs: make([][]byte, 0)} - serializedHb, _ := NewHeartbeatMessage(hb) - serializedHb[0] = 0xff - return testcase{source: serializedHb, expected: nil, err: ErrMalformedPacket} + return testcase{source: serializedHb[tlvHeaderLength:], expected: hb, err: nil} }(), // err, exceeds max inbound peer IDs func() testcase { @@ -164,21 +164,29 @@ func TestParseHeartbeat(t *testing.T) { inboundIDs[i] = inboundID[:] } hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: outboundIDs, InboundIDs: inboundIDs} + // message = tlv header + packet + // ParseHeartbeat() expects only the packet, hence serializedHb[tlvHeaderLength:] serializedHb, _ := NewHeartbeatMessage(hb) // add an additional peer id serializedHb = append(serializedHb, ownID[:]...) - return testcase{source: serializedHb, expected: hb, err: ErrMalformedPacket} + return testcase{source: serializedHb[tlvHeaderLength:], expected: hb, err: ErrMalformedPacket} }(), // err, exceeds max outbound peer IDs func() testcase { - outboundIDs := make([][]byte, HeartbeatMaxOutboundPeersCount+1) - for i := 0; i < HeartbeatMaxInboundPeersCount+1; i++ { + outboundIDs := make([][]byte, HeartbeatMaxOutboundPeersCount) + for i := 0; i < HeartbeatMaxInboundPeersCount; i++ { outboundID := sha256.Sum256([]byte{byte(i)}) outboundIDs[i] = outboundID[:] } hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: outboundIDs, InboundIDs: make([][]byte, 0)} + // NewHeartbeatMessage would return nil and and error for a malformed packet (too many outbound peer IDs) + // so we create a correct message(tlv header + packet) serializedHb, _ := NewHeartbeatMessage(hb) - return testcase{source: serializedHb, expected: hb, err: ErrMalformedPacket} + // and add an extra outbound ID (inbound IDs are zero) + serializedHb = append(serializedHb, ownID[:]...) + // manually overwrite outboundIDCount + serializedHb[tlvHeaderLength+HeartbeatPacketMinSize-1] = HeartbeatMaxOutboundPeersCount + 1 + return testcase{source: serializedHb[tlvHeaderLength:], expected: hb, err: ErrMalformedPacket} }(), // err, advertised outbound ID count is bigger than remaining data func() testcase { @@ -188,11 +196,13 @@ func TestParseHeartbeat(t *testing.T) { outboundIDs[i] = outboundID[:] } hb := &Heartbeat{OwnID: ownID[:], OutboundIDs: outboundIDs, InboundIDs: make([][]byte, 0)} + // message = tlv header + packet + // ParseHeartbeat() expects only the packet, hence serializedHb[tlvHeaderLength:] serializedHb, _ := NewHeartbeatMessage(hb) // we set the count to HeartbeatMaxOutboundPeersCount but we only have HeartbeatMaxOutboundPeersCount - 1 // actually serialized in the packet - serializedHb[HeartbeatPacketMinSize-1] = HeartbeatMaxOutboundPeersCount - return testcase{source: serializedHb, expected: nil, err: ErrMalformedPacket} + serializedHb[tlvHeaderLength+HeartbeatPacketMinSize-1] = HeartbeatMaxOutboundPeersCount + return testcase{source: serializedHb[tlvHeaderLength:], expected: nil, err: ErrMalformedPacket} }(), // err, doesn't reach minimum packet size func() testcase { diff --git a/plugins/analysis/packet/packet.go b/plugins/analysis/packet/packet.go index c95ed394b53b3fef198274b4c37a4eb5034cd115..c68b5d984f0d0686993e8a1563b660cc187603d5 100644 --- a/plugins/analysis/packet/packet.go +++ b/plugins/analysis/packet/packet.go @@ -1,8 +1,25 @@ package packet -import "errors" +import ( + "errors" + + "github.com/iotaledger/hive.go/protocol/message" + "github.com/iotaledger/hive.go/protocol/tlv" +) var ( // ErrMalformedPacket is returned when malformed packets are tried to be parsed. ErrMalformedPacket = errors.New("malformed packet") ) + +// AnalysisMsgRegistry holds all message definitions for analysis server related messages +var AnalysisMsgRegistry *message.Registry + +func init() { + // message definitions to be registered in registry + definitions := []*message.Definition{ + tlv.HeaderMessageDefinition, + HeartbeatMessageDefinition, + } + AnalysisMsgRegistry = message.NewRegistry(definitions) +} diff --git a/plugins/analysis/packet/ping.go b/plugins/analysis/packet/ping.go deleted file mode 100644 index bbd11958b7005892b254973ed2a6d8dc6ea48e62..0000000000000000000000000000000000000000 --- a/plugins/analysis/packet/ping.go +++ /dev/null @@ -1,33 +0,0 @@ -package packet - -import ( - "fmt" -) - -const ( - // PingPacketHeader is the byte value denoting a ping packet. - PingPacketHeader = 0x00 - // PingPacketSize is the size of a ping packet. - PingPacketSize = 1 -) - -// Ping defines a ping as an empty struct. -type Ping struct{} - -// ParsePing parses a slice of bytes into a ping. -func ParsePing(data []byte) (*Ping, error) { - if len(data) != PingPacketSize { - return nil, fmt.Errorf("%w: packet doesn't match ping packet size of %d", ErrMalformedPacket, PingPacketSize) - } - - if data[0] != PingPacketHeader { - return nil, fmt.Errorf("%w: packet isn't of type ping", ErrMalformedPacket) - } - - return &Ping{}, nil -} - -// NewPingMessage creates a new ping message. -func NewPingMessage() []byte { - return []byte{PingPacketHeader} -} diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 9aebe353354705021020a5299b1ecdb26286d4be..ce93828929eb2b72932a037e68a281b2145d246c 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -1,7 +1,6 @@ package server import ( - "errors" "net" "strconv" "time" @@ -15,6 +14,7 @@ import ( "github.com/iotaledger/hive.go/network" "github.com/iotaledger/hive.go/network/tcp" "github.com/iotaledger/hive.go/node" + "github.com/iotaledger/hive.go/protocol" flag "github.com/spf13/pflag" ) @@ -27,8 +27,6 @@ const ( // IdleTimeout defines the idle timeout. IdleTimeout = 10 * time.Second - // StateHeartbeat defines the state of the heartbeat. - StateHeartbeat = packet.HeartbeatPacketHeader ) func init() { @@ -36,8 +34,6 @@ func init() { } var ( - // ErrInvalidPacketHeader defines an invalid packet header error. - ErrInvalidPacketHeader = errors.New("invalid packet header") // Plugin is the plugin instance of the analysis server plugin. Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) server *tcp.TCPServer @@ -77,9 +73,6 @@ func run(_ *node.Plugin) { } } -// ConnectionState defines the type of a connection state as a byte -type ConnectionState = byte - // HandleConnection handles the given connection. func HandleConnection(conn *network.ManagedConnection) { err := conn.SetTimeout(IdleTimeout) @@ -87,14 +80,15 @@ func HandleConnection(conn *network.ManagedConnection) { log.Errorf(err.Error()) } - var connectionState byte - var receiveBuffer []byte - - var onDisconnect *events.Closure + // create new protocol instance + p := protocol.New(conn, packet.AnalysisMsgRegistry) onReceiveData := events.NewClosure(func(data []byte) { - processIncomingPacket(&connectionState, &receiveBuffer, conn, data) + // process incoming data in protocol.Receive() + p.Receive(data) }) + + var onDisconnect *events.Closure onDisconnect = events.NewClosure(func() { conn.Events.ReceiveData.Detach(onReceiveData) conn.Events.Close.Detach(onDisconnect) @@ -103,57 +97,26 @@ func HandleConnection(conn *network.ManagedConnection) { conn.Events.ReceiveData.Attach(onReceiveData) conn.Events.Close.Attach(onDisconnect) - maxPacketsSize := getMaxPacketSize(packet.HeartbeatPacketMaxSize) + // connect protocol events to processors + wireUp(p) - go conn.Read(make([]byte, maxPacketsSize)) + // starts the protocol and reads from its connection + go p.Start() } -func getMaxPacketSize(packetSizes ...int) int { - maxPacketSize := 0 - - for _, packetSize := range packetSizes { - if packetSize > maxPacketSize { - maxPacketSize = packetSize - } - } - - return maxPacketSize -} - -func processIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte) { - var err error - if *connectionState, *receiveBuffer, err = parsePackageHeader(data); err != nil { - Events.Error.Trigger(err) - conn.Close() - return - } - - switch *connectionState { - case StateHeartbeat: - processHeartbeatPacket(connectionState, receiveBuffer, conn, data) - } -} - -func parsePackageHeader(data []byte) (ConnectionState, []byte, error) { - var connectionState ConnectionState - var receiveBuffer []byte - - switch data[0] { - case packet.HeartbeatPacketHeader: - receiveBuffer = make([]byte, packet.HeartbeatPacketMaxSize) - connectionState = StateHeartbeat - default: - return 0, nil, ErrInvalidPacketHeader - } - - return connectionState, receiveBuffer, nil +// wireUp connects the Received events of the protocol to the packet specific processor +func wireUp(p *protocol.Protocol) { + p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) { + processHeartbeatPacket(data, p) + })) } -func processHeartbeatPacket(_ *byte, _ *[]byte, conn *network.ManagedConnection, data []byte) { +// processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event +func processHeartbeatPacket(data []byte, p *protocol.Protocol) { heartbeatPacket, err := packet.ParseHeartbeat(data) if err != nil { Events.Error.Trigger(err) - conn.Close() + p.CloseConnection() return } Events.Heartbeat.Trigger(heartbeatPacket) diff --git a/tools/integration-tests/tester/go.mod b/tools/integration-tests/tester/go.mod index c3439265a1b19deb13347bf53979de552dc2f74e..8f31861c5760b6229a2b2ec62b943f470993b75c 100644 --- a/tools/integration-tests/tester/go.mod +++ b/tools/integration-tests/tester/go.mod @@ -10,7 +10,7 @@ require ( github.com/docker/go-units v0.4.0 // indirect github.com/drand/drand v0.8.1 github.com/iotaledger/goshimmer v0.1.3 - github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8 + github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754 github.com/opencontainers/go-digest v1.0.0-rc1 // indirect github.com/stretchr/testify v1.5.1 ) diff --git a/tools/integration-tests/tester/go.sum b/tools/integration-tests/tester/go.sum index 3168baced1d6fa22f30882004cb9c68e3b7b2403..f71c038d73d23289a0ecd1a7ce429bdbd3ef3284 100644 --- a/tools/integration-tests/tester/go.sum +++ b/tools/integration-tests/tester/go.sum @@ -138,8 +138,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8 h1:b2CAofhKmJDLkPC9ul88KXa3BU5zRHTZp3TOPbIzSsg= -github.com/iotaledger/hive.go v0.0.0-20200508125657-76ee9eb66cf8/go.mod h1:HgYsLMzyQV+eaiUrxa1c7qvH9Jwi2ncycqtlw+Lczhs= +github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754 h1:UCyAisLvAuKIWf2bMz+iYYgjGdHS7H4W2wMTpWg9yl8= +github.com/iotaledger/hive.go v0.0.0-20200513180357-f0ac8c45b754/go.mod h1:HgYsLMzyQV+eaiUrxa1c7qvH9Jwi2ncycqtlw+Lczhs= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=