From 923937696f4a38aa3e86e261f5f40a8dce81f114 Mon Sep 17 00:00:00 2001 From: Hans Moog <hm@mkjc.net> Date: Thu, 11 Apr 2019 11:39:49 +0200 Subject: [PATCH] Refactor: added ping messages for auto peering and refactored code --- packages/timeutil/ticker.go | 19 ++ plugins/autopeering/parameters/parameters.go | 2 +- .../peermanager/accepted_neighbors.go | 5 - .../peermanager/chosen_neighbor_candidates.go | 25 ++ .../peermanager/chosen_neighbors.go | 5 - plugins/autopeering/peermanager/constants.go | 9 - .../autopeering/peermanager/entry_nodes.go | 22 +- plugins/autopeering/peermanager/events.go | 31 +++ .../autopeering/peermanager/known_peers.go | 15 +- .../autopeering/peermanager/neighborhood.go | 42 +++ plugins/autopeering/peermanager/peer_list.go | 45 ---- .../autopeering/peermanager/peer_manager.go | 250 ------------------ .../peermanager/peering_request.go | 33 --- .../peermanager/types/peer_list.go | 33 +++ .../peermanager/types/peer_register.go | 57 ++++ plugins/autopeering/plugin.go | 28 +- .../protocol/chosen_neighbor_processor.go | 61 +++++ .../protocol/constants/constants.go | 11 + plugins/autopeering/protocol/error_handler.go | 12 + plugins/autopeering/protocol/events.go | 84 ++++++ .../protocol/incoming_ping_processor.go | 18 ++ .../protocol/incoming_request_processor.go | 47 ++++ .../protocol/incoming_response_processor.go | 33 +++ .../protocol/outgoing_ping_processor.go | 83 ++++++ .../autopeering/protocol/peer/constants.go | 6 +- plugins/autopeering/protocol/peer/peer.go | 83 +++++- .../autopeering/protocol/ping/constants.go | 29 ++ plugins/autopeering/protocol/ping/errors.go | 8 + plugins/autopeering/protocol/ping/ping.go | 69 +++++ plugins/autopeering/protocol/plugin.go | 30 +++ .../autopeering/protocol/request/constants.go | 6 +- .../autopeering/protocol/request/request.go | 37 ++- plugins/autopeering/protocol/request/send.go | 51 ++++ .../protocol/response/constants.go | 4 +- plugins/autopeering/protocol/types.go | 5 - .../protocol/{ => types}/constants.go | 4 +- plugins/autopeering/protocol/types/types.go | 5 + plugins/autopeering/saltmanager/errors.go | 8 + plugins/autopeering/saltmanager/utils.go | 19 ++ plugins/autopeering/server/tcp/constants.go | 1 + plugins/autopeering/server/tcp/events.go | 48 +++- plugins/autopeering/server/tcp/server.go | 74 +++++- plugins/autopeering/server/tcp/types.go | 8 +- plugins/autopeering/server/udp/events.go | 26 +- plugins/autopeering/server/udp/server.go | 9 + plugins/autopeering/server/udp/types.go | 3 + .../neighbormanager/accepted_neighbors.go | 7 + .../neighbormanager/chosen_neighbors.go | 7 + .../gossip/neighbormanager/neighbormanager.go | 3 + plugins/statusscreen/ui_header_bar.go | 6 +- 50 files changed, 1098 insertions(+), 428 deletions(-) create mode 100644 packages/timeutil/ticker.go delete mode 100644 plugins/autopeering/peermanager/accepted_neighbors.go create mode 100644 plugins/autopeering/peermanager/chosen_neighbor_candidates.go delete mode 100644 plugins/autopeering/peermanager/chosen_neighbors.go delete mode 100644 plugins/autopeering/peermanager/constants.go create mode 100644 plugins/autopeering/peermanager/events.go create mode 100644 plugins/autopeering/peermanager/neighborhood.go delete mode 100644 plugins/autopeering/peermanager/peer_list.go delete mode 100644 plugins/autopeering/peermanager/peer_manager.go delete mode 100644 plugins/autopeering/peermanager/peering_request.go create mode 100644 plugins/autopeering/peermanager/types/peer_list.go create mode 100644 plugins/autopeering/peermanager/types/peer_register.go create mode 100644 plugins/autopeering/protocol/chosen_neighbor_processor.go create mode 100644 plugins/autopeering/protocol/constants/constants.go create mode 100644 plugins/autopeering/protocol/error_handler.go create mode 100644 plugins/autopeering/protocol/events.go create mode 100644 plugins/autopeering/protocol/incoming_ping_processor.go create mode 100644 plugins/autopeering/protocol/incoming_request_processor.go create mode 100644 plugins/autopeering/protocol/incoming_response_processor.go create mode 100644 plugins/autopeering/protocol/outgoing_ping_processor.go create mode 100644 plugins/autopeering/protocol/ping/constants.go create mode 100644 plugins/autopeering/protocol/ping/errors.go create mode 100644 plugins/autopeering/protocol/ping/ping.go create mode 100644 plugins/autopeering/protocol/plugin.go create mode 100644 plugins/autopeering/protocol/request/send.go delete mode 100644 plugins/autopeering/protocol/types.go rename plugins/autopeering/protocol/{ => types}/constants.go (80%) create mode 100644 plugins/autopeering/protocol/types/types.go create mode 100644 plugins/autopeering/saltmanager/errors.go create mode 100644 plugins/autopeering/saltmanager/utils.go create mode 100644 plugins/gossip/neighbormanager/accepted_neighbors.go create mode 100644 plugins/gossip/neighbormanager/chosen_neighbors.go create mode 100644 plugins/gossip/neighbormanager/neighbormanager.go diff --git a/packages/timeutil/ticker.go b/packages/timeutil/ticker.go new file mode 100644 index 00000000..0a753694 --- /dev/null +++ b/packages/timeutil/ticker.go @@ -0,0 +1,19 @@ +package timeutil + +import ( + "github.com/iotaledger/goshimmer/packages/daemon" + "time" +) + +func Ticker(handler func(), interval time.Duration) { + ticker := time.NewTicker(interval) + ticker: + for { + select { + case <- daemon.ShutdownSignal: + break ticker + case <- ticker.C: + handler() + } + } +} diff --git a/plugins/autopeering/parameters/parameters.go b/plugins/autopeering/parameters/parameters.go index f4cceaa3..8d5b29f7 100644 --- a/plugins/autopeering/parameters/parameters.go +++ b/plugins/autopeering/parameters/parameters.go @@ -4,7 +4,7 @@ import "github.com/iotaledger/goshimmer/packages/parameter" var ( ADDRESS = parameter.AddString("AUTOPEERING/ADDRESS", "0.0.0.0", "address to bind for incoming peering requests") - ENTRY_NODES = parameter.AddString("AUTOPEERING/ENTRY_NODES", "tcp://82.165.29.179:14626", "list of trusted entry nodes for auto peering") + ENTRY_NODES = parameter.AddString("AUTOPEERING/ENTRY_NODES", "tcp://0d828930890386f036eb77982cc067c5429f7b8f@82.165.29.179:14626", "list of trusted entry nodes for auto peering") TCP_PORT = parameter.AddInt("AUTOPEERING/TCP_PORT", 14626, "tcp port for incoming peering requests") UDP_PORT = parameter.AddInt("AUTOPEERING/UDP_PORT", 14626, "udp port for incoming peering requests") ) diff --git a/plugins/autopeering/peermanager/accepted_neighbors.go b/plugins/autopeering/peermanager/accepted_neighbors.go deleted file mode 100644 index ff1e87ed..00000000 --- a/plugins/autopeering/peermanager/accepted_neighbors.go +++ /dev/null @@ -1,5 +0,0 @@ -package peermanager - -import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - -var ACCEPTED_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)} diff --git a/plugins/autopeering/peermanager/chosen_neighbor_candidates.go b/plugins/autopeering/peermanager/chosen_neighbor_candidates.go new file mode 100644 index 00000000..52f41864 --- /dev/null +++ b/plugins/autopeering/peermanager/chosen_neighbor_candidates.go @@ -0,0 +1,25 @@ +package peermanager + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" +) + +var CHOSEN_NEIGHBOR_CANDIDATES types.PeerList + +var CHOSEN_NEIGHBOR_DISTANCE = func(req *request.Request) func(p *peer.Peer) float64 { + return func(p *peer.Peer) float64 { + return 1 + } +} + +func init() { + updateNeighborCandidates() + + Events.UpdateNeighborhood.Attach(updateNeighborCandidates) +} + +func updateNeighborCandidates() { + CHOSEN_NEIGHBOR_CANDIDATES = NEIGHBORHOOD.List().Sort(CHOSEN_NEIGHBOR_DISTANCE(request.OUTGOING_REQUEST)) +} diff --git a/plugins/autopeering/peermanager/chosen_neighbors.go b/plugins/autopeering/peermanager/chosen_neighbors.go deleted file mode 100644 index 595d8571..00000000 --- a/plugins/autopeering/peermanager/chosen_neighbors.go +++ /dev/null @@ -1,5 +0,0 @@ -package peermanager - -import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - -var CHOSEN_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)} diff --git a/plugins/autopeering/peermanager/constants.go b/plugins/autopeering/peermanager/constants.go deleted file mode 100644 index df72cfb2..00000000 --- a/plugins/autopeering/peermanager/constants.go +++ /dev/null @@ -1,9 +0,0 @@ -package peermanager - -import ( - "time" -) - -const ( - FIND_NEIGHBOR_INTERVAL = 5 * time.Second -) diff --git a/plugins/autopeering/peermanager/entry_nodes.go b/plugins/autopeering/peermanager/entry_nodes.go index e5d6b27a..f7f7b22c 100644 --- a/plugins/autopeering/peermanager/entry_nodes.go +++ b/plugins/autopeering/peermanager/entry_nodes.go @@ -1,9 +1,11 @@ package peermanager import ( + "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + peermanagerTypes "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" "net" "strconv" "strings" @@ -11,8 +13,8 @@ import ( var ENTRY_NODES = parseEntryNodes() -func parseEntryNodes() []*peer.Peer { - result := make([]*peer.Peer, 0) +func parseEntryNodes() peermanagerTypes.PeerList { + result := make(peermanagerTypes.PeerList, 0) for _, entryNodeDefinition := range strings.Fields(*parameters.ENTRY_NODES.Value) { if entryNodeDefinition == "" { @@ -29,12 +31,20 @@ func parseEntryNodes() []*peer.Peer { } switch protocolBits[0] { case "tcp": - entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_TCP + entryNode.PeeringProtocolType = types.PROTOCOL_TYPE_TCP case "udp": - entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_UDP + entryNode.PeeringProtocolType = types.PROTOCOL_TYPE_UDP } - addressBits := strings.Split(protocolBits[1], ":") + identityBits := strings.Split(protocolBits[1], "@") + if len(identityBits) != 2 { + panic("error while parsing identity of entry node: " + entryNodeDefinition) + } + entryNode.Identity = &identity.Identity{ + StringIdentifier: identityBits[0], + } + + addressBits := strings.Split(identityBits[1], ":") switch len(addressBits) { case 2: host := addressBits[0] diff --git a/plugins/autopeering/peermanager/events.go b/plugins/autopeering/peermanager/events.go new file mode 100644 index 00000000..db39e888 --- /dev/null +++ b/plugins/autopeering/peermanager/events.go @@ -0,0 +1,31 @@ +package peermanager + +import "reflect" + +var Events = moduleEvents{ + UpdateNeighborhood: &callbackEvent{make(map[uintptr]Callback)}, +} + +type moduleEvents struct { + UpdateNeighborhood *callbackEvent +} + +type callbackEvent struct { + callbacks map[uintptr]Callback +} + +func (this *callbackEvent) Attach(callback Callback) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *callbackEvent) Detach(callback Callback) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *callbackEvent) Trigger() { + for _, callback := range this.callbacks { + callback() + } +} + +type Callback = func() \ No newline at end of file diff --git a/plugins/autopeering/peermanager/known_peers.go b/plugins/autopeering/peermanager/known_peers.go index d8aeba0a..1b23fb82 100644 --- a/plugins/autopeering/peermanager/known_peers.go +++ b/plugins/autopeering/peermanager/known_peers.go @@ -1,5 +1,16 @@ package peermanager -import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" +) -var KNOWN_PEERS = &PeerList{make(map[string]*peer.Peer)} +var KNOWN_PEERS = initKnownPeers() + +func initKnownPeers() types.PeerRegister { + knownPeers := make(types.PeerRegister) + for _, entryNode := range ENTRY_NODES { + knownPeers.AddOrUpdate(entryNode) + } + + return knownPeers +} \ No newline at end of file diff --git a/plugins/autopeering/peermanager/neighborhood.go b/plugins/autopeering/peermanager/neighborhood.go new file mode 100644 index 00000000..e6d95d89 --- /dev/null +++ b/plugins/autopeering/peermanager/neighborhood.go @@ -0,0 +1,42 @@ +package peermanager + +import ( + "github.com/iotaledger/goshimmer/packages/timeutil" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "time" +) + +var NEIGHBORHOOD types.PeerRegister + +var NEIGHBORHOOD_LIST types.PeerList + +// Selects a fixed neighborhood from all known peers - this allows nodes to "stay in the same circles" that share their +// view on the ledger an is a preparation for economic clustering +var NEIGHBORHOOD_SELECTOR = func(this types.PeerRegister, req *request.Request) types.PeerRegister { + filteredPeers := make(types.PeerRegister) + for id, peer := range this { + filteredPeers[id] = peer + } + + return filteredPeers +} + +var lastUpdate = time.Now() + +func init() { + updateNeighborHood() + + go timeutil.Ticker(updateNeighborHood, 1 * time.Second) +} + +func updateNeighborHood() { + if float64(len(NEIGHBORHOOD)) * 1.2 <= float64(len(KNOWN_PEERS)) || lastUpdate.Before(time.Now().Add(-300 * time.Second)) { + NEIGHBORHOOD = KNOWN_PEERS.Filter(NEIGHBORHOOD_SELECTOR, request.OUTGOING_REQUEST) + NEIGHBORHOOD_LIST = NEIGHBORHOOD.List() + + lastUpdate = time.Now() + + Events.UpdateNeighborhood.Trigger() + } +} \ No newline at end of file diff --git a/plugins/autopeering/peermanager/peer_list.go b/plugins/autopeering/peermanager/peer_list.go deleted file mode 100644 index c8add980..00000000 --- a/plugins/autopeering/peermanager/peer_list.go +++ /dev/null @@ -1,45 +0,0 @@ -package peermanager - -import ( - "bytes" - "github.com/iotaledger/goshimmer/packages/accountability" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" -) - -type PeerList struct { - Peers map[string]*peer.Peer -} - -func (this *PeerList) Update(peer *peer.Peer) bool { - if peer.Identity == nil || bytes.Equal(peer.Identity.Identifier, accountability.OWN_ID.Identifier) { - return false - } - - if existingPeer, exists := this.Peers[peer.Identity.StringIdentifier]; exists { - existingPeer.Address = peer.Address - existingPeer.GossipPort = peer.GossipPort - existingPeer.PeeringPort = peer.PeeringPort - - // trigger update peer - - return false - } else { - this.Peers[peer.Identity.StringIdentifier] = peer - - // trigger add peer - - return true - } -} - -func (this *PeerList) Add(peer *peer.Peer) { - this.Peers[peer.Identity.StringIdentifier] = peer -} - -func (this *PeerList) Contains(key string) bool { - if _, exists := this.Peers[key]; exists { - return true - } else { - return false - } -} diff --git a/plugins/autopeering/peermanager/peer_manager.go b/plugins/autopeering/peermanager/peer_manager.go deleted file mode 100644 index b9979c53..00000000 --- a/plugins/autopeering/peermanager/peer_manager.go +++ /dev/null @@ -1,250 +0,0 @@ -package peermanager - -import ( - "github.com/iotaledger/goshimmer/packages/daemon" - "github.com/iotaledger/goshimmer/packages/network" - "github.com/iotaledger/goshimmer/packages/node" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" - "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" - "github.com/iotaledger/goshimmer/plugins/autopeering/server/udp" - "net" - "strconv" - "time" -) - -func Configure(plugin *node.Plugin) { - configurePeeringRequest() - - // setup processing of peering requests - udp.Events.ReceiveRequest.Attach(func(peeringRequest *request.Request) { - processPeeringRequest(plugin, peeringRequest, nil) - }) - tcp.Events.ReceiveRequest.Attach(func(conn *network.ManagedConnection, peeringRequest *request.Request) { - processPeeringRequest(plugin, peeringRequest, conn) - }) - - // setup processing of peering responses - udp.Events.ReceiveResponse.Attach(func(peeringResponse *response.Response) { - processPeeringResponse(plugin, peeringResponse, nil) - }) - tcp.Events.ReceiveResponse.Attach(func(conn *network.ManagedConnection, peeringResponse *response.Response) { - processPeeringResponse(plugin, peeringResponse, conn) - }) - - udp.Events.Error.Attach(func(ip net.IP, err error) { - plugin.LogDebug("error when communicating with " + ip.String() + ": " + err.Error()) - }) - tcp.Events.Error.Attach(func(ip net.IP, err error) { - plugin.LogDebug("invalid peering request from " + ip.String() + ": " + err.Error()) - }) -} - -func Run(plugin *node.Plugin) { - // setup background worker that contacts "chosen neighbors" - daemon.BackgroundWorker(func() { - plugin.LogInfo("Starting Peer Manager ...") - plugin.LogSuccess("Starting Peer Manager ... done") - - sendPeeringRequests(plugin) - - ticker := time.NewTicker(FIND_NEIGHBOR_INTERVAL) - for { - select { - case <-daemon.ShutdownSignal: - return - case <-ticker.C: - sendPeeringRequests(plugin) - } - } - }) -} - -func Shutdown(plugin *node.Plugin) { - plugin.LogInfo("Stopping Peer Manager ...") - plugin.LogSuccess("Stopping Peer Manager ... done") -} - -func processPeeringRequest(plugin *node.Plugin, peeringRequest *request.Request, conn net.Conn) { - if KNOWN_PEERS.Update(peeringRequest.Issuer) { - plugin.LogInfo("new peer detected: " + peeringRequest.Issuer.Address.String() + " / " + peeringRequest.Issuer.Identity.StringIdentifier) - } - - if conn == nil { - plugin.LogDebug("received UDP peering request from " + peeringRequest.Issuer.Identity.StringIdentifier) - - var protocolString string - switch peeringRequest.Issuer.PeeringProtocolType { - case protocol.PROTOCOL_TYPE_TCP: - protocolString = "tcp" - case protocol.PROTOCOL_TYPE_UDP: - protocolString = "udp" - default: - plugin.LogFailure("unsupported peering protocol in request from " + peeringRequest.Issuer.Address.String()) - - return - } - - var err error - conn, err = net.Dial(protocolString, peeringRequest.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringRequest.Issuer.PeeringPort))) - if err != nil { - plugin.LogDebug("error when connecting to " + peeringRequest.Issuer.Address.String() + " during peering process: " + err.Error()) - - return - } - } else { - plugin.LogDebug("received TCP peering request from " + peeringRequest.Issuer.Identity.StringIdentifier) - } - - sendFittingPeeringResponse(conn) -} - -func processPeeringResponse(plugin *node.Plugin, peeringResponse *response.Response, conn *network.ManagedConnection) { - if KNOWN_PEERS.Update(peeringResponse.Issuer) { - plugin.LogInfo("new peer detected: " + peeringResponse.Issuer.Address.String() + " / " + peeringResponse.Issuer.Identity.StringIdentifier) - } - for _, peer := range peeringResponse.Peers { - if KNOWN_PEERS.Update(peer) { - plugin.LogInfo("new peer detected: " + peer.Address.String() + " / " + peer.Identity.StringIdentifier) - } - } - - if conn == nil { - plugin.LogDebug("received UDP peering response from " + peeringResponse.Issuer.Identity.StringIdentifier) - } else { - plugin.LogDebug("received TCP peering response from " + peeringResponse.Issuer.Identity.StringIdentifier) - - conn.Close() - } - - switch peeringResponse.Type { - case response.TYPE_ACCEPT: - CHOSEN_NEIGHBORS.Update(peeringResponse.Issuer) - case response.TYPE_REJECT: - default: - plugin.LogDebug("invalid response type in peering response of " + peeringResponse.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringResponse.Issuer.PeeringPort))) - } -} - -func sendFittingPeeringResponse(conn net.Conn) { - var peeringResponse *response.Response - if len(ACCEPTED_NEIGHBORS.Peers) < protocol.NEIGHBOR_COUNT/2 { - peeringResponse = generateAcceptResponse() - } else { - peeringResponse = generateRejectResponse() - } - - peeringResponse.Sign() - - conn.Write(peeringResponse.Marshal()) - conn.Close() -} - -func generateAcceptResponse() *response.Response { - peeringResponse := &response.Response{ - Type: response.TYPE_ACCEPT, - Issuer: PEERING_REQUEST.Issuer, - Peers: generateProposedNodeCandidates(), - } - - return peeringResponse -} - -func generateRejectResponse() *response.Response { - peeringResponse := &response.Response{ - Type: response.TYPE_REJECT, - Issuer: PEERING_REQUEST.Issuer, - Peers: generateProposedNodeCandidates(), - } - - return peeringResponse -} - - -func generateProposedNodeCandidates() []*peer.Peer { - peers := make([]*peer.Peer, 0) - for _, peer := range KNOWN_PEERS.Peers { - peers = append(peers, peer) - } - - return peers -} - -//region PEERING REQUEST RELATED METHODS /////////////////////////////////////////////////////////////////////////////// - -func sendPeeringRequests(plugin *node.Plugin) { - for _, peer := range getChosenNeighborCandidates() { - sendPeeringRequest(plugin, peer) - } -} - -func sendPeeringRequest(plugin *node.Plugin, peer *peer.Peer) { - switch peer.PeeringProtocolType { - case protocol.PROTOCOL_TYPE_TCP: - sendTCPPeeringRequest(plugin, peer) - - case protocol.PROTOCOL_TYPE_UDP: - sendUDPPeeringRequest(plugin, peer) - - default: - panic("invalid protocol in known peers") - } -} - -func sendTCPPeeringRequest(plugin *node.Plugin, peer *peer.Peer) { - go func() { - tcpConnection, err := net.Dial("tcp", peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort))) - if err != nil { - plugin.LogDebug("error while trying to send TCP peering request to " + peer.String() + ": " + err.Error()) - } else { - mConn := network.NewManagedConnection(tcpConnection) - - plugin.LogDebug("sending TCP peering request to " + peer.String()) - - if _, err := mConn.Write(PEERING_REQUEST.Marshal()); err != nil { - plugin.LogDebug("error while trying to send TCP peering request to " + peer.String() + ": " + err.Error()) - - return - } - - tcp.HandleConnection(mConn) - } - }() -} - -func sendUDPPeeringRequest(plugin *node.Plugin, peer *peer.Peer) { - go func() { - udpConnection, err := net.Dial("udp", peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort))) - if err != nil { - plugin.LogDebug("error while trying to send peering request to " + peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + " / " + peer.Identity.StringIdentifier + ": " + err.Error()) - } else { - mConn := network.NewManagedConnection(udpConnection) - - if _, err := mConn.Write(PEERING_REQUEST.Marshal()); err != nil { - plugin.LogDebug("error while trying to send peering request to " + peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + " / " + peer.Identity.StringIdentifier + ": " + err.Error()) - - return - } - - // setup listener for incoming responses - } - }() -} - -func getChosenNeighborCandidates() []*peer.Peer { - result := make([]*peer.Peer, 0) - - for _, peer := range KNOWN_PEERS.Peers { - result = append(result, peer) - } - - for _, peer := range ENTRY_NODES { - result = append(result, peer) - } - - return result -} - -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// \ No newline at end of file diff --git a/plugins/autopeering/peermanager/peering_request.go b/plugins/autopeering/peermanager/peering_request.go deleted file mode 100644 index 4475b6f4..00000000 --- a/plugins/autopeering/peermanager/peering_request.go +++ /dev/null @@ -1,33 +0,0 @@ -package peermanager - -import ( - "github.com/iotaledger/goshimmer/packages/accountability" - "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" - "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" - "net" -) - -var PEERING_REQUEST *request.Request - -func configurePeeringRequest() { - PEERING_REQUEST = &request.Request{ - Issuer: &peer.Peer{ - Identity: accountability.OWN_ID, - PeeringProtocolType: protocol.PROTOCOL_TYPE_TCP, - PeeringPort: uint16(*parameters.UDP_PORT.Value), - GossipProtocolType: protocol.PROTOCOL_TYPE_TCP, - GossipPort: uint16(*parameters.UDP_PORT.Value), - Address: net.IPv4(0, 0, 0, 0), - }, - Salt: saltmanager.PUBLIC_SALT, - } - PEERING_REQUEST.Sign() - - saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { - PEERING_REQUEST.Sign() - }) -} diff --git a/plugins/autopeering/peermanager/types/peer_list.go b/plugins/autopeering/peermanager/types/peer_list.go new file mode 100644 index 00000000..3bcc3f86 --- /dev/null +++ b/plugins/autopeering/peermanager/types/peer_list.go @@ -0,0 +1,33 @@ +package types + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "sort" +) + +type PeerList []*peer.Peer + +func (this PeerList) Filter(predicate func(p *peer.Peer) bool) PeerList { + peerList := make(PeerList, len(this)) + + counter := 0 + for _, peer := range this { + if predicate(peer) { + peerList[counter] = peer + counter++ + } + } + + return peerList[:counter] +} + +// Sorts the PeerRegister by their distance to an anchor. +func (this PeerList) Sort(distance func(p *peer.Peer) float64) PeerList { + sort.Slice(this, func(i, j int) bool { + return distance(this[i]) < distance(this[j]) + }) + + return this +} + + diff --git a/plugins/autopeering/peermanager/types/peer_register.go b/plugins/autopeering/peermanager/types/peer_register.go new file mode 100644 index 00000000..c4cbbb3f --- /dev/null +++ b/plugins/autopeering/peermanager/types/peer_register.go @@ -0,0 +1,57 @@ +package types + +import ( + "bytes" + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" +) + +type PeerRegister map[string]*peer.Peer + +// returns true if a new entry was added +func (this PeerRegister) AddOrUpdate(peer *peer.Peer) bool { + if peer.Identity == nil || bytes.Equal(peer.Identity.Identifier, accountability.OWN_ID.Identifier) { + return false + } + + if existingPeer, exists := this[peer.Identity.StringIdentifier]; exists { + existingPeer.Address = peer.Address + existingPeer.GossipPort = peer.GossipPort + existingPeer.PeeringPort = peer.PeeringPort + + // trigger update peer + + return false + } else { + this[peer.Identity.StringIdentifier] = peer + + // trigger add peer + + return true + } +} + +func (this PeerRegister) Contains(key string) bool { + if _, exists := this[key]; exists { + return true + } else { + return false + } +} + +func (this PeerRegister) Filter(filterFn func(this PeerRegister, req *request.Request) PeerRegister, req *request.Request) PeerRegister { + return filterFn(this, req) +} + +func (this PeerRegister) List() PeerList { + peerList := make(PeerList, len(this)) + + counter := 0 + for _, currentPeer := range this { + peerList[counter] = currentPeer + counter++ + } + + return peerList +} diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index d26a4e1f..4f8590d6 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -4,22 +4,44 @@ import ( "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "github.com/iotaledger/goshimmer/plugins/autopeering/server" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" ) func configure(plugin *node.Plugin) { server.Configure(plugin) - peermanager.Configure(plugin) + protocol.Configure(plugin) daemon.Events.Shutdown.Attach(func() { server.Shutdown(plugin) - peermanager.Shutdown(plugin) + }) + + protocol.Events.IncomingRequestAccepted.Attach(func(req *request.Request) { + if neighbormanager.ACCEPTED_NEIGHBORS.AddOrUpdate(req.Issuer) { + plugin.LogSuccess("new neighbor accepted: " + req.Issuer.Address.String() + " / " + req.Issuer.Identity.StringIdentifier) + } + }) + + protocol.Events.OutgoingRequestAccepted.Attach(func(res *response.Response) { + if neighbormanager.CHOSEN_NEIGHBORS.AddOrUpdate(res.Issuer) { + plugin.LogSuccess("new neighbor chosen: " + res.Issuer.Address.String() + " / " + res.Issuer.Identity.StringIdentifier) + } + }) + + protocol.Events.DiscoverPeer.Attach(func(p *peer.Peer) { + if peermanager.KNOWN_PEERS.AddOrUpdate(p) { + plugin.LogInfo("new peer detected: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + } }) } func run(plugin *node.Plugin) { server.Run(plugin) - peermanager.Run(plugin) + protocol.Run(plugin) } var PLUGIN = node.NewPlugin("Auto Peering", configure, run) diff --git a/plugins/autopeering/protocol/chosen_neighbor_processor.go b/plugins/autopeering/protocol/chosen_neighbor_processor.go new file mode 100644 index 00000000..d76a4a19 --- /dev/null +++ b/plugins/autopeering/protocol/chosen_neighbor_processor.go @@ -0,0 +1,61 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" + "time" +) + +func createChosenNeighborProcessor(plugin *node.Plugin) func() { + return func() { + plugin.LogInfo("Starting Chosen Neighbor Processor ...") + plugin.LogSuccess("Starting Chosen Neighbor Processor ... done") + + chooseNeighbors(plugin) + + ticker := time.NewTicker(constants.FIND_NEIGHBOR_INTERVAL) + ticker: + for { + select { + case <- daemon.ShutdownSignal: + plugin.LogInfo("Stopping Chosen Neighbor Processor ...") + + break ticker + case <- ticker.C: + chooseNeighbors(plugin) + } + } + + plugin.LogSuccess("Stopping Chosen Neighbor Processor ... done") + } +} + +func chooseNeighbors(plugin *node.Plugin) { + for _, chosenNeighborCandidate := range peermanager.CHOSEN_NEIGHBOR_CANDIDATES { + go func(peer *peer.Peer) { + nodeId := peer.Identity.StringIdentifier + + if !neighbormanager.ACCEPTED_NEIGHBORS.Contains(nodeId) && + !neighbormanager.CHOSEN_NEIGHBORS.Contains(nodeId) && + accountability.OWN_ID.StringIdentifier != nodeId { + + if connectionAlive, err := request.Send(peer); err != nil { + plugin.LogDebug(err.Error()) + } else if connectionAlive { + plugin.LogDebug("sent TCP peering request to " + peer.String()) + + tcp.HandleConnection(peer.Conn) + } else { + plugin.LogDebug("sent UDP peering request to " + peer.String()) + } + } + }(chosenNeighborCandidate) + } +} diff --git a/plugins/autopeering/protocol/constants/constants.go b/plugins/autopeering/protocol/constants/constants.go new file mode 100644 index 00000000..4e26ea74 --- /dev/null +++ b/plugins/autopeering/protocol/constants/constants.go @@ -0,0 +1,11 @@ +package constants + +import "time" + +const ( + NEIGHBOR_COUNT = 8 + + FIND_NEIGHBOR_INTERVAL = 5 * time.Second + PING_RANDOM_PEERS_INTERVAL = 1 * time.Second + PING_RANDOM_PEERS_COUNT = 3 +) diff --git a/plugins/autopeering/protocol/error_handler.go b/plugins/autopeering/protocol/error_handler.go new file mode 100644 index 00000000..734a2f15 --- /dev/null +++ b/plugins/autopeering/protocol/error_handler.go @@ -0,0 +1,12 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "net" +) + +func createErrorHandler(plugin *node.Plugin) func(ip net.IP, err error) { + return func(ip net.IP, err error) { + plugin.LogDebug("error when communicating with " + ip.String() + ": " + err.Error()) + } +} diff --git a/plugins/autopeering/protocol/events.go b/plugins/autopeering/protocol/events.go new file mode 100644 index 00000000..6d4d78bc --- /dev/null +++ b/plugins/autopeering/protocol/events.go @@ -0,0 +1,84 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "reflect" +) + +var Events = protocolEvents{ + DiscoverPeer: &peerEvent{make(map[uintptr]PeerConsumer)}, + IncomingRequestAccepted: &requestEvent{make(map[uintptr]RequestConsumer)}, + IncomingRequestRejected: &requestEvent{make(map[uintptr]RequestConsumer)}, + OutgoingRequestAccepted: &responseEvent{make(map[uintptr]ResponseConsumer)}, + OutgoingRequestRejected: &responseEvent{make(map[uintptr]ResponseConsumer)}, +} + +type protocolEvents struct { + DiscoverPeer *peerEvent + IncomingRequestAccepted *requestEvent + IncomingRequestRejected *requestEvent + OutgoingRequestAccepted *responseEvent + OutgoingRequestRejected *responseEvent +} + +type peerEvent struct { + callbacks map[uintptr]PeerConsumer +} + +func (this *peerEvent) Attach(callback PeerConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *peerEvent) Detach(callback PeerConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *peerEvent) Trigger(p *peer.Peer) { + for _, callback := range this.callbacks { + callback(p) + } +} + +type requestEvent struct { + callbacks map[uintptr]RequestConsumer +} + +func (this *requestEvent) Attach(callback RequestConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *requestEvent) Detach(callback RequestConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *requestEvent) Trigger(req *request.Request) { + for _, callback := range this.callbacks { + callback(req) + } +} + +type responseEvent struct { + callbacks map[uintptr]ResponseConsumer +} + +func (this *responseEvent) Attach(callback ResponseConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *responseEvent) Detach(callback ResponseConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *responseEvent) Trigger(res *response.Response) { + for _, callback := range this.callbacks { + callback(res) + } +} + +type PeerConsumer = func(p *peer.Peer) + +type RequestConsumer = func(req *request.Request) + +type ResponseConsumer = func(res *response.Response) diff --git a/plugins/autopeering/protocol/incoming_ping_processor.go b/plugins/autopeering/protocol/incoming_ping_processor.go new file mode 100644 index 00000000..f2b30f9a --- /dev/null +++ b/plugins/autopeering/protocol/incoming_ping_processor.go @@ -0,0 +1,18 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" +) + +func createIncomingPingProcessor(plugin *node.Plugin) func(p *ping.Ping) { + return func(p *ping.Ping) { + if p.Issuer.Conn != nil { + plugin.LogDebug("received TCP ping from " + p.Issuer.String()) + } else { + plugin.LogDebug("received UDP ping from " + p.Issuer.String()) + } + + Events.DiscoverPeer.Trigger(p.Issuer) + } +} diff --git a/plugins/autopeering/protocol/incoming_request_processor.go b/plugins/autopeering/protocol/incoming_request_processor.go new file mode 100644 index 00000000..2f94ff19 --- /dev/null +++ b/plugins/autopeering/protocol/incoming_request_processor.go @@ -0,0 +1,47 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" +) + +func createIncomingRequestProcessor(plugin *node.Plugin) func(req *request.Request) { + return func(req *request.Request) { + Events.DiscoverPeer.Trigger(req.Issuer) + + if req.Issuer.Conn != nil { + plugin.LogDebug("received TCP peering request from " + req.Issuer.String()) + } else { + plugin.LogDebug("received UDP peering request from " + req.Issuer.String()) + } + + if len(neighbormanager.ACCEPTED_NEIGHBORS) <= constants.NEIGHBOR_COUNT / 2 { + if err := req.Accept(proposedPeeringCandidates(req)); err != nil { + plugin.LogDebug("error when sending response to" + req.Issuer.String()) + } + + plugin.LogDebug("sent positive peering response to " + req.Issuer.String()) + + Events.IncomingRequestAccepted.Trigger(req) + } else { + if err := req.Reject(proposedPeeringCandidates(req)); err != nil { + plugin.LogDebug("error when sending response to" + req.Issuer.String()) + } + + plugin.LogDebug("sent negative peering response to " + req.Issuer.String()) + + Events.IncomingRequestRejected.Trigger(req) + } + } +} + +func proposedPeeringCandidates(req *request.Request) types.PeerList { + return peermanager.NEIGHBORHOOD.List().Filter(func(p *peer.Peer) bool { + return p.Identity.PublicKey != nil + }).Sort(peermanager.CHOSEN_NEIGHBOR_DISTANCE(req)) +} \ No newline at end of file diff --git a/plugins/autopeering/protocol/incoming_response_processor.go b/plugins/autopeering/protocol/incoming_response_processor.go new file mode 100644 index 00000000..d99cf1cd --- /dev/null +++ b/plugins/autopeering/protocol/incoming_response_processor.go @@ -0,0 +1,33 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "strconv" +) + +func createIncomingResponseProcessor(plugin *node.Plugin) func(peeringResponse *response.Response) { + return func(peeringResponse *response.Response) { + Events.DiscoverPeer.Trigger(peeringResponse.Issuer) + for _, peer := range peeringResponse.Peers { + Events.DiscoverPeer.Trigger(peer) + } + + if peeringResponse.Issuer.Conn == nil { + plugin.LogDebug("received UDP peering response from " + peeringResponse.Issuer.String()) + } else { + plugin.LogDebug("received TCP peering response from " + peeringResponse.Issuer.String()) + + peeringResponse.Issuer.Conn.Close() + } + + switch peeringResponse.Type { + case response.TYPE_ACCEPT: + Events.OutgoingRequestAccepted.Trigger(peeringResponse) + case response.TYPE_REJECT: + Events.OutgoingRequestRejected.Trigger(peeringResponse) + default: + plugin.LogDebug("invalid response type in peering response of " + peeringResponse.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringResponse.Issuer.PeeringPort))) + } + } +} diff --git a/plugins/autopeering/protocol/outgoing_ping_processor.go b/plugins/autopeering/protocol/outgoing_ping_processor.go new file mode 100644 index 00000000..808cb1ae --- /dev/null +++ b/plugins/autopeering/protocol/outgoing_ping_processor.go @@ -0,0 +1,83 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" + "math/rand" + "net" + "time" +) + +func createOutgoingPingProcessor(plugin *node.Plugin) func() { + return func() { + plugin.LogInfo("Starting Ping Processor ...") + plugin.LogSuccess("Starting Ping Processor ... done") + + outgoingPing := &ping.Ping{ + Issuer: &peer.Peer{ + Identity: accountability.OWN_ID, + PeeringProtocolType: types.PROTOCOL_TYPE_TCP, + PeeringPort: uint16(*parameters.UDP_PORT.Value), + GossipProtocolType: types.PROTOCOL_TYPE_TCP, + GossipPort: uint16(*parameters.UDP_PORT.Value), + Address: net.IPv4(0, 0, 0, 0), + Salt: saltmanager.PUBLIC_SALT, + }, + } + outgoingPing.Sign() + + saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { + outgoingPing.Sign() + }) + + pingPeers(plugin, outgoingPing) + + ticker := time.NewTicker(constants.PING_RANDOM_PEERS_INTERVAL) + ticker: + for { + select { + case <- daemon.ShutdownSignal: + plugin.LogInfo("Stopping Ping Processor ...") + + break ticker + case <- ticker.C: + pingPeers(plugin, outgoingPing) + } + } + + plugin.LogSuccess("Stopping Ping Processor ... done") + } +} + +func pingPeers(plugin *node.Plugin, outgoingPing *ping.Ping) { + chosenPeers := make(map[string]*peer.Peer) + + for i := 0; i < constants.PING_RANDOM_PEERS_COUNT; i++ { + randomNeighborHoodPeer := peermanager.NEIGHBORHOOD_LIST[rand.Intn(len(peermanager.NEIGHBORHOOD_LIST))] + + nodeId := randomNeighborHoodPeer.Identity.StringIdentifier + + if !neighbormanager.ACCEPTED_NEIGHBORS.Contains(nodeId) && !neighbormanager.CHOSEN_NEIGHBORS.Contains(nodeId) && + nodeId != accountability.OWN_ID.StringIdentifier { + chosenPeers[randomNeighborHoodPeer.Identity.StringIdentifier] = randomNeighborHoodPeer + } + } + + for _, chosenPeer := range chosenPeers { + go func(chosenPeer *peer.Peer) { + chosenPeer.Send(outgoingPing.Marshal(), false) + + plugin.LogDebug("sent ping to " + chosenPeer.String()) + }(chosenPeer) + } +} \ No newline at end of file diff --git a/plugins/autopeering/protocol/peer/constants.go b/plugins/autopeering/protocol/peer/constants.go index b1b65ea4..53d53dd5 100644 --- a/plugins/autopeering/protocol/peer/constants.go +++ b/plugins/autopeering/protocol/peer/constants.go @@ -2,6 +2,7 @@ package peer import ( "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" ) const ( @@ -12,6 +13,7 @@ const ( MARSHALLED_PEERING_PORT_START = MARSHALLED_PEERING_PROTOCOL_TYPE_END MARSHALLED_GOSSIP_PROTOCOL_TYPE_START = MARSHALLED_PEERING_PORT_END MARSHALLED_GOSSIP_PORT_START = MARSHALLED_GOSSIP_PROTOCOL_TYPE_END + MARSHALLED_SALT_START = MARSHALLED_GOSSIP_PORT_END MARSHALLED_PUBLIC_KEY_END = MARSHALLED_PUBLIC_KEY_START + MARSHALLED_PUBLIC_KEY_SIZE MARSHALLED_ADDRESS_TYPE_END = MARSHALLED_ADDRESS_TYPE_START + MARSHALLED_ADDRESS_TYPE_SIZE @@ -20,6 +22,7 @@ const ( MARSHALLED_PEERING_PORT_END = MARSHALLED_PEERING_PORT_START + MARSHALLED_PEERING_PORT_SIZE MARSHALLED_GOSSIP_PROTOCOL_TYPE_END = MARSHALLED_GOSSIP_PROTOCOL_TYPE_START + MARSHALLED_GOSSIP_PROTOCOL_TYPE_SIZE MARSHALLED_GOSSIP_PORT_END = MARSHALLED_GOSSIP_PORT_START + MARSHALLED_GOSSIP_PORT_SIZE + MARSHALLED_SALT_END = MARSHALLED_SALT_START + MARSHALLED_SALT_SIZE MARSHALLED_PUBLIC_KEY_SIZE = identity.PUBLIC_KEY_BYTE_LENGTH MARSHALLED_ADDRESS_TYPE_SIZE = 1 @@ -28,5 +31,6 @@ const ( MARSHALLED_PEERING_PORT_SIZE = 2 MARSHALLED_GOSSIP_PROTOCOL_TYPE_SIZE = 1 MARSHALLED_GOSSIP_PORT_SIZE = 2 - MARSHALLED_TOTAL_SIZE = MARSHALLED_GOSSIP_PORT_END + MARSHALLED_SALT_SIZE = salt.SALT_MARSHALLED_SIZE + MARSHALLED_TOTAL_SIZE = MARSHALLED_SALT_END ) diff --git a/plugins/autopeering/protocol/peer/peer.go b/plugins/autopeering/protocol/peer/peer.go index a004bfd6..8018281f 100644 --- a/plugins/autopeering/protocol/peer/peer.go +++ b/plugins/autopeering/protocol/peer/peer.go @@ -3,19 +3,25 @@ package peer import ( "encoding/binary" "github.com/iotaledger/goshimmer/packages/identity" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" "github.com/pkg/errors" "net" "strconv" + "sync" ) type Peer struct { Identity *identity.Identity Address net.IP - PeeringProtocolType protocol.ProtocolType + PeeringProtocolType types.ProtocolType PeeringPort uint16 - GossipProtocolType protocol.ProtocolType + GossipProtocolType types.ProtocolType GossipPort uint16 + Salt *salt.Salt + Conn *network.ManagedConnection + connectMutex sync.Mutex } func Unmarshal(data []byte) (*Peer, error) { @@ -28,21 +34,78 @@ func Unmarshal(data []byte) (*Peer, error) { } switch data[MARSHALLED_ADDRESS_TYPE_START] { - case protocol.ADDRESS_TYPE_IPV4: + case types.ADDRESS_TYPE_IPV4: peer.Address = net.IP(data[MARSHALLED_ADDRESS_START:MARSHALLED_ADDRESS_END]).To4() - case protocol.ADDRESS_TYPE_IPV6: + case types.ADDRESS_TYPE_IPV6: peer.Address = net.IP(data[MARSHALLED_ADDRESS_START:MARSHALLED_ADDRESS_END]).To16() } - peer.PeeringProtocolType = protocol.ProtocolType(data[MARSHALLED_PEERING_PROTOCOL_TYPE_START]) + peer.PeeringProtocolType = types.ProtocolType(data[MARSHALLED_PEERING_PROTOCOL_TYPE_START]) peer.PeeringPort = binary.BigEndian.Uint16(data[MARSHALLED_PEERING_PORT_START:MARSHALLED_PEERING_PORT_END]) - peer.GossipProtocolType = protocol.ProtocolType(data[MARSHALLED_GOSSIP_PROTOCOL_TYPE_START]) + peer.GossipProtocolType = types.ProtocolType(data[MARSHALLED_GOSSIP_PROTOCOL_TYPE_START]) peer.GossipPort = binary.BigEndian.Uint16(data[MARSHALLED_GOSSIP_PORT_START:MARSHALLED_GOSSIP_PORT_END]) + if unmarshalledSalt, err := salt.Unmarshal(data[MARSHALLED_SALT_START:MARSHALLED_SALT_END]); err != nil { + return nil, err + } else { + peer.Salt = unmarshalledSalt + } + return peer, nil } +func (peer *Peer) Send(data []byte, keepConnectionAlive bool) error { + newConnection, err := peer.Connect() + if err != nil { + return err + } + + if _, err := peer.Conn.Write(data); err != nil { + return err + } + + if newConnection && !keepConnectionAlive { + peer.Conn.Close() + } + + return nil +} + +func (peer *Peer) Connect() (bool, error) { + if peer.Conn == nil { + peer.connectMutex.Lock() + defer peer.connectMutex.Unlock() + + if peer.Conn == nil { + var protocolString string + switch peer.PeeringProtocolType { + case types.PROTOCOL_TYPE_TCP: + protocolString = "tcp" + case types.PROTOCOL_TYPE_UDP: + protocolString = "udp" + default: + return false, errors.New("unsupported peering protocol in peer " + peer.Address.String()) + } + + conn, err := net.Dial(protocolString, peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort))) + if err != nil { + return false, errors.New("error when connecting to " + peer.Address.String() + " during peering process: " + err.Error()) + } else { + peer.Conn = network.NewManagedConnection(conn) + + peer.Conn.Events.Close.Attach(func() { + peer.Conn = nil + }) + + return true, nil + } + } + } + + return false, nil +} + func (peer *Peer) Marshal() []byte { result := make([]byte, MARSHALLED_TOTAL_SIZE) @@ -51,9 +114,9 @@ func (peer *Peer) Marshal() []byte { switch len(peer.Address) { case net.IPv4len: - result[MARSHALLED_ADDRESS_TYPE_START] = protocol.ADDRESS_TYPE_IPV4 + result[MARSHALLED_ADDRESS_TYPE_START] = types.ADDRESS_TYPE_IPV4 case net.IPv6len: - result[MARSHALLED_ADDRESS_TYPE_START] = protocol.ADDRESS_TYPE_IPV6 + result[MARSHALLED_ADDRESS_TYPE_START] = types.ADDRESS_TYPE_IPV6 default: panic("invalid address in peer") } @@ -66,6 +129,8 @@ func (peer *Peer) Marshal() []byte { result[MARSHALLED_GOSSIP_PROTOCOL_TYPE_START] = peer.GossipProtocolType binary.BigEndian.PutUint16(result[MARSHALLED_GOSSIP_PORT_START:MARSHALLED_GOSSIP_PORT_END], peer.GossipPort) + copy(result[MARSHALLED_SALT_START:MARSHALLED_SALT_END], peer.Salt.Marshal()) + return result } diff --git a/plugins/autopeering/protocol/ping/constants.go b/plugins/autopeering/protocol/ping/constants.go new file mode 100644 index 00000000..aca4a719 --- /dev/null +++ b/plugins/autopeering/protocol/ping/constants.go @@ -0,0 +1,29 @@ +package ping + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" +) + +const ( + MARSHALLED_PACKET_HEADER = 0x04 + + PACKET_HEADER_START = 0 + MARSHALLED_ISSUER_START = PACKET_HEADER_END + MARSHALLED_PEERS_START = MARSHALLED_ISSUER_END + MARSHALLED_SIGNATURE_START = MARSHALLED_PEERS_END + + PACKET_HEADER_END = PACKET_HEADER_START + PACKET_HEADER_SIZE + MARSHALLED_ISSUER_END = MARSHALLED_ISSUER_START + MARSHALLED_ISSUER_SIZE + MARSHALLED_PEERS_END = MARSHALLED_PEERS_START + MARSHALLED_PEERS_SIZE + MARSHALLED_SIGNATURE_END = MARSHALLED_SIGNATURE_START + MARSHALLED_SIGNATURE_SIZE + + PACKET_HEADER_SIZE = 1 + MARSHALLED_ISSUER_SIZE = peer.MARSHALLED_TOTAL_SIZE + MARSHALLED_PEER_ENTRY_FLAG_SIZE = 1 + MARSHALLED_PEER_ENTRY_SIZE = MARSHALLED_PEER_ENTRY_FLAG_SIZE + peer.MARSHALLED_TOTAL_SIZE + MARSHALLED_PEERS_SIZE = MARSHALLED_PEER_ENTRY_SIZE * constants.NEIGHBOR_COUNT + MARSHALLED_SIGNATURE_SIZE = 65 + + MARSHALLED_TOTAL_SIZE = MARSHALLED_SIGNATURE_END +) diff --git a/plugins/autopeering/protocol/ping/errors.go b/plugins/autopeering/protocol/ping/errors.go new file mode 100644 index 00000000..218d6091 --- /dev/null +++ b/plugins/autopeering/protocol/ping/errors.go @@ -0,0 +1,8 @@ +package ping + +import "github.com/pkg/errors" + +var ( + ErrInvalidSignature = errors.New("invalid signature in ping") + ErrMalformedPing = errors.New("malformed ping") +) diff --git a/plugins/autopeering/protocol/ping/ping.go b/plugins/autopeering/protocol/ping/ping.go new file mode 100644 index 00000000..bdd6b745 --- /dev/null +++ b/plugins/autopeering/protocol/ping/ping.go @@ -0,0 +1,69 @@ +package ping + +import ( + "bytes" + "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" +) + +type Ping struct { + Issuer *peer.Peer + Neighbors types.PeerList + Signature [MARSHALLED_SIGNATURE_SIZE]byte +} + +func Unmarshal(data []byte) (*Ping, error) { + if data[0] != MARSHALLED_PACKET_HEADER || len(data) != MARSHALLED_TOTAL_SIZE { + return nil, ErrMalformedPing + } + + ping := &Ping{} + + if unmarshalledPeer, err := peer.Unmarshal(data[MARSHALLED_ISSUER_START:MARSHALLED_ISSUER_END]); err != nil { + return nil, err + } else { + ping.Issuer = unmarshalledPeer + } + + if err := saltmanager.CheckSalt(ping.Issuer.Salt); err != nil { + return nil, err + } + + if issuer, err := identity.FromSignedData(data[:MARSHALLED_SIGNATURE_START], data[MARSHALLED_SIGNATURE_START:]); err != nil { + return nil, err + } else { + if !bytes.Equal(issuer.Identifier, ping.Issuer.Identity.Identifier) { + return nil, ErrInvalidSignature + } + } + copy(ping.Signature[:], data[MARSHALLED_SIGNATURE_START:MARSHALLED_SIGNATURE_END]) + + return ping, nil +} + +func (ping *Ping) Marshal() []byte { + result := make([]byte, MARSHALLED_TOTAL_SIZE) + + result[PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER + copy(result[MARSHALLED_ISSUER_START:MARSHALLED_ISSUER_END], ping.Issuer.Marshal()) + for i, neighbor := range ping.Neighbors { + entryStartOffset := MARSHALLED_PEERS_START + i * MARSHALLED_PEER_ENTRY_SIZE + + result[entryStartOffset] = 1 + + copy(result[entryStartOffset + 1:entryStartOffset + MARSHALLED_PEER_ENTRY_SIZE], neighbor.Marshal()) + } + copy(result[MARSHALLED_SIGNATURE_START:MARSHALLED_SIGNATURE_END], ping.Signature[:MARSHALLED_SIGNATURE_SIZE]) + + return result +} + +func (this *Ping) Sign() { + if signature, err := this.Issuer.Identity.Sign(this.Marshal()[:MARSHALLED_SIGNATURE_START]); err != nil { + panic(err) + } else { + copy(this.Signature[:], signature) + } +} diff --git a/plugins/autopeering/protocol/plugin.go b/plugins/autopeering/protocol/plugin.go new file mode 100644 index 00000000..e7e9820a --- /dev/null +++ b/plugins/autopeering/protocol/plugin.go @@ -0,0 +1,30 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/udp" +) + +func Configure(plugin *node.Plugin) { + incomingPingProcessor := createIncomingPingProcessor(plugin) + incomingRequestProcessor := createIncomingRequestProcessor(plugin) + incomingResponseProcessor := createIncomingResponseProcessor(plugin) + errorHandler := createErrorHandler(plugin) + + udp.Events.ReceivePing.Attach(incomingPingProcessor) + udp.Events.ReceiveRequest.Attach(incomingRequestProcessor) + udp.Events.ReceiveResponse.Attach(incomingResponseProcessor) + udp.Events.Error.Attach(errorHandler) + + tcp.Events.ReceivePing.Attach(incomingPingProcessor) + tcp.Events.ReceiveRequest.Attach(incomingRequestProcessor) + tcp.Events.ReceiveResponse.Attach(incomingResponseProcessor) + tcp.Events.Error.Attach(errorHandler) +} + +func Run(plugin *node.Plugin) { + daemon.BackgroundWorker(createChosenNeighborProcessor(plugin)) + daemon.BackgroundWorker(createOutgoingPingProcessor(plugin)) +} diff --git a/plugins/autopeering/protocol/request/constants.go b/plugins/autopeering/protocol/request/constants.go index eb05a832..acd24072 100644 --- a/plugins/autopeering/protocol/request/constants.go +++ b/plugins/autopeering/protocol/request/constants.go @@ -2,23 +2,19 @@ package request import ( "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" ) const ( PACKET_HEADER_SIZE = 1 ISSUER_SIZE = peer.MARSHALLED_TOTAL_SIZE - SALT_SIZE = salt.SALT_MARSHALLED_SIZE SIGNATURE_SIZE = 65 PACKET_HEADER_START = 0 ISSUER_START = PACKET_HEADER_END - SALT_START = ISSUER_END - SIGNATURE_START = SALT_END + SIGNATURE_START = ISSUER_END PACKET_HEADER_END = PACKET_HEADER_START + PACKET_HEADER_SIZE ISSUER_END = ISSUER_START + ISSUER_SIZE - SALT_END = SALT_START + SALT_SIZE SIGNATURE_END = SIGNATURE_START + SIGNATURE_SIZE MARSHALLED_TOTAL_SIZE = SIGNATURE_END diff --git a/plugins/autopeering/protocol/request/request.go b/plugins/autopeering/protocol/request/request.go index bf499741..60eae5c9 100644 --- a/plugins/autopeering/protocol/request/request.go +++ b/plugins/autopeering/protocol/request/request.go @@ -4,14 +4,13 @@ import ( "bytes" "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" "time" ) type Request struct { Issuer *peer.Peer - Salt *salt.Salt Signature [SIGNATURE_SIZE]byte } @@ -28,17 +27,11 @@ func Unmarshal(data []byte) (*Request, error) { peeringRequest.Issuer = unmarshalledPeer } - if unmarshalledSalt, err := salt.Unmarshal(data[SALT_START:SALT_END]); err != nil { - return nil, err - } else { - peeringRequest.Salt = unmarshalledSalt - } - now := time.Now() - if peeringRequest.Salt.ExpirationTime.Before(now.Add(-1 * time.Minute)) { + if peeringRequest.Issuer.Salt.ExpirationTime.Before(now.Add(-1 * time.Minute)) { return nil, ErrPublicSaltExpired } - if peeringRequest.Salt.ExpirationTime.After(now.Add(saltmanager.PUBLIC_SALT_LIFETIME + 1*time.Minute)) { + if peeringRequest.Issuer.Salt.ExpirationTime.After(now.Add(saltmanager.PUBLIC_SALT_LIFETIME + 1*time.Minute)) { return nil, ErrPublicSaltInvalidLifetime } @@ -54,6 +47,29 @@ func Unmarshal(data []byte) (*Request, error) { return peeringRequest, nil } +func (this *Request) Accept(peers []*peer.Peer) error { + if _, err := this.Issuer.Connect(); err != nil { + return err + } + + peeringResponse := &response.Response{ + Type: response.TYPE_ACCEPT, + Issuer: OUTGOING_REQUEST.Issuer, + Peers: peers, + } + peeringResponse.Sign() + + if err := this.Issuer.Send(peeringResponse.Marshal(), false); err != nil { + return err + } + + return nil +} + +func (this *Request) Reject(peers []*peer.Peer) error { + return nil +} + func (this *Request) Sign() { if signature, err := this.Issuer.Identity.Sign(this.Marshal()[:SIGNATURE_START]); err != nil { panic(err) @@ -67,7 +83,6 @@ func (this *Request) Marshal() []byte { result[PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER copy(result[ISSUER_START:ISSUER_END], this.Issuer.Marshal()) - copy(result[SALT_START:SALT_END], this.Salt.Marshal()) copy(result[SIGNATURE_START:SIGNATURE_END], this.Signature[:SIGNATURE_SIZE]) return result diff --git a/plugins/autopeering/protocol/request/send.go b/plugins/autopeering/protocol/request/send.go new file mode 100644 index 00000000..739eba9d --- /dev/null +++ b/plugins/autopeering/protocol/request/send.go @@ -0,0 +1,51 @@ +package request + +import ( + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" + "github.com/pkg/errors" + "net" +) + +var OUTGOING_REQUEST *Request + +func Send(peer *peer.Peer) (bool, error) { + var keepAlive bool + switch peer.PeeringProtocolType { + case types.PROTOCOL_TYPE_TCP: + keepAlive = true + case types.PROTOCOL_TYPE_UDP: + keepAlive = false + default: + return false, errors.New("peer uses invalid protocol") + } + + if err := peer.Send(OUTGOING_REQUEST.Marshal(), keepAlive); err != nil { + return false, err + } + + return keepAlive, nil +} + +func init() { + OUTGOING_REQUEST = &Request{ + Issuer: &peer.Peer{ + Identity: accountability.OWN_ID, + PeeringProtocolType: types.PROTOCOL_TYPE_TCP, + PeeringPort: uint16(*parameters.UDP_PORT.Value), + GossipProtocolType: types.PROTOCOL_TYPE_TCP, + GossipPort: uint16(*parameters.UDP_PORT.Value), + Address: net.IPv4(0, 0, 0, 0), + Salt: saltmanager.PUBLIC_SALT, + }, + } + OUTGOING_REQUEST.Sign() + + saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { + OUTGOING_REQUEST.Sign() + }) +} diff --git a/plugins/autopeering/protocol/response/constants.go b/plugins/autopeering/protocol/response/constants.go index 19346565..250e5c98 100644 --- a/plugins/autopeering/protocol/response/constants.go +++ b/plugins/autopeering/protocol/response/constants.go @@ -1,7 +1,7 @@ package response import ( - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" ) @@ -9,7 +9,7 @@ const ( TYPE_REJECT = Type(0) TYPE_ACCEPT = Type(1) - MARSHALLED_PEERS_AMOUNT = protocol.NEIGHBOR_COUNT + protocol.NEIGHBOR_COUNT*protocol.NEIGHBOR_COUNT + MARSHALLED_PEERS_AMOUNT = constants.NEIGHBOR_COUNT + constants.NEIGHBOR_COUNT * constants.NEIGHBOR_COUNT MARHSALLED_PACKET_HEADER = 0xBC MARSHALLED_PACKET_HEADER_START = 0 diff --git a/plugins/autopeering/protocol/types.go b/plugins/autopeering/protocol/types.go deleted file mode 100644 index 99e489a3..00000000 --- a/plugins/autopeering/protocol/types.go +++ /dev/null @@ -1,5 +0,0 @@ -package protocol - -type AddressType = byte - -type ProtocolType = byte diff --git a/plugins/autopeering/protocol/constants.go b/plugins/autopeering/protocol/types/constants.go similarity index 80% rename from plugins/autopeering/protocol/constants.go rename to plugins/autopeering/protocol/types/constants.go index 5405fa58..f17ad232 100644 --- a/plugins/autopeering/protocol/constants.go +++ b/plugins/autopeering/protocol/types/constants.go @@ -1,8 +1,6 @@ -package protocol +package types const ( - NEIGHBOR_COUNT = 8 - PROTOCOL_TYPE_TCP = ProtocolType(0) PROTOCOL_TYPE_UDP = ProtocolType(1) diff --git a/plugins/autopeering/protocol/types/types.go b/plugins/autopeering/protocol/types/types.go new file mode 100644 index 00000000..fb6e2465 --- /dev/null +++ b/plugins/autopeering/protocol/types/types.go @@ -0,0 +1,5 @@ +package types + +type AddressType = byte + +type ProtocolType = byte \ No newline at end of file diff --git a/plugins/autopeering/saltmanager/errors.go b/plugins/autopeering/saltmanager/errors.go new file mode 100644 index 00000000..bb2ec0d4 --- /dev/null +++ b/plugins/autopeering/saltmanager/errors.go @@ -0,0 +1,8 @@ +package saltmanager + +import "github.com/pkg/errors" + +var ( + ErrPublicSaltExpired = errors.New("expired public salt in ping") + ErrPublicSaltInvalidLifetime = errors.New("invalid public salt lifetime in ping") +) diff --git a/plugins/autopeering/saltmanager/utils.go b/plugins/autopeering/saltmanager/utils.go new file mode 100644 index 00000000..bc7c2046 --- /dev/null +++ b/plugins/autopeering/saltmanager/utils.go @@ -0,0 +1,19 @@ +package saltmanager + +import ( + "github.com/iotadevelopment/shimmer/plugins/autopeering/saltmanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "time" +) + +func CheckSalt(saltToCheck *salt.Salt) error { + now := time.Now() + if saltToCheck.ExpirationTime.Before(now.Add(-1 * time.Minute)) { + return ErrPublicSaltExpired + } + if saltToCheck.ExpirationTime.After(now.Add(saltmanager.PUBLIC_SALT_LIFETIME + 1 * time.Minute)) { + return ErrPublicSaltInvalidLifetime + } + + return nil +} diff --git a/plugins/autopeering/server/tcp/constants.go b/plugins/autopeering/server/tcp/constants.go index df045312..85b2f705 100644 --- a/plugins/autopeering/server/tcp/constants.go +++ b/plugins/autopeering/server/tcp/constants.go @@ -8,4 +8,5 @@ const ( STATE_INITIAL = byte(0) STATE_REQUEST = byte(1) STATE_RESPONSE = byte(2) + STATE_PING = byte(3) ) diff --git a/plugins/autopeering/server/tcp/events.go b/plugins/autopeering/server/tcp/events.go index fa67810d..dedffa4f 100644 --- a/plugins/autopeering/server/tcp/events.go +++ b/plugins/autopeering/server/tcp/events.go @@ -1,7 +1,7 @@ package tcp import ( - "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "net" @@ -9,50 +9,70 @@ import ( ) var Events = &pluginEvents{ - ReceiveRequest: &requestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, - ReceiveResponse: &responseEvent{make(map[uintptr]ConnectionPeeringResponseConsumer)}, - Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, + ReceivePing: &pingEvent{make(map[uintptr]PingConsumer)}, + ReceiveRequest: &requestEvent{make(map[uintptr]RequestConsumer)}, + ReceiveResponse: &responseEvent{make(map[uintptr]ResponseConsumer)}, + Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, } type pluginEvents struct { + ReceivePing *pingEvent ReceiveRequest *requestEvent ReceiveResponse *responseEvent Error *ipErrorEvent } +type pingEvent struct { + callbacks map[uintptr]PingConsumer +} + +func (this *pingEvent) Attach(callback PingConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *pingEvent) Detach(callback PingConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *pingEvent) Trigger(ping *ping.Ping) { + for _, callback := range this.callbacks { + callback(ping) + } +} + type requestEvent struct { - callbacks map[uintptr]ConnectionPeeringRequestConsumer + callbacks map[uintptr]RequestConsumer } -func (this *requestEvent) Attach(callback ConnectionPeeringRequestConsumer) { +func (this *requestEvent) Attach(callback RequestConsumer) { this.callbacks[reflect.ValueOf(callback).Pointer()] = callback } -func (this *requestEvent) Detach(callback ConnectionPeeringRequestConsumer) { +func (this *requestEvent) Detach(callback RequestConsumer) { delete(this.callbacks, reflect.ValueOf(callback).Pointer()) } -func (this *requestEvent) Trigger(conn *network.ManagedConnection, request *request.Request) { +func (this *requestEvent) Trigger(req *request.Request) { for _, callback := range this.callbacks { - callback(conn, request) + callback(req) } } type responseEvent struct { - callbacks map[uintptr]ConnectionPeeringResponseConsumer + callbacks map[uintptr]ResponseConsumer } -func (this *responseEvent) Attach(callback ConnectionPeeringResponseConsumer) { +func (this *responseEvent) Attach(callback ResponseConsumer) { this.callbacks[reflect.ValueOf(callback).Pointer()] = callback } -func (this *responseEvent) Detach(callback ConnectionPeeringResponseConsumer) { +func (this *responseEvent) Detach(callback ResponseConsumer) { delete(this.callbacks, reflect.ValueOf(callback).Pointer()) } -func (this *responseEvent) Trigger(conn *network.ManagedConnection, peeringResponse *response.Response) { +func (this *responseEvent) Trigger(peeringResponse *response.Response) { for _, callback := range this.callbacks { - callback(conn, peeringResponse) + callback(peeringResponse) } } diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go index 1544c703..81127fa8 100644 --- a/plugins/autopeering/server/tcp/server.go +++ b/plugins/autopeering/server/tcp/server.go @@ -6,6 +6,7 @@ import ( "github.com/iotaledger/goshimmer/packages/network/tcp" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "github.com/pkg/errors" @@ -62,7 +63,7 @@ func HandleConnection(conn *network.ManagedConnection) { ProcessIncomingPacket(&connectionState, &receiveBuffer, conn, data, &offset) }) - go conn.Read(make([]byte, int(math.Max(request.MARSHALLED_TOTAL_SIZE, response.MARSHALLED_TOTAL_SIZE)))) + go conn.Read(make([]byte, int(math.Max(ping.MARSHALLED_TOTAL_SIZE, math.Max(request.MARSHALLED_TOTAL_SIZE, response.MARSHALLED_TOTAL_SIZE))))) } func ProcessIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int) { @@ -83,6 +84,8 @@ func ProcessIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *n *receiveBuffer = make([]byte, request.MARSHALLED_TOTAL_SIZE) case STATE_RESPONSE: *receiveBuffer = make([]byte, response.MARSHALLED_TOTAL_SIZE) + case STATE_PING: + *receiveBuffer = make([]byte, ping.MARSHALLED_TOTAL_SIZE) } } @@ -91,27 +94,33 @@ func ProcessIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *n processIncomingRequestPacket(connectionState, receiveBuffer, conn, data, offset) case STATE_RESPONSE: processIncomingResponsePacket(connectionState, receiveBuffer, conn, data, offset) + case STATE_PING: + processIncomingPingPacket(connectionState, receiveBuffer, conn, data, offset) } } func parsePackageHeader(data []byte) (ConnectionState, []byte, error) { var connectionState ConnectionState - var receivedData []byte + var receiveBuffer []byte switch data[0] { case request.MARSHALLED_PACKET_HEADER: - receivedData = make([]byte, request.MARSHALLED_TOTAL_SIZE) + receiveBuffer = make([]byte, request.MARSHALLED_TOTAL_SIZE) connectionState = STATE_REQUEST case response.MARHSALLED_PACKET_HEADER: - receivedData = make([]byte, response.MARSHALLED_TOTAL_SIZE) + receiveBuffer = make([]byte, response.MARSHALLED_TOTAL_SIZE) connectionState = STATE_RESPONSE + case ping.MARSHALLED_PACKET_HEADER: + receiveBuffer = make([]byte, ping.MARSHALLED_TOTAL_SIZE) + + connectionState = STATE_PING default: return 0, nil, errors.New("invalid package header") } - return connectionState, receivedData, nil + return connectionState, receiveBuffer, nil } func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int) { @@ -122,16 +131,21 @@ func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, if *offset + len(data) < request.MARSHALLED_TOTAL_SIZE { *offset += len(data) } else { - if peeringRequest, err := request.Unmarshal(*receiveBuffer); err != nil { + if req, err := request.Unmarshal(*receiveBuffer); err != nil { Events.Error.Trigger(conn.RemoteAddr().(*net.TCPAddr).IP, err) conn.Close() return } else { - peeringRequest.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + req.Issuer.Conn = conn + req.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - Events.ReceiveRequest.Trigger(conn, peeringRequest) + req.Issuer.Conn.Events.Close.Attach(func() { + req.Issuer.Conn = nil + }) + + Events.ReceiveRequest.Trigger(req) } *connectionState = STATE_INITIAL @@ -150,16 +164,21 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, if *offset + len(data) < response.MARSHALLED_TOTAL_SIZE { *offset += len(data) } else { - if peeringResponse, err := response.Unmarshal(*receiveBuffer); err != nil { + if res, err := response.Unmarshal(*receiveBuffer); err != nil { Events.Error.Trigger(conn.RemoteAddr().(*net.TCPAddr).IP, err) conn.Close() return } else { - peeringResponse.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + res.Issuer.Conn = conn + res.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + + res.Issuer.Conn.Events.Close.Attach(func() { + res.Issuer.Conn = nil + }) - Events.ReceiveResponse.Trigger(conn, peeringResponse) + Events.ReceiveResponse.Trigger(res) } *connectionState = STATE_INITIAL @@ -169,3 +188,36 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, } } } + +func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int) { + remainingCapacity := int(math.Min(float64(ping.MARSHALLED_TOTAL_SIZE - *offset), float64(len(data)))) + + copy((*receiveBuffer)[*offset:], data[:remainingCapacity]) + + if *offset + len(data) < ping.MARSHALLED_TOTAL_SIZE { + *offset += len(data) + } else { + if ping, err := ping.Unmarshal(*receiveBuffer); err != nil { + Events.Error.Trigger(conn.RemoteAddr().(*net.TCPAddr).IP, err) + + conn.Close() + + return + } else { + ping.Issuer.Conn = conn + ping.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + + ping.Issuer.Conn.Events.Close.Attach(func() { + ping.Issuer.Conn = nil + }) + + Events.ReceivePing.Trigger(ping) + } + + *connectionState = STATE_INITIAL + + if *offset + len(data) > ping.MARSHALLED_TOTAL_SIZE { + ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[ping.MARSHALLED_TOTAL_SIZE:], offset) + } + } +} diff --git a/plugins/autopeering/server/tcp/types.go b/plugins/autopeering/server/tcp/types.go index 4298cf0b..b8dca9d4 100644 --- a/plugins/autopeering/server/tcp/types.go +++ b/plugins/autopeering/server/tcp/types.go @@ -1,15 +1,17 @@ package tcp import ( - "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "net" ) -type ConnectionPeeringRequestConsumer = func(conn *network.ManagedConnection, request *request.Request) +type PingConsumer = func(p *ping.Ping) -type ConnectionPeeringResponseConsumer = func(conn *network.ManagedConnection, peeringResponse *response.Response) +type RequestConsumer = func(req *request.Request) + +type ResponseConsumer = func(peeringResponse *response.Response) type IPErrorConsumer = func(ip net.IP, err error) diff --git a/plugins/autopeering/server/udp/events.go b/plugins/autopeering/server/udp/events.go index d07d3cd7..588e85b8 100644 --- a/plugins/autopeering/server/udp/events.go +++ b/plugins/autopeering/server/udp/events.go @@ -1,6 +1,7 @@ package udp import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "net" @@ -8,17 +9,37 @@ import ( ) var Events = &pluginEvents{ - ReceiveRequest: &requestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, + ReceivePing: &pingEvent{make(map[uintptr]PingConsumer)}, + ReceiveRequest: &requestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, ReceiveResponse: &responseEvent{make(map[uintptr]ConnectionPeeringResponseConsumer)}, - Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, + Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, } type pluginEvents struct { + ReceivePing *pingEvent ReceiveRequest *requestEvent ReceiveResponse *responseEvent Error *ipErrorEvent } +type pingEvent struct { + callbacks map[uintptr]PingConsumer +} + +func (this *pingEvent) Attach(callback PingConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *pingEvent) Detach(callback PingConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *pingEvent) Trigger(ping *ping.Ping) { + for _, callback := range this.callbacks { + callback(ping) + } +} + type requestEvent struct { callbacks map[uintptr]ConnectionPeeringRequestConsumer } @@ -72,4 +93,3 @@ func (this *ipErrorEvent) Trigger(ip net.IP, err error) { callback(ip, err) } } - diff --git a/plugins/autopeering/server/udp/server.go b/plugins/autopeering/server/udp/server.go index c4b09ade..46fc6e8f 100644 --- a/plugins/autopeering/server/udp/server.go +++ b/plugins/autopeering/server/udp/server.go @@ -5,6 +5,7 @@ import ( "github.com/iotaledger/goshimmer/packages/network/udp" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "github.com/pkg/errors" @@ -72,6 +73,14 @@ func processReceivedData(addr *net.UDPAddr, data []byte) { Events.ReceiveResponse.Trigger(peeringResponse) } + case ping.MARSHALLED_PACKET_HEADER: + if ping, err := ping.Unmarshal(data); err != nil { + Events.Error.Trigger(addr.IP, err) + } else { + ping.Issuer.Address = addr.IP + + Events.ReceivePing.Trigger(ping) + } default: Events.Error.Trigger(addr.IP, errors.New("invalid UDP peering packet from " + addr.IP.String())) } diff --git a/plugins/autopeering/server/udp/types.go b/plugins/autopeering/server/udp/types.go index 48e103e9..c3d055d0 100644 --- a/plugins/autopeering/server/udp/types.go +++ b/plugins/autopeering/server/udp/types.go @@ -1,11 +1,14 @@ package udp import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "net" ) +type PingConsumer = func(p *ping.Ping) + type ConnectionPeeringRequestConsumer = func(request *request.Request) type ConnectionPeeringResponseConsumer = func(peeringResponse *response.Response) diff --git a/plugins/gossip/neighbormanager/accepted_neighbors.go b/plugins/gossip/neighbormanager/accepted_neighbors.go new file mode 100644 index 00000000..10f4ca95 --- /dev/null +++ b/plugins/gossip/neighbormanager/accepted_neighbors.go @@ -0,0 +1,7 @@ +package neighbormanager + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" +) + +var ACCEPTED_NEIGHBORS = make(types.PeerRegister) diff --git a/plugins/gossip/neighbormanager/chosen_neighbors.go b/plugins/gossip/neighbormanager/chosen_neighbors.go new file mode 100644 index 00000000..5e68468f --- /dev/null +++ b/plugins/gossip/neighbormanager/chosen_neighbors.go @@ -0,0 +1,7 @@ +package neighbormanager + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" +) + +var CHOSEN_NEIGHBORS = make(types.PeerRegister) diff --git a/plugins/gossip/neighbormanager/neighbormanager.go b/plugins/gossip/neighbormanager/neighbormanager.go new file mode 100644 index 00000000..a664339e --- /dev/null +++ b/plugins/gossip/neighbormanager/neighbormanager.go @@ -0,0 +1,3 @@ +package neighbormanager + + diff --git a/plugins/statusscreen/ui_header_bar.go b/plugins/statusscreen/ui_header_bar.go index eb9bad6a..118ad837 100644 --- a/plugins/statusscreen/ui_header_bar.go +++ b/plugins/statusscreen/ui_header_bar.go @@ -5,6 +5,7 @@ import ( "github.com/gdamore/tcell" "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" "github.com/rivo/tview" "math" "strconv" @@ -62,10 +63,11 @@ func (headerBar *UIHeaderBar) Update() { fmt.Fprintln(headerBar.InfoContainer) fmt.Fprintln(headerBar.InfoContainer) fmt.Fprintln(headerBar.InfoContainer) - fmt.Fprintln(headerBar.InfoContainer) fmt.Fprintf(headerBar.InfoContainer, "[::b]Node ID: [::d]%40v ", accountability.OWN_ID.StringIdentifier) fmt.Fprintln(headerBar.InfoContainer) - fmt.Fprintf(headerBar.InfoContainer, "[::b]Known Peers: [::d]%40v ", strconv.Itoa(len(peermanager.KNOWN_PEERS.Peers))) + fmt.Fprintf(headerBar.InfoContainer, "[::b]Neighbors: [::d]%40v ", strconv.Itoa(len(neighbormanager.CHOSEN_NEIGHBORS)) + " chosen / " + strconv.Itoa(len(neighbormanager.ACCEPTED_NEIGHBORS)) + " accepted") + fmt.Fprintln(headerBar.InfoContainer) + fmt.Fprintf(headerBar.InfoContainer, "[::b]Known Peers: [::d]%40v ", strconv.Itoa(len(peermanager.KNOWN_PEERS)) + " total / " + strconv.Itoa(len(peermanager.NEIGHBORHOOD)) + " neighborhood") fmt.Fprintln(headerBar.InfoContainer) fmt.Fprintf(headerBar.InfoContainer, "[::b]Uptime: [::d]"); -- GitLab