Skip to content
Snippets Groups Projects
Commit 17f603fd authored by Hans Moog's avatar Hans Moog
Browse files

Refactor: refactored gossip module

parent 9258f54a
Branches
No related tags found
No related merge requests found
......@@ -6,9 +6,10 @@ import (
)
var Events = pluginEvents{
AddNeighbor: events.NewEvent(errorCaller),
RemoveNeighbor: events.NewEvent(errorCaller),
DropNeighbor: events.NewEvent(errorCaller),
AddNeighbor: events.NewEvent(neighborCaller),
UpdateNeighbor: events.NewEvent(neighborCaller),
RemoveNeighbor: events.NewEvent(neighborCaller),
DropNeighbor: events.NewEvent(neighborCaller),
IncomingConnection: events.NewEvent(errorCaller),
ReceiveTransaction: events.NewEvent(transactionCaller),
Error: events.NewEvent(errorCaller),
......@@ -17,13 +18,14 @@ var Events = pluginEvents{
type pluginEvents struct {
// neighbor events
AddNeighbor *events.Event
UpdateNeighbor *events.Event
RemoveNeighbor *events.Event
DropNeighbor *events.Event
// low level network events
IncomingConnection *events.Event
// high level protocol events
DropNeighbor *events.Event
SendTransaction *events.Event
SendTransactionRequest *events.Event
ReceiveTransaction *events.Event
......@@ -34,12 +36,6 @@ type pluginEvents struct {
Error *events.Event
}
func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
func errorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) }
func transactionCaller(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) }
type protocolEvents struct {
ReceiveVersion *events.Event
ReceiveIdentification *events.Event
......@@ -50,3 +46,11 @@ type protocolEvents struct {
ReceiveRequestData *events.Event
Error *events.Event
}
func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) }
func neighborCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) }
func errorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) }
func transactionCaller(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) }
package gossip
import (
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/network"
"net"
)
const (
MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1
)
type Neighbor struct {
Conn *network.ManagedConnection
Identity identity.Identity
Address net.IP
Port uint16
}
func UnmarshalNeighbor(data []byte) (*Neighbor, error) {
return &Neighbor{}, nil
}
func (neighbor *Neighbor) Marshal() []byte {
return nil
}
func AddNeighbor() {
}
func GetNeighbor() {
}
package gossip
import (
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/network"
"net"
"sync"
)
type Peer struct {
Identity identity.Identity
Address net.IP
Port uint16
Conn *network.ManagedConnection
}
func UnmarshalPeer(data []byte) (*Peer, error) {
return &Peer{}, nil
}
func (peer *Peer) Marshal() []byte {
return nil
}
func (peer *Peer) Equals(other *Peer) bool {
return peer.Identity.StringIdentifier == peer.Identity.StringIdentifier &&
peer.Port == other.Port && peer.Address.String() == other.Address.String()
}
func AddNeighbor(newNeighbor *Peer) {
neighborLock.Lock()
defer neighborLock.Lock()
if neighbor, exists := neighbors[newNeighbor.Identity.StringIdentifier]; !exists {
neighbors[newNeighbor.Identity.StringIdentifier] = newNeighbor
Events.AddNeighbor.Trigger(newNeighbor)
} else {
if !newNeighbor.Equals(neighbor) {
neighbor.Identity = neighbor.Identity
neighbor.Port = neighbor.Port
neighbor.Address = neighbor.Address
Events.UpdateNeighbor.Trigger(newNeighbor)
}
}
}
func RemoveNeighbor(identifier string) {
neighborLock.Lock()
defer neighborLock.Lock()
if neighbor, exists := neighbors[identifier]; exists {
delete(neighbors, identifier)
Events.RemoveNeighbor.Trigger(neighbor)
}
}
func GetNeighbor(neighbor *Peer) (*Peer, bool) {
neighborLock.RLock()
defer neighborLock.RUnlock()
neighbor, exists := neighbors[neighbor.Identity.StringIdentifier]
return neighbor, exists
}
func GetNeighbors() map[string]*Peer {
neighborLock.RLock()
defer neighborLock.RUnlock()
result := make(map[string]*Peer)
for id, neighbor := range neighbors {
result[id] = neighbor
}
return result
}
const (
MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1
)
var neighbors = make(map[string]*Peer)
var neighborLock sync.RWMutex
......@@ -2,10 +2,11 @@ package gossip
import (
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events"
"strconv"
)
//region interfaces ////////////////////////////////////////////////////////////////////////////////////////////////////
// region interfaces ///////////////////////////////////////////////////////////////////////////////////////////////////
type protocolState interface {
Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError)
......@@ -16,12 +17,16 @@ type protocolState interface {
// region protocol /////////////////////////////////////////////////////////////////////////////////////////////////////
type protocol struct {
neighbor *Neighbor
Events protocolEvents
neighbor *Peer
currentState protocolState
}
func newProtocol(neighbor *Neighbor) *protocol {
func newProtocol(neighbor *Peer) *protocol {
protocol := &protocol{
Events: protocolEvents{
ReceiveVersion: events.NewEvent(intCaller),
},
neighbor: neighbor,
currentState: &versionState{},
}
......@@ -47,14 +52,14 @@ func (protocol *protocol) parseData(data []byte) {
// endregion ////////////////////////////////////////////////////////////////////////////////////////////////////////////
// region versionState //////////////////////////////////////////////////////////////////////////////////////////////////
// region versionState /////////////////////////////////////////////////////////////////////////////////////////////////
type versionState struct{}
func (state *versionState) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) {
switch data[offset] {
case 1:
Events.ReceiveVersion.Trigger(1)
protocol.Events.ReceiveVersion.Trigger(1)
protocol.currentState = newIndentificationStateV1()
......
......@@ -26,13 +26,13 @@ func (state *indentificationStateV1) Consume(protocol *protocol, data []byte, of
state.offset += bytesRead
if state.offset == MARSHALLED_NEIGHBOR_TOTAL_SIZE {
if unmarshalledNeighbor, err := UnmarshalNeighbor(state.buffer); err != nil {
if unmarshalledNeighbor, err := UnmarshalPeer(state.buffer); err != nil {
return bytesRead, ErrInvalidAuthenticationMessage.Derive(err, "invalid authentication message")
} else {
protocol.neighbor.Identity = unmarshalledNeighbor.Identity
protocol.neighbor.Port = unmarshalledNeighbor.Port
Events.ReceiveIdentification.Trigger(protocol.neighbor)
protocol.Events.ReceiveIdentification.Trigger(protocol.neighbor)
protocol.currentState = newacceptanceStateV1()
state.offset = 0
......@@ -54,17 +54,17 @@ func newacceptanceStateV1() *acceptanceStateV1 {
func (state *acceptanceStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) {
switch data[offset] {
case 1:
Events.AcceptConnection.Trigger()
case 0:
protocol.Events.RejectConnection.Trigger()
protocol.currentState = newDispatchStateV1()
protocol.neighbor.Conn.Close()
protocol.currentState = nil
break
case 2:
Events.RejectConnection.Trigger()
case 1:
protocol.Events.AcceptConnection.Trigger()
protocol.neighbor.Conn.Close()
protocol.currentState = nil
protocol.currentState = newDispatchStateV1()
break
default:
......@@ -87,7 +87,7 @@ func newDispatchStateV1() *dispatchStateV1 {
func (state *dispatchStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) {
switch data[0] {
case 0:
Events.RejectConnection.Trigger()
protocol.Events.RejectConnection.Trigger()
protocol.neighbor.Conn.Close()
protocol.currentState = nil
......@@ -130,7 +130,7 @@ func (state *transactionStateV1) Consume(protocol *protocol, data []byte, offset
transactionData := make([]byte, transaction.MARSHALLED_TOTAL_SIZE)
copy(transactionData, state.buffer)
Events.ReceiveTransactionData.Trigger(transactionData)
protocol.Events.ReceiveTransactionData.Trigger(transactionData)
go func() {
Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData))
......
plugins/gossip/protocol_v1.png

22.1 KiB | W: | H:

plugins/gossip/protocol_v1.png

24.4 KiB | W: | H:

plugins/gossip/protocol_v1.png
plugins/gossip/protocol_v1.png
plugins/gossip/protocol_v1.png
plugins/gossip/protocol_v1.png
  • 2-up
  • Swipe
  • Onion skin
......@@ -14,7 +14,7 @@ var TCPServer = tcp.NewServer()
func configureServer(plugin *node.Plugin) {
TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) {
neighbor := &Neighbor{
neighbor := &Peer{
Address: conn.RemoteAddr().(*net.TCPAddr).IP,
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment