diff --git a/plugins/gossip/events.go b/plugins/gossip/events.go index 0a0a5a1a173d968624b85068f50b8ccc3e52f0ec..e08c6813fb81883e391747d2ec15052003d6b34f 100644 --- a/plugins/gossip/events.go +++ b/plugins/gossip/events.go @@ -6,9 +6,10 @@ import ( ) var Events = pluginEvents{ - AddNeighbor: events.NewEvent(errorCaller), - RemoveNeighbor: events.NewEvent(errorCaller), - DropNeighbor: events.NewEvent(errorCaller), + AddNeighbor: events.NewEvent(neighborCaller), + UpdateNeighbor: events.NewEvent(neighborCaller), + RemoveNeighbor: events.NewEvent(neighborCaller), + DropNeighbor: events.NewEvent(neighborCaller), IncomingConnection: events.NewEvent(errorCaller), ReceiveTransaction: events.NewEvent(transactionCaller), Error: events.NewEvent(errorCaller), @@ -17,13 +18,14 @@ var Events = pluginEvents{ type pluginEvents struct { // neighbor events AddNeighbor *events.Event + UpdateNeighbor *events.Event RemoveNeighbor *events.Event - DropNeighbor *events.Event // low level network events IncomingConnection *events.Event // high level protocol events + DropNeighbor *events.Event SendTransaction *events.Event SendTransactionRequest *events.Event ReceiveTransaction *events.Event @@ -34,12 +36,6 @@ type pluginEvents struct { Error *events.Event } -func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) } - -func errorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) } - -func transactionCaller(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) } - type protocolEvents struct { ReceiveVersion *events.Event ReceiveIdentification *events.Event @@ -50,3 +46,11 @@ type protocolEvents struct { ReceiveRequestData *events.Event Error *events.Event } + +func intCaller(handler interface{}, params ...interface{}) { handler.(func(int))(params[0].(int)) } + +func neighborCaller(handler interface{}, params ...interface{}) { handler.(func(*Peer))(params[0].(*Peer)) } + +func errorCaller(handler interface{}, params ...interface{}) { handler.(func(error))(params[0].(error)) } + +func transactionCaller(handler interface{}, params ...interface{}) { handler.(func(*transaction.Transaction))(params[0].(*transaction.Transaction)) } diff --git a/plugins/gossip/neighbormanager.go b/plugins/gossip/neighbormanager.go deleted file mode 100644 index 60782adb30dec476b6b9b23845493ddb828ccfe8..0000000000000000000000000000000000000000 --- a/plugins/gossip/neighbormanager.go +++ /dev/null @@ -1,34 +0,0 @@ -package gossip - -import ( - "github.com/iotaledger/goshimmer/packages/identity" - "github.com/iotaledger/goshimmer/packages/network" - "net" -) - -const ( - MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1 -) - -type Neighbor struct { - Conn *network.ManagedConnection - Identity identity.Identity - Address net.IP - Port uint16 -} - -func UnmarshalNeighbor(data []byte) (*Neighbor, error) { - return &Neighbor{}, nil -} - -func (neighbor *Neighbor) Marshal() []byte { - return nil -} - -func AddNeighbor() { - -} - -func GetNeighbor() { - -} diff --git a/plugins/gossip/neighbors.go b/plugins/gossip/neighbors.go new file mode 100644 index 0000000000000000000000000000000000000000..77b7247d2299be0dcaee226a9fd1c87ad6de3aec --- /dev/null +++ b/plugins/gossip/neighbors.go @@ -0,0 +1,86 @@ +package gossip + +import ( + "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/packages/network" + "net" + "sync" +) + +type Peer struct { + Identity identity.Identity + Address net.IP + Port uint16 + Conn *network.ManagedConnection +} + +func UnmarshalPeer(data []byte) (*Peer, error) { + return &Peer{}, nil +} + +func (peer *Peer) Marshal() []byte { + return nil +} + +func (peer *Peer) Equals(other *Peer) bool { + return peer.Identity.StringIdentifier == peer.Identity.StringIdentifier && + peer.Port == other.Port && peer.Address.String() == other.Address.String() +} + +func AddNeighbor(newNeighbor *Peer) { + neighborLock.Lock() + defer neighborLock.Lock() + + if neighbor, exists := neighbors[newNeighbor.Identity.StringIdentifier]; !exists { + neighbors[newNeighbor.Identity.StringIdentifier] = newNeighbor + + Events.AddNeighbor.Trigger(newNeighbor) + } else { + if !newNeighbor.Equals(neighbor) { + neighbor.Identity = neighbor.Identity + neighbor.Port = neighbor.Port + neighbor.Address = neighbor.Address + + Events.UpdateNeighbor.Trigger(newNeighbor) + } + } +} + +func RemoveNeighbor(identifier string) { + neighborLock.Lock() + defer neighborLock.Lock() + + if neighbor, exists := neighbors[identifier]; exists { + delete(neighbors, identifier) + + Events.RemoveNeighbor.Trigger(neighbor) + } +} + +func GetNeighbor(neighbor *Peer) (*Peer, bool) { + neighborLock.RLock() + defer neighborLock.RUnlock() + + neighbor, exists := neighbors[neighbor.Identity.StringIdentifier] + + return neighbor, exists +} + +func GetNeighbors() map[string]*Peer { + neighborLock.RLock() + defer neighborLock.RUnlock() + + result := make(map[string]*Peer) + for id, neighbor := range neighbors { + result[id] = neighbor + } + + return result +} + +const ( + MARSHALLED_NEIGHBOR_TOTAL_SIZE = 1 +) + +var neighbors = make(map[string]*Peer) +var neighborLock sync.RWMutex diff --git a/plugins/gossip/protocol.go b/plugins/gossip/protocol.go index 1612eced68df85885633d0552e1b4d95cb2bf179..c2f6e8661feaf0192114190b4126624e7a28266c 100644 --- a/plugins/gossip/protocol.go +++ b/plugins/gossip/protocol.go @@ -2,26 +2,31 @@ package gossip import ( "github.com/iotaledger/goshimmer/packages/errors" + "github.com/iotaledger/goshimmer/packages/events" "strconv" ) -//region interfaces //////////////////////////////////////////////////////////////////////////////////////////////////// +// region interfaces /////////////////////////////////////////////////////////////////////////////////////////////////// type protocolState interface { Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) } -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// +// endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// // region protocol ///////////////////////////////////////////////////////////////////////////////////////////////////// type protocol struct { - neighbor *Neighbor + Events protocolEvents + neighbor *Peer currentState protocolState } -func newProtocol(neighbor *Neighbor) *protocol { +func newProtocol(neighbor *Peer) *protocol { protocol := &protocol{ + Events: protocolEvents{ + ReceiveVersion: events.NewEvent(intCaller), + }, neighbor: neighbor, currentState: &versionState{}, } @@ -47,14 +52,14 @@ func (protocol *protocol) parseData(data []byte) { // endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// -// region versionState ////////////////////////////////////////////////////////////////////////////////////////////////// +// region versionState ///////////////////////////////////////////////////////////////////////////////////////////////// type versionState struct{} func (state *versionState) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) { switch data[offset] { case 1: - Events.ReceiveVersion.Trigger(1) + protocol.Events.ReceiveVersion.Trigger(1) protocol.currentState = newIndentificationStateV1() diff --git a/plugins/gossip/protocol_v1.go b/plugins/gossip/protocol_v1.go index 008b024320d7d170ea5a8abc8014a561ff061197..72a418ab178f993becc9e62177bf2123265bed1c 100644 --- a/plugins/gossip/protocol_v1.go +++ b/plugins/gossip/protocol_v1.go @@ -26,13 +26,13 @@ func (state *indentificationStateV1) Consume(protocol *protocol, data []byte, of state.offset += bytesRead if state.offset == MARSHALLED_NEIGHBOR_TOTAL_SIZE { - if unmarshalledNeighbor, err := UnmarshalNeighbor(state.buffer); err != nil { + if unmarshalledNeighbor, err := UnmarshalPeer(state.buffer); err != nil { return bytesRead, ErrInvalidAuthenticationMessage.Derive(err, "invalid authentication message") } else { protocol.neighbor.Identity = unmarshalledNeighbor.Identity protocol.neighbor.Port = unmarshalledNeighbor.Port - Events.ReceiveIdentification.Trigger(protocol.neighbor) + protocol.Events.ReceiveIdentification.Trigger(protocol.neighbor) protocol.currentState = newacceptanceStateV1() state.offset = 0 @@ -54,17 +54,17 @@ func newacceptanceStateV1() *acceptanceStateV1 { func (state *acceptanceStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) { switch data[offset] { - case 1: - Events.AcceptConnection.Trigger() + case 0: + protocol.Events.RejectConnection.Trigger() - protocol.currentState = newDispatchStateV1() + protocol.neighbor.Conn.Close() + protocol.currentState = nil break - case 2: - Events.RejectConnection.Trigger() + case 1: + protocol.Events.AcceptConnection.Trigger() - protocol.neighbor.Conn.Close() - protocol.currentState = nil + protocol.currentState = newDispatchStateV1() break default: @@ -87,7 +87,7 @@ func newDispatchStateV1() *dispatchStateV1 { func (state *dispatchStateV1) Consume(protocol *protocol, data []byte, offset int, length int) (int, errors.IdentifiableError) { switch data[0] { case 0: - Events.RejectConnection.Trigger() + protocol.Events.RejectConnection.Trigger() protocol.neighbor.Conn.Close() protocol.currentState = nil @@ -130,7 +130,7 @@ func (state *transactionStateV1) Consume(protocol *protocol, data []byte, offset transactionData := make([]byte, transaction.MARSHALLED_TOTAL_SIZE) copy(transactionData, state.buffer) - Events.ReceiveTransactionData.Trigger(transactionData) + protocol.Events.ReceiveTransactionData.Trigger(transactionData) go func() { Events.ReceiveTransaction.Trigger(transaction.FromBytes(transactionData)) diff --git a/plugins/gossip/protocol_v1.png b/plugins/gossip/protocol_v1.png index f5c361b612b7bc7644c4e65829c63fa79547efb2..ce667756467d35bf33185cf5c3727894ea7c4e57 100644 Binary files a/plugins/gossip/protocol_v1.png and b/plugins/gossip/protocol_v1.png differ diff --git a/plugins/gossip/server.go b/plugins/gossip/server.go index 76bd245aa7b75353a50d1096d10abb3ded65fe46..6595dc923d6e093ebc2e6212a9a2929b563c7c42 100644 --- a/plugins/gossip/server.go +++ b/plugins/gossip/server.go @@ -14,7 +14,7 @@ var TCPServer = tcp.NewServer() func configureServer(plugin *node.Plugin) { TCPServer.Events.Connect.Attach(events.NewClosure(func(conn *network.ManagedConnection) { - neighbor := &Neighbor{ + neighbor := &Peer{ Address: conn.RemoteAddr().(*net.TCPAddr).IP, }