diff --git a/packages/timeutil/ticker.go b/packages/timeutil/ticker.go new file mode 100644 index 0000000000000000000000000000000000000000..0a7536941bd43f252b7ddfdf283c8c43f77c1b84 --- /dev/null +++ b/packages/timeutil/ticker.go @@ -0,0 +1,19 @@ +package timeutil + +import ( + "github.com/iotaledger/goshimmer/packages/daemon" + "time" +) + +func Ticker(handler func(), interval time.Duration) { + ticker := time.NewTicker(interval) + ticker: + for { + select { + case <- daemon.ShutdownSignal: + break ticker + case <- ticker.C: + handler() + } + } +} diff --git a/plugins/autopeering/parameters/parameters.go b/plugins/autopeering/parameters/parameters.go index f4cceaa347ab3d8d0ba408c8bd089d4c0075fa3b..8d5b29f79857a4493e5d46e215a4ef25edc60d6c 100644 --- a/plugins/autopeering/parameters/parameters.go +++ b/plugins/autopeering/parameters/parameters.go @@ -4,7 +4,7 @@ import "github.com/iotaledger/goshimmer/packages/parameter" var ( ADDRESS = parameter.AddString("AUTOPEERING/ADDRESS", "0.0.0.0", "address to bind for incoming peering requests") - ENTRY_NODES = parameter.AddString("AUTOPEERING/ENTRY_NODES", "tcp://82.165.29.179:14626", "list of trusted entry nodes for auto peering") + ENTRY_NODES = parameter.AddString("AUTOPEERING/ENTRY_NODES", "tcp://0d828930890386f036eb77982cc067c5429f7b8f@82.165.29.179:14626", "list of trusted entry nodes for auto peering") TCP_PORT = parameter.AddInt("AUTOPEERING/TCP_PORT", 14626, "tcp port for incoming peering requests") UDP_PORT = parameter.AddInt("AUTOPEERING/UDP_PORT", 14626, "udp port for incoming peering requests") ) diff --git a/plugins/autopeering/peermanager/accepted_neighbors.go b/plugins/autopeering/peermanager/accepted_neighbors.go deleted file mode 100644 index ff1e87ed1cb2fbff2855cea5a936548eae996196..0000000000000000000000000000000000000000 --- a/plugins/autopeering/peermanager/accepted_neighbors.go +++ /dev/null @@ -1,5 +0,0 @@ -package peermanager - -import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - -var ACCEPTED_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)} diff --git a/plugins/autopeering/peermanager/chosen_neighbor_candidates.go b/plugins/autopeering/peermanager/chosen_neighbor_candidates.go new file mode 100644 index 0000000000000000000000000000000000000000..52f41864e244996b9e21c177e7bea61308767980 --- /dev/null +++ b/plugins/autopeering/peermanager/chosen_neighbor_candidates.go @@ -0,0 +1,25 @@ +package peermanager + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" +) + +var CHOSEN_NEIGHBOR_CANDIDATES types.PeerList + +var CHOSEN_NEIGHBOR_DISTANCE = func(req *request.Request) func(p *peer.Peer) float64 { + return func(p *peer.Peer) float64 { + return 1 + } +} + +func init() { + updateNeighborCandidates() + + Events.UpdateNeighborhood.Attach(updateNeighborCandidates) +} + +func updateNeighborCandidates() { + CHOSEN_NEIGHBOR_CANDIDATES = NEIGHBORHOOD.List().Sort(CHOSEN_NEIGHBOR_DISTANCE(request.OUTGOING_REQUEST)) +} diff --git a/plugins/autopeering/peermanager/chosen_neighbors.go b/plugins/autopeering/peermanager/chosen_neighbors.go deleted file mode 100644 index 595d857104197067fc90db5bdc351d8efe2bcc98..0000000000000000000000000000000000000000 --- a/plugins/autopeering/peermanager/chosen_neighbors.go +++ /dev/null @@ -1,5 +0,0 @@ -package peermanager - -import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - -var CHOSEN_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)} diff --git a/plugins/autopeering/peermanager/constants.go b/plugins/autopeering/peermanager/constants.go deleted file mode 100644 index df72cfb2ee2ccdd4dce8b7d96a83e4a3f79b4de3..0000000000000000000000000000000000000000 --- a/plugins/autopeering/peermanager/constants.go +++ /dev/null @@ -1,9 +0,0 @@ -package peermanager - -import ( - "time" -) - -const ( - FIND_NEIGHBOR_INTERVAL = 5 * time.Second -) diff --git a/plugins/autopeering/peermanager/entry_nodes.go b/plugins/autopeering/peermanager/entry_nodes.go index e5d6b27abba4f65f64b411b80456e04439f3afb4..f7f7b22c01ce73e96a4acc8f365ad0176e8a4322 100644 --- a/plugins/autopeering/peermanager/entry_nodes.go +++ b/plugins/autopeering/peermanager/entry_nodes.go @@ -1,9 +1,11 @@ package peermanager import ( + "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + peermanagerTypes "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" "net" "strconv" "strings" @@ -11,8 +13,8 @@ import ( var ENTRY_NODES = parseEntryNodes() -func parseEntryNodes() []*peer.Peer { - result := make([]*peer.Peer, 0) +func parseEntryNodes() peermanagerTypes.PeerList { + result := make(peermanagerTypes.PeerList, 0) for _, entryNodeDefinition := range strings.Fields(*parameters.ENTRY_NODES.Value) { if entryNodeDefinition == "" { @@ -29,12 +31,20 @@ func parseEntryNodes() []*peer.Peer { } switch protocolBits[0] { case "tcp": - entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_TCP + entryNode.PeeringProtocolType = types.PROTOCOL_TYPE_TCP case "udp": - entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_UDP + entryNode.PeeringProtocolType = types.PROTOCOL_TYPE_UDP } - addressBits := strings.Split(protocolBits[1], ":") + identityBits := strings.Split(protocolBits[1], "@") + if len(identityBits) != 2 { + panic("error while parsing identity of entry node: " + entryNodeDefinition) + } + entryNode.Identity = &identity.Identity{ + StringIdentifier: identityBits[0], + } + + addressBits := strings.Split(identityBits[1], ":") switch len(addressBits) { case 2: host := addressBits[0] diff --git a/plugins/autopeering/peermanager/events.go b/plugins/autopeering/peermanager/events.go new file mode 100644 index 0000000000000000000000000000000000000000..db39e88834ac1efbd67ad70c11ceb4231d7bba0e --- /dev/null +++ b/plugins/autopeering/peermanager/events.go @@ -0,0 +1,31 @@ +package peermanager + +import "reflect" + +var Events = moduleEvents{ + UpdateNeighborhood: &callbackEvent{make(map[uintptr]Callback)}, +} + +type moduleEvents struct { + UpdateNeighborhood *callbackEvent +} + +type callbackEvent struct { + callbacks map[uintptr]Callback +} + +func (this *callbackEvent) Attach(callback Callback) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *callbackEvent) Detach(callback Callback) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *callbackEvent) Trigger() { + for _, callback := range this.callbacks { + callback() + } +} + +type Callback = func() \ No newline at end of file diff --git a/plugins/autopeering/peermanager/known_peers.go b/plugins/autopeering/peermanager/known_peers.go index d8aeba0ad6335f5dc88b7839c204d303bec71e1b..1b23fb82e7821e65b576f953fb60ce0e371e5efb 100644 --- a/plugins/autopeering/peermanager/known_peers.go +++ b/plugins/autopeering/peermanager/known_peers.go @@ -1,5 +1,16 @@ package peermanager -import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" +) -var KNOWN_PEERS = &PeerList{make(map[string]*peer.Peer)} +var KNOWN_PEERS = initKnownPeers() + +func initKnownPeers() types.PeerRegister { + knownPeers := make(types.PeerRegister) + for _, entryNode := range ENTRY_NODES { + knownPeers.AddOrUpdate(entryNode) + } + + return knownPeers +} \ No newline at end of file diff --git a/plugins/autopeering/peermanager/neighborhood.go b/plugins/autopeering/peermanager/neighborhood.go new file mode 100644 index 0000000000000000000000000000000000000000..e6d95d89ccaa06fdf5c7f3d882a0a86e8448809c --- /dev/null +++ b/plugins/autopeering/peermanager/neighborhood.go @@ -0,0 +1,42 @@ +package peermanager + +import ( + "github.com/iotaledger/goshimmer/packages/timeutil" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "time" +) + +var NEIGHBORHOOD types.PeerRegister + +var NEIGHBORHOOD_LIST types.PeerList + +// Selects a fixed neighborhood from all known peers - this allows nodes to "stay in the same circles" that share their +// view on the ledger an is a preparation for economic clustering +var NEIGHBORHOOD_SELECTOR = func(this types.PeerRegister, req *request.Request) types.PeerRegister { + filteredPeers := make(types.PeerRegister) + for id, peer := range this { + filteredPeers[id] = peer + } + + return filteredPeers +} + +var lastUpdate = time.Now() + +func init() { + updateNeighborHood() + + go timeutil.Ticker(updateNeighborHood, 1 * time.Second) +} + +func updateNeighborHood() { + if float64(len(NEIGHBORHOOD)) * 1.2 <= float64(len(KNOWN_PEERS)) || lastUpdate.Before(time.Now().Add(-300 * time.Second)) { + NEIGHBORHOOD = KNOWN_PEERS.Filter(NEIGHBORHOOD_SELECTOR, request.OUTGOING_REQUEST) + NEIGHBORHOOD_LIST = NEIGHBORHOOD.List() + + lastUpdate = time.Now() + + Events.UpdateNeighborhood.Trigger() + } +} \ No newline at end of file diff --git a/plugins/autopeering/peermanager/peer_list.go b/plugins/autopeering/peermanager/peer_list.go deleted file mode 100644 index c8add98070a69cb6a09c5b28f8b42ef7165e3d66..0000000000000000000000000000000000000000 --- a/plugins/autopeering/peermanager/peer_list.go +++ /dev/null @@ -1,45 +0,0 @@ -package peermanager - -import ( - "bytes" - "github.com/iotaledger/goshimmer/packages/accountability" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" -) - -type PeerList struct { - Peers map[string]*peer.Peer -} - -func (this *PeerList) Update(peer *peer.Peer) bool { - if peer.Identity == nil || bytes.Equal(peer.Identity.Identifier, accountability.OWN_ID.Identifier) { - return false - } - - if existingPeer, exists := this.Peers[peer.Identity.StringIdentifier]; exists { - existingPeer.Address = peer.Address - existingPeer.GossipPort = peer.GossipPort - existingPeer.PeeringPort = peer.PeeringPort - - // trigger update peer - - return false - } else { - this.Peers[peer.Identity.StringIdentifier] = peer - - // trigger add peer - - return true - } -} - -func (this *PeerList) Add(peer *peer.Peer) { - this.Peers[peer.Identity.StringIdentifier] = peer -} - -func (this *PeerList) Contains(key string) bool { - if _, exists := this.Peers[key]; exists { - return true - } else { - return false - } -} diff --git a/plugins/autopeering/peermanager/peer_manager.go b/plugins/autopeering/peermanager/peer_manager.go deleted file mode 100644 index b9979c53b9dda4c7ea13cb0dce6b782ff98788b0..0000000000000000000000000000000000000000 --- a/plugins/autopeering/peermanager/peer_manager.go +++ /dev/null @@ -1,250 +0,0 @@ -package peermanager - -import ( - "github.com/iotaledger/goshimmer/packages/daemon" - "github.com/iotaledger/goshimmer/packages/network" - "github.com/iotaledger/goshimmer/packages/node" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" - "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" - "github.com/iotaledger/goshimmer/plugins/autopeering/server/udp" - "net" - "strconv" - "time" -) - -func Configure(plugin *node.Plugin) { - configurePeeringRequest() - - // setup processing of peering requests - udp.Events.ReceiveRequest.Attach(func(peeringRequest *request.Request) { - processPeeringRequest(plugin, peeringRequest, nil) - }) - tcp.Events.ReceiveRequest.Attach(func(conn *network.ManagedConnection, peeringRequest *request.Request) { - processPeeringRequest(plugin, peeringRequest, conn) - }) - - // setup processing of peering responses - udp.Events.ReceiveResponse.Attach(func(peeringResponse *response.Response) { - processPeeringResponse(plugin, peeringResponse, nil) - }) - tcp.Events.ReceiveResponse.Attach(func(conn *network.ManagedConnection, peeringResponse *response.Response) { - processPeeringResponse(plugin, peeringResponse, conn) - }) - - udp.Events.Error.Attach(func(ip net.IP, err error) { - plugin.LogDebug("error when communicating with " + ip.String() + ": " + err.Error()) - }) - tcp.Events.Error.Attach(func(ip net.IP, err error) { - plugin.LogDebug("invalid peering request from " + ip.String() + ": " + err.Error()) - }) -} - -func Run(plugin *node.Plugin) { - // setup background worker that contacts "chosen neighbors" - daemon.BackgroundWorker(func() { - plugin.LogInfo("Starting Peer Manager ...") - plugin.LogSuccess("Starting Peer Manager ... done") - - sendPeeringRequests(plugin) - - ticker := time.NewTicker(FIND_NEIGHBOR_INTERVAL) - for { - select { - case <-daemon.ShutdownSignal: - return - case <-ticker.C: - sendPeeringRequests(plugin) - } - } - }) -} - -func Shutdown(plugin *node.Plugin) { - plugin.LogInfo("Stopping Peer Manager ...") - plugin.LogSuccess("Stopping Peer Manager ... done") -} - -func processPeeringRequest(plugin *node.Plugin, peeringRequest *request.Request, conn net.Conn) { - if KNOWN_PEERS.Update(peeringRequest.Issuer) { - plugin.LogInfo("new peer detected: " + peeringRequest.Issuer.Address.String() + " / " + peeringRequest.Issuer.Identity.StringIdentifier) - } - - if conn == nil { - plugin.LogDebug("received UDP peering request from " + peeringRequest.Issuer.Identity.StringIdentifier) - - var protocolString string - switch peeringRequest.Issuer.PeeringProtocolType { - case protocol.PROTOCOL_TYPE_TCP: - protocolString = "tcp" - case protocol.PROTOCOL_TYPE_UDP: - protocolString = "udp" - default: - plugin.LogFailure("unsupported peering protocol in request from " + peeringRequest.Issuer.Address.String()) - - return - } - - var err error - conn, err = net.Dial(protocolString, peeringRequest.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringRequest.Issuer.PeeringPort))) - if err != nil { - plugin.LogDebug("error when connecting to " + peeringRequest.Issuer.Address.String() + " during peering process: " + err.Error()) - - return - } - } else { - plugin.LogDebug("received TCP peering request from " + peeringRequest.Issuer.Identity.StringIdentifier) - } - - sendFittingPeeringResponse(conn) -} - -func processPeeringResponse(plugin *node.Plugin, peeringResponse *response.Response, conn *network.ManagedConnection) { - if KNOWN_PEERS.Update(peeringResponse.Issuer) { - plugin.LogInfo("new peer detected: " + peeringResponse.Issuer.Address.String() + " / " + peeringResponse.Issuer.Identity.StringIdentifier) - } - for _, peer := range peeringResponse.Peers { - if KNOWN_PEERS.Update(peer) { - plugin.LogInfo("new peer detected: " + peer.Address.String() + " / " + peer.Identity.StringIdentifier) - } - } - - if conn == nil { - plugin.LogDebug("received UDP peering response from " + peeringResponse.Issuer.Identity.StringIdentifier) - } else { - plugin.LogDebug("received TCP peering response from " + peeringResponse.Issuer.Identity.StringIdentifier) - - conn.Close() - } - - switch peeringResponse.Type { - case response.TYPE_ACCEPT: - CHOSEN_NEIGHBORS.Update(peeringResponse.Issuer) - case response.TYPE_REJECT: - default: - plugin.LogDebug("invalid response type in peering response of " + peeringResponse.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringResponse.Issuer.PeeringPort))) - } -} - -func sendFittingPeeringResponse(conn net.Conn) { - var peeringResponse *response.Response - if len(ACCEPTED_NEIGHBORS.Peers) < protocol.NEIGHBOR_COUNT/2 { - peeringResponse = generateAcceptResponse() - } else { - peeringResponse = generateRejectResponse() - } - - peeringResponse.Sign() - - conn.Write(peeringResponse.Marshal()) - conn.Close() -} - -func generateAcceptResponse() *response.Response { - peeringResponse := &response.Response{ - Type: response.TYPE_ACCEPT, - Issuer: PEERING_REQUEST.Issuer, - Peers: generateProposedNodeCandidates(), - } - - return peeringResponse -} - -func generateRejectResponse() *response.Response { - peeringResponse := &response.Response{ - Type: response.TYPE_REJECT, - Issuer: PEERING_REQUEST.Issuer, - Peers: generateProposedNodeCandidates(), - } - - return peeringResponse -} - - -func generateProposedNodeCandidates() []*peer.Peer { - peers := make([]*peer.Peer, 0) - for _, peer := range KNOWN_PEERS.Peers { - peers = append(peers, peer) - } - - return peers -} - -//region PEERING REQUEST RELATED METHODS /////////////////////////////////////////////////////////////////////////////// - -func sendPeeringRequests(plugin *node.Plugin) { - for _, peer := range getChosenNeighborCandidates() { - sendPeeringRequest(plugin, peer) - } -} - -func sendPeeringRequest(plugin *node.Plugin, peer *peer.Peer) { - switch peer.PeeringProtocolType { - case protocol.PROTOCOL_TYPE_TCP: - sendTCPPeeringRequest(plugin, peer) - - case protocol.PROTOCOL_TYPE_UDP: - sendUDPPeeringRequest(plugin, peer) - - default: - panic("invalid protocol in known peers") - } -} - -func sendTCPPeeringRequest(plugin *node.Plugin, peer *peer.Peer) { - go func() { - tcpConnection, err := net.Dial("tcp", peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort))) - if err != nil { - plugin.LogDebug("error while trying to send TCP peering request to " + peer.String() + ": " + err.Error()) - } else { - mConn := network.NewManagedConnection(tcpConnection) - - plugin.LogDebug("sending TCP peering request to " + peer.String()) - - if _, err := mConn.Write(PEERING_REQUEST.Marshal()); err != nil { - plugin.LogDebug("error while trying to send TCP peering request to " + peer.String() + ": " + err.Error()) - - return - } - - tcp.HandleConnection(mConn) - } - }() -} - -func sendUDPPeeringRequest(plugin *node.Plugin, peer *peer.Peer) { - go func() { - udpConnection, err := net.Dial("udp", peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort))) - if err != nil { - plugin.LogDebug("error while trying to send peering request to " + peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + " / " + peer.Identity.StringIdentifier + ": " + err.Error()) - } else { - mConn := network.NewManagedConnection(udpConnection) - - if _, err := mConn.Write(PEERING_REQUEST.Marshal()); err != nil { - plugin.LogDebug("error while trying to send peering request to " + peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + " / " + peer.Identity.StringIdentifier + ": " + err.Error()) - - return - } - - // setup listener for incoming responses - } - }() -} - -func getChosenNeighborCandidates() []*peer.Peer { - result := make([]*peer.Peer, 0) - - for _, peer := range KNOWN_PEERS.Peers { - result = append(result, peer) - } - - for _, peer := range ENTRY_NODES { - result = append(result, peer) - } - - return result -} - -//endregion //////////////////////////////////////////////////////////////////////////////////////////////////////////// \ No newline at end of file diff --git a/plugins/autopeering/peermanager/peering_request.go b/plugins/autopeering/peermanager/peering_request.go deleted file mode 100644 index 4475b6f42b8a57fc74d8c1dac91e3fa57da71a13..0000000000000000000000000000000000000000 --- a/plugins/autopeering/peermanager/peering_request.go +++ /dev/null @@ -1,33 +0,0 @@ -package peermanager - -import ( - "github.com/iotaledger/goshimmer/packages/accountability" - "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" - "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" - "net" -) - -var PEERING_REQUEST *request.Request - -func configurePeeringRequest() { - PEERING_REQUEST = &request.Request{ - Issuer: &peer.Peer{ - Identity: accountability.OWN_ID, - PeeringProtocolType: protocol.PROTOCOL_TYPE_TCP, - PeeringPort: uint16(*parameters.UDP_PORT.Value), - GossipProtocolType: protocol.PROTOCOL_TYPE_TCP, - GossipPort: uint16(*parameters.UDP_PORT.Value), - Address: net.IPv4(0, 0, 0, 0), - }, - Salt: saltmanager.PUBLIC_SALT, - } - PEERING_REQUEST.Sign() - - saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { - PEERING_REQUEST.Sign() - }) -} diff --git a/plugins/autopeering/peermanager/types/peer_list.go b/plugins/autopeering/peermanager/types/peer_list.go new file mode 100644 index 0000000000000000000000000000000000000000..3bcc3f86e8cb62370cbd75e8d3b73030083ebde6 --- /dev/null +++ b/plugins/autopeering/peermanager/types/peer_list.go @@ -0,0 +1,33 @@ +package types + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "sort" +) + +type PeerList []*peer.Peer + +func (this PeerList) Filter(predicate func(p *peer.Peer) bool) PeerList { + peerList := make(PeerList, len(this)) + + counter := 0 + for _, peer := range this { + if predicate(peer) { + peerList[counter] = peer + counter++ + } + } + + return peerList[:counter] +} + +// Sorts the PeerRegister by their distance to an anchor. +func (this PeerList) Sort(distance func(p *peer.Peer) float64) PeerList { + sort.Slice(this, func(i, j int) bool { + return distance(this[i]) < distance(this[j]) + }) + + return this +} + + diff --git a/plugins/autopeering/peermanager/types/peer_register.go b/plugins/autopeering/peermanager/types/peer_register.go new file mode 100644 index 0000000000000000000000000000000000000000..c4cbbb3fac0f97e8cb5cf374539e990d9dc0e870 --- /dev/null +++ b/plugins/autopeering/peermanager/types/peer_register.go @@ -0,0 +1,57 @@ +package types + +import ( + "bytes" + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" +) + +type PeerRegister map[string]*peer.Peer + +// returns true if a new entry was added +func (this PeerRegister) AddOrUpdate(peer *peer.Peer) bool { + if peer.Identity == nil || bytes.Equal(peer.Identity.Identifier, accountability.OWN_ID.Identifier) { + return false + } + + if existingPeer, exists := this[peer.Identity.StringIdentifier]; exists { + existingPeer.Address = peer.Address + existingPeer.GossipPort = peer.GossipPort + existingPeer.PeeringPort = peer.PeeringPort + + // trigger update peer + + return false + } else { + this[peer.Identity.StringIdentifier] = peer + + // trigger add peer + + return true + } +} + +func (this PeerRegister) Contains(key string) bool { + if _, exists := this[key]; exists { + return true + } else { + return false + } +} + +func (this PeerRegister) Filter(filterFn func(this PeerRegister, req *request.Request) PeerRegister, req *request.Request) PeerRegister { + return filterFn(this, req) +} + +func (this PeerRegister) List() PeerList { + peerList := make(PeerList, len(this)) + + counter := 0 + for _, currentPeer := range this { + peerList[counter] = currentPeer + counter++ + } + + return peerList +} diff --git a/plugins/autopeering/plugin.go b/plugins/autopeering/plugin.go index d26a4e1f4b2292f5e583aa05197004bc86200594..4f8590d6304b29825fd63422bc9d210a7c0eee33 100644 --- a/plugins/autopeering/plugin.go +++ b/plugins/autopeering/plugin.go @@ -4,22 +4,44 @@ import ( "github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "github.com/iotaledger/goshimmer/plugins/autopeering/server" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" ) func configure(plugin *node.Plugin) { server.Configure(plugin) - peermanager.Configure(plugin) + protocol.Configure(plugin) daemon.Events.Shutdown.Attach(func() { server.Shutdown(plugin) - peermanager.Shutdown(plugin) + }) + + protocol.Events.IncomingRequestAccepted.Attach(func(req *request.Request) { + if neighbormanager.ACCEPTED_NEIGHBORS.AddOrUpdate(req.Issuer) { + plugin.LogSuccess("new neighbor accepted: " + req.Issuer.Address.String() + " / " + req.Issuer.Identity.StringIdentifier) + } + }) + + protocol.Events.OutgoingRequestAccepted.Attach(func(res *response.Response) { + if neighbormanager.CHOSEN_NEIGHBORS.AddOrUpdate(res.Issuer) { + plugin.LogSuccess("new neighbor chosen: " + res.Issuer.Address.String() + " / " + res.Issuer.Identity.StringIdentifier) + } + }) + + protocol.Events.DiscoverPeer.Attach(func(p *peer.Peer) { + if peermanager.KNOWN_PEERS.AddOrUpdate(p) { + plugin.LogInfo("new peer detected: " + p.Address.String() + " / " + p.Identity.StringIdentifier) + } }) } func run(plugin *node.Plugin) { server.Run(plugin) - peermanager.Run(plugin) + protocol.Run(plugin) } var PLUGIN = node.NewPlugin("Auto Peering", configure, run) diff --git a/plugins/autopeering/protocol/chosen_neighbor_processor.go b/plugins/autopeering/protocol/chosen_neighbor_processor.go new file mode 100644 index 0000000000000000000000000000000000000000..d76a4a190f53592e18ba7f48cf4d76557007c512 --- /dev/null +++ b/plugins/autopeering/protocol/chosen_neighbor_processor.go @@ -0,0 +1,61 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" + "time" +) + +func createChosenNeighborProcessor(plugin *node.Plugin) func() { + return func() { + plugin.LogInfo("Starting Chosen Neighbor Processor ...") + plugin.LogSuccess("Starting Chosen Neighbor Processor ... done") + + chooseNeighbors(plugin) + + ticker := time.NewTicker(constants.FIND_NEIGHBOR_INTERVAL) + ticker: + for { + select { + case <- daemon.ShutdownSignal: + plugin.LogInfo("Stopping Chosen Neighbor Processor ...") + + break ticker + case <- ticker.C: + chooseNeighbors(plugin) + } + } + + plugin.LogSuccess("Stopping Chosen Neighbor Processor ... done") + } +} + +func chooseNeighbors(plugin *node.Plugin) { + for _, chosenNeighborCandidate := range peermanager.CHOSEN_NEIGHBOR_CANDIDATES { + go func(peer *peer.Peer) { + nodeId := peer.Identity.StringIdentifier + + if !neighbormanager.ACCEPTED_NEIGHBORS.Contains(nodeId) && + !neighbormanager.CHOSEN_NEIGHBORS.Contains(nodeId) && + accountability.OWN_ID.StringIdentifier != nodeId { + + if connectionAlive, err := request.Send(peer); err != nil { + plugin.LogDebug(err.Error()) + } else if connectionAlive { + plugin.LogDebug("sent TCP peering request to " + peer.String()) + + tcp.HandleConnection(peer.Conn) + } else { + plugin.LogDebug("sent UDP peering request to " + peer.String()) + } + } + }(chosenNeighborCandidate) + } +} diff --git a/plugins/autopeering/protocol/constants/constants.go b/plugins/autopeering/protocol/constants/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..4e26ea74b9fce29ea226f23f1a1a5357b8161de5 --- /dev/null +++ b/plugins/autopeering/protocol/constants/constants.go @@ -0,0 +1,11 @@ +package constants + +import "time" + +const ( + NEIGHBOR_COUNT = 8 + + FIND_NEIGHBOR_INTERVAL = 5 * time.Second + PING_RANDOM_PEERS_INTERVAL = 1 * time.Second + PING_RANDOM_PEERS_COUNT = 3 +) diff --git a/plugins/autopeering/protocol/error_handler.go b/plugins/autopeering/protocol/error_handler.go new file mode 100644 index 0000000000000000000000000000000000000000..734a2f15961fb96f5a1eadfaa1e2264dafccca48 --- /dev/null +++ b/plugins/autopeering/protocol/error_handler.go @@ -0,0 +1,12 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "net" +) + +func createErrorHandler(plugin *node.Plugin) func(ip net.IP, err error) { + return func(ip net.IP, err error) { + plugin.LogDebug("error when communicating with " + ip.String() + ": " + err.Error()) + } +} diff --git a/plugins/autopeering/protocol/events.go b/plugins/autopeering/protocol/events.go new file mode 100644 index 0000000000000000000000000000000000000000..6d4d78bca27f20e033d62263aa0dd2028f609a92 --- /dev/null +++ b/plugins/autopeering/protocol/events.go @@ -0,0 +1,84 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "reflect" +) + +var Events = protocolEvents{ + DiscoverPeer: &peerEvent{make(map[uintptr]PeerConsumer)}, + IncomingRequestAccepted: &requestEvent{make(map[uintptr]RequestConsumer)}, + IncomingRequestRejected: &requestEvent{make(map[uintptr]RequestConsumer)}, + OutgoingRequestAccepted: &responseEvent{make(map[uintptr]ResponseConsumer)}, + OutgoingRequestRejected: &responseEvent{make(map[uintptr]ResponseConsumer)}, +} + +type protocolEvents struct { + DiscoverPeer *peerEvent + IncomingRequestAccepted *requestEvent + IncomingRequestRejected *requestEvent + OutgoingRequestAccepted *responseEvent + OutgoingRequestRejected *responseEvent +} + +type peerEvent struct { + callbacks map[uintptr]PeerConsumer +} + +func (this *peerEvent) Attach(callback PeerConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *peerEvent) Detach(callback PeerConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *peerEvent) Trigger(p *peer.Peer) { + for _, callback := range this.callbacks { + callback(p) + } +} + +type requestEvent struct { + callbacks map[uintptr]RequestConsumer +} + +func (this *requestEvent) Attach(callback RequestConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *requestEvent) Detach(callback RequestConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *requestEvent) Trigger(req *request.Request) { + for _, callback := range this.callbacks { + callback(req) + } +} + +type responseEvent struct { + callbacks map[uintptr]ResponseConsumer +} + +func (this *responseEvent) Attach(callback ResponseConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *responseEvent) Detach(callback ResponseConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *responseEvent) Trigger(res *response.Response) { + for _, callback := range this.callbacks { + callback(res) + } +} + +type PeerConsumer = func(p *peer.Peer) + +type RequestConsumer = func(req *request.Request) + +type ResponseConsumer = func(res *response.Response) diff --git a/plugins/autopeering/protocol/incoming_ping_processor.go b/plugins/autopeering/protocol/incoming_ping_processor.go new file mode 100644 index 0000000000000000000000000000000000000000..f2b30f9ad90ea593aa8f212b99c6061ef1fc2cca --- /dev/null +++ b/plugins/autopeering/protocol/incoming_ping_processor.go @@ -0,0 +1,18 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" +) + +func createIncomingPingProcessor(plugin *node.Plugin) func(p *ping.Ping) { + return func(p *ping.Ping) { + if p.Issuer.Conn != nil { + plugin.LogDebug("received TCP ping from " + p.Issuer.String()) + } else { + plugin.LogDebug("received UDP ping from " + p.Issuer.String()) + } + + Events.DiscoverPeer.Trigger(p.Issuer) + } +} diff --git a/plugins/autopeering/protocol/incoming_request_processor.go b/plugins/autopeering/protocol/incoming_request_processor.go new file mode 100644 index 0000000000000000000000000000000000000000..2f94ff19ef1f8fd35a0cd405b2433292d76332dc --- /dev/null +++ b/plugins/autopeering/protocol/incoming_request_processor.go @@ -0,0 +1,47 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" +) + +func createIncomingRequestProcessor(plugin *node.Plugin) func(req *request.Request) { + return func(req *request.Request) { + Events.DiscoverPeer.Trigger(req.Issuer) + + if req.Issuer.Conn != nil { + plugin.LogDebug("received TCP peering request from " + req.Issuer.String()) + } else { + plugin.LogDebug("received UDP peering request from " + req.Issuer.String()) + } + + if len(neighbormanager.ACCEPTED_NEIGHBORS) <= constants.NEIGHBOR_COUNT / 2 { + if err := req.Accept(proposedPeeringCandidates(req)); err != nil { + plugin.LogDebug("error when sending response to" + req.Issuer.String()) + } + + plugin.LogDebug("sent positive peering response to " + req.Issuer.String()) + + Events.IncomingRequestAccepted.Trigger(req) + } else { + if err := req.Reject(proposedPeeringCandidates(req)); err != nil { + plugin.LogDebug("error when sending response to" + req.Issuer.String()) + } + + plugin.LogDebug("sent negative peering response to " + req.Issuer.String()) + + Events.IncomingRequestRejected.Trigger(req) + } + } +} + +func proposedPeeringCandidates(req *request.Request) types.PeerList { + return peermanager.NEIGHBORHOOD.List().Filter(func(p *peer.Peer) bool { + return p.Identity.PublicKey != nil + }).Sort(peermanager.CHOSEN_NEIGHBOR_DISTANCE(req)) +} \ No newline at end of file diff --git a/plugins/autopeering/protocol/incoming_response_processor.go b/plugins/autopeering/protocol/incoming_response_processor.go new file mode 100644 index 0000000000000000000000000000000000000000..d99cf1cd0bb245963a46303fb755ef8ddbf58434 --- /dev/null +++ b/plugins/autopeering/protocol/incoming_response_processor.go @@ -0,0 +1,33 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" + "strconv" +) + +func createIncomingResponseProcessor(plugin *node.Plugin) func(peeringResponse *response.Response) { + return func(peeringResponse *response.Response) { + Events.DiscoverPeer.Trigger(peeringResponse.Issuer) + for _, peer := range peeringResponse.Peers { + Events.DiscoverPeer.Trigger(peer) + } + + if peeringResponse.Issuer.Conn == nil { + plugin.LogDebug("received UDP peering response from " + peeringResponse.Issuer.String()) + } else { + plugin.LogDebug("received TCP peering response from " + peeringResponse.Issuer.String()) + + peeringResponse.Issuer.Conn.Close() + } + + switch peeringResponse.Type { + case response.TYPE_ACCEPT: + Events.OutgoingRequestAccepted.Trigger(peeringResponse) + case response.TYPE_REJECT: + Events.OutgoingRequestRejected.Trigger(peeringResponse) + default: + plugin.LogDebug("invalid response type in peering response of " + peeringResponse.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringResponse.Issuer.PeeringPort))) + } + } +} diff --git a/plugins/autopeering/protocol/outgoing_ping_processor.go b/plugins/autopeering/protocol/outgoing_ping_processor.go new file mode 100644 index 0000000000000000000000000000000000000000..808cb1aefb2e80564101612192621c768a06bfa0 --- /dev/null +++ b/plugins/autopeering/protocol/outgoing_ping_processor.go @@ -0,0 +1,83 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" + "math/rand" + "net" + "time" +) + +func createOutgoingPingProcessor(plugin *node.Plugin) func() { + return func() { + plugin.LogInfo("Starting Ping Processor ...") + plugin.LogSuccess("Starting Ping Processor ... done") + + outgoingPing := &ping.Ping{ + Issuer: &peer.Peer{ + Identity: accountability.OWN_ID, + PeeringProtocolType: types.PROTOCOL_TYPE_TCP, + PeeringPort: uint16(*parameters.UDP_PORT.Value), + GossipProtocolType: types.PROTOCOL_TYPE_TCP, + GossipPort: uint16(*parameters.UDP_PORT.Value), + Address: net.IPv4(0, 0, 0, 0), + Salt: saltmanager.PUBLIC_SALT, + }, + } + outgoingPing.Sign() + + saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { + outgoingPing.Sign() + }) + + pingPeers(plugin, outgoingPing) + + ticker := time.NewTicker(constants.PING_RANDOM_PEERS_INTERVAL) + ticker: + for { + select { + case <- daemon.ShutdownSignal: + plugin.LogInfo("Stopping Ping Processor ...") + + break ticker + case <- ticker.C: + pingPeers(plugin, outgoingPing) + } + } + + plugin.LogSuccess("Stopping Ping Processor ... done") + } +} + +func pingPeers(plugin *node.Plugin, outgoingPing *ping.Ping) { + chosenPeers := make(map[string]*peer.Peer) + + for i := 0; i < constants.PING_RANDOM_PEERS_COUNT; i++ { + randomNeighborHoodPeer := peermanager.NEIGHBORHOOD_LIST[rand.Intn(len(peermanager.NEIGHBORHOOD_LIST))] + + nodeId := randomNeighborHoodPeer.Identity.StringIdentifier + + if !neighbormanager.ACCEPTED_NEIGHBORS.Contains(nodeId) && !neighbormanager.CHOSEN_NEIGHBORS.Contains(nodeId) && + nodeId != accountability.OWN_ID.StringIdentifier { + chosenPeers[randomNeighborHoodPeer.Identity.StringIdentifier] = randomNeighborHoodPeer + } + } + + for _, chosenPeer := range chosenPeers { + go func(chosenPeer *peer.Peer) { + chosenPeer.Send(outgoingPing.Marshal(), false) + + plugin.LogDebug("sent ping to " + chosenPeer.String()) + }(chosenPeer) + } +} \ No newline at end of file diff --git a/plugins/autopeering/protocol/peer/constants.go b/plugins/autopeering/protocol/peer/constants.go index b1b65ea4003558b2fb83c6195c5361428df96278..53d53dd5245f7a588ee4bb43894ed89f91126531 100644 --- a/plugins/autopeering/protocol/peer/constants.go +++ b/plugins/autopeering/protocol/peer/constants.go @@ -2,6 +2,7 @@ package peer import ( "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" ) const ( @@ -12,6 +13,7 @@ const ( MARSHALLED_PEERING_PORT_START = MARSHALLED_PEERING_PROTOCOL_TYPE_END MARSHALLED_GOSSIP_PROTOCOL_TYPE_START = MARSHALLED_PEERING_PORT_END MARSHALLED_GOSSIP_PORT_START = MARSHALLED_GOSSIP_PROTOCOL_TYPE_END + MARSHALLED_SALT_START = MARSHALLED_GOSSIP_PORT_END MARSHALLED_PUBLIC_KEY_END = MARSHALLED_PUBLIC_KEY_START + MARSHALLED_PUBLIC_KEY_SIZE MARSHALLED_ADDRESS_TYPE_END = MARSHALLED_ADDRESS_TYPE_START + MARSHALLED_ADDRESS_TYPE_SIZE @@ -20,6 +22,7 @@ const ( MARSHALLED_PEERING_PORT_END = MARSHALLED_PEERING_PORT_START + MARSHALLED_PEERING_PORT_SIZE MARSHALLED_GOSSIP_PROTOCOL_TYPE_END = MARSHALLED_GOSSIP_PROTOCOL_TYPE_START + MARSHALLED_GOSSIP_PROTOCOL_TYPE_SIZE MARSHALLED_GOSSIP_PORT_END = MARSHALLED_GOSSIP_PORT_START + MARSHALLED_GOSSIP_PORT_SIZE + MARSHALLED_SALT_END = MARSHALLED_SALT_START + MARSHALLED_SALT_SIZE MARSHALLED_PUBLIC_KEY_SIZE = identity.PUBLIC_KEY_BYTE_LENGTH MARSHALLED_ADDRESS_TYPE_SIZE = 1 @@ -28,5 +31,6 @@ const ( MARSHALLED_PEERING_PORT_SIZE = 2 MARSHALLED_GOSSIP_PROTOCOL_TYPE_SIZE = 1 MARSHALLED_GOSSIP_PORT_SIZE = 2 - MARSHALLED_TOTAL_SIZE = MARSHALLED_GOSSIP_PORT_END + MARSHALLED_SALT_SIZE = salt.SALT_MARSHALLED_SIZE + MARSHALLED_TOTAL_SIZE = MARSHALLED_SALT_END ) diff --git a/plugins/autopeering/protocol/peer/peer.go b/plugins/autopeering/protocol/peer/peer.go index a004bfd6b2a7dfb4febe35b2ffa346f1a7597b69..8018281fac539da5cd08de3481e8181273ce157a 100644 --- a/plugins/autopeering/protocol/peer/peer.go +++ b/plugins/autopeering/protocol/peer/peer.go @@ -3,19 +3,25 @@ package peer import ( "encoding/binary" "github.com/iotaledger/goshimmer/packages/identity" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" "github.com/pkg/errors" "net" "strconv" + "sync" ) type Peer struct { Identity *identity.Identity Address net.IP - PeeringProtocolType protocol.ProtocolType + PeeringProtocolType types.ProtocolType PeeringPort uint16 - GossipProtocolType protocol.ProtocolType + GossipProtocolType types.ProtocolType GossipPort uint16 + Salt *salt.Salt + Conn *network.ManagedConnection + connectMutex sync.Mutex } func Unmarshal(data []byte) (*Peer, error) { @@ -28,21 +34,78 @@ func Unmarshal(data []byte) (*Peer, error) { } switch data[MARSHALLED_ADDRESS_TYPE_START] { - case protocol.ADDRESS_TYPE_IPV4: + case types.ADDRESS_TYPE_IPV4: peer.Address = net.IP(data[MARSHALLED_ADDRESS_START:MARSHALLED_ADDRESS_END]).To4() - case protocol.ADDRESS_TYPE_IPV6: + case types.ADDRESS_TYPE_IPV6: peer.Address = net.IP(data[MARSHALLED_ADDRESS_START:MARSHALLED_ADDRESS_END]).To16() } - peer.PeeringProtocolType = protocol.ProtocolType(data[MARSHALLED_PEERING_PROTOCOL_TYPE_START]) + peer.PeeringProtocolType = types.ProtocolType(data[MARSHALLED_PEERING_PROTOCOL_TYPE_START]) peer.PeeringPort = binary.BigEndian.Uint16(data[MARSHALLED_PEERING_PORT_START:MARSHALLED_PEERING_PORT_END]) - peer.GossipProtocolType = protocol.ProtocolType(data[MARSHALLED_GOSSIP_PROTOCOL_TYPE_START]) + peer.GossipProtocolType = types.ProtocolType(data[MARSHALLED_GOSSIP_PROTOCOL_TYPE_START]) peer.GossipPort = binary.BigEndian.Uint16(data[MARSHALLED_GOSSIP_PORT_START:MARSHALLED_GOSSIP_PORT_END]) + if unmarshalledSalt, err := salt.Unmarshal(data[MARSHALLED_SALT_START:MARSHALLED_SALT_END]); err != nil { + return nil, err + } else { + peer.Salt = unmarshalledSalt + } + return peer, nil } +func (peer *Peer) Send(data []byte, keepConnectionAlive bool) error { + newConnection, err := peer.Connect() + if err != nil { + return err + } + + if _, err := peer.Conn.Write(data); err != nil { + return err + } + + if newConnection && !keepConnectionAlive { + peer.Conn.Close() + } + + return nil +} + +func (peer *Peer) Connect() (bool, error) { + if peer.Conn == nil { + peer.connectMutex.Lock() + defer peer.connectMutex.Unlock() + + if peer.Conn == nil { + var protocolString string + switch peer.PeeringProtocolType { + case types.PROTOCOL_TYPE_TCP: + protocolString = "tcp" + case types.PROTOCOL_TYPE_UDP: + protocolString = "udp" + default: + return false, errors.New("unsupported peering protocol in peer " + peer.Address.String()) + } + + conn, err := net.Dial(protocolString, peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort))) + if err != nil { + return false, errors.New("error when connecting to " + peer.Address.String() + " during peering process: " + err.Error()) + } else { + peer.Conn = network.NewManagedConnection(conn) + + peer.Conn.Events.Close.Attach(func() { + peer.Conn = nil + }) + + return true, nil + } + } + } + + return false, nil +} + func (peer *Peer) Marshal() []byte { result := make([]byte, MARSHALLED_TOTAL_SIZE) @@ -51,9 +114,9 @@ func (peer *Peer) Marshal() []byte { switch len(peer.Address) { case net.IPv4len: - result[MARSHALLED_ADDRESS_TYPE_START] = protocol.ADDRESS_TYPE_IPV4 + result[MARSHALLED_ADDRESS_TYPE_START] = types.ADDRESS_TYPE_IPV4 case net.IPv6len: - result[MARSHALLED_ADDRESS_TYPE_START] = protocol.ADDRESS_TYPE_IPV6 + result[MARSHALLED_ADDRESS_TYPE_START] = types.ADDRESS_TYPE_IPV6 default: panic("invalid address in peer") } @@ -66,6 +129,8 @@ func (peer *Peer) Marshal() []byte { result[MARSHALLED_GOSSIP_PROTOCOL_TYPE_START] = peer.GossipProtocolType binary.BigEndian.PutUint16(result[MARSHALLED_GOSSIP_PORT_START:MARSHALLED_GOSSIP_PORT_END], peer.GossipPort) + copy(result[MARSHALLED_SALT_START:MARSHALLED_SALT_END], peer.Salt.Marshal()) + return result } diff --git a/plugins/autopeering/protocol/ping/constants.go b/plugins/autopeering/protocol/ping/constants.go new file mode 100644 index 0000000000000000000000000000000000000000..aca4a719836d8b7f5f68b0a1f7ff2a383b066a73 --- /dev/null +++ b/plugins/autopeering/protocol/ping/constants.go @@ -0,0 +1,29 @@ +package ping + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" +) + +const ( + MARSHALLED_PACKET_HEADER = 0x04 + + PACKET_HEADER_START = 0 + MARSHALLED_ISSUER_START = PACKET_HEADER_END + MARSHALLED_PEERS_START = MARSHALLED_ISSUER_END + MARSHALLED_SIGNATURE_START = MARSHALLED_PEERS_END + + PACKET_HEADER_END = PACKET_HEADER_START + PACKET_HEADER_SIZE + MARSHALLED_ISSUER_END = MARSHALLED_ISSUER_START + MARSHALLED_ISSUER_SIZE + MARSHALLED_PEERS_END = MARSHALLED_PEERS_START + MARSHALLED_PEERS_SIZE + MARSHALLED_SIGNATURE_END = MARSHALLED_SIGNATURE_START + MARSHALLED_SIGNATURE_SIZE + + PACKET_HEADER_SIZE = 1 + MARSHALLED_ISSUER_SIZE = peer.MARSHALLED_TOTAL_SIZE + MARSHALLED_PEER_ENTRY_FLAG_SIZE = 1 + MARSHALLED_PEER_ENTRY_SIZE = MARSHALLED_PEER_ENTRY_FLAG_SIZE + peer.MARSHALLED_TOTAL_SIZE + MARSHALLED_PEERS_SIZE = MARSHALLED_PEER_ENTRY_SIZE * constants.NEIGHBOR_COUNT + MARSHALLED_SIGNATURE_SIZE = 65 + + MARSHALLED_TOTAL_SIZE = MARSHALLED_SIGNATURE_END +) diff --git a/plugins/autopeering/protocol/ping/errors.go b/plugins/autopeering/protocol/ping/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..218d60917a52a4080f5a2ed142b44ed43e23099f --- /dev/null +++ b/plugins/autopeering/protocol/ping/errors.go @@ -0,0 +1,8 @@ +package ping + +import "github.com/pkg/errors" + +var ( + ErrInvalidSignature = errors.New("invalid signature in ping") + ErrMalformedPing = errors.New("malformed ping") +) diff --git a/plugins/autopeering/protocol/ping/ping.go b/plugins/autopeering/protocol/ping/ping.go new file mode 100644 index 0000000000000000000000000000000000000000..bdd6b745f8ceabe2a70365d1287d0dd471c7703b --- /dev/null +++ b/plugins/autopeering/protocol/ping/ping.go @@ -0,0 +1,69 @@ +package ping + +import ( + "bytes" + "github.com/iotaledger/goshimmer/packages/identity" + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" +) + +type Ping struct { + Issuer *peer.Peer + Neighbors types.PeerList + Signature [MARSHALLED_SIGNATURE_SIZE]byte +} + +func Unmarshal(data []byte) (*Ping, error) { + if data[0] != MARSHALLED_PACKET_HEADER || len(data) != MARSHALLED_TOTAL_SIZE { + return nil, ErrMalformedPing + } + + ping := &Ping{} + + if unmarshalledPeer, err := peer.Unmarshal(data[MARSHALLED_ISSUER_START:MARSHALLED_ISSUER_END]); err != nil { + return nil, err + } else { + ping.Issuer = unmarshalledPeer + } + + if err := saltmanager.CheckSalt(ping.Issuer.Salt); err != nil { + return nil, err + } + + if issuer, err := identity.FromSignedData(data[:MARSHALLED_SIGNATURE_START], data[MARSHALLED_SIGNATURE_START:]); err != nil { + return nil, err + } else { + if !bytes.Equal(issuer.Identifier, ping.Issuer.Identity.Identifier) { + return nil, ErrInvalidSignature + } + } + copy(ping.Signature[:], data[MARSHALLED_SIGNATURE_START:MARSHALLED_SIGNATURE_END]) + + return ping, nil +} + +func (ping *Ping) Marshal() []byte { + result := make([]byte, MARSHALLED_TOTAL_SIZE) + + result[PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER + copy(result[MARSHALLED_ISSUER_START:MARSHALLED_ISSUER_END], ping.Issuer.Marshal()) + for i, neighbor := range ping.Neighbors { + entryStartOffset := MARSHALLED_PEERS_START + i * MARSHALLED_PEER_ENTRY_SIZE + + result[entryStartOffset] = 1 + + copy(result[entryStartOffset + 1:entryStartOffset + MARSHALLED_PEER_ENTRY_SIZE], neighbor.Marshal()) + } + copy(result[MARSHALLED_SIGNATURE_START:MARSHALLED_SIGNATURE_END], ping.Signature[:MARSHALLED_SIGNATURE_SIZE]) + + return result +} + +func (this *Ping) Sign() { + if signature, err := this.Issuer.Identity.Sign(this.Marshal()[:MARSHALLED_SIGNATURE_START]); err != nil { + panic(err) + } else { + copy(this.Signature[:], signature) + } +} diff --git a/plugins/autopeering/protocol/plugin.go b/plugins/autopeering/protocol/plugin.go new file mode 100644 index 0000000000000000000000000000000000000000..e7e9820a2c8c1536a330ce34ef0c3760cd1a303b --- /dev/null +++ b/plugins/autopeering/protocol/plugin.go @@ -0,0 +1,30 @@ +package protocol + +import ( + "github.com/iotaledger/goshimmer/packages/daemon" + "github.com/iotaledger/goshimmer/packages/node" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp" + "github.com/iotaledger/goshimmer/plugins/autopeering/server/udp" +) + +func Configure(plugin *node.Plugin) { + incomingPingProcessor := createIncomingPingProcessor(plugin) + incomingRequestProcessor := createIncomingRequestProcessor(plugin) + incomingResponseProcessor := createIncomingResponseProcessor(plugin) + errorHandler := createErrorHandler(plugin) + + udp.Events.ReceivePing.Attach(incomingPingProcessor) + udp.Events.ReceiveRequest.Attach(incomingRequestProcessor) + udp.Events.ReceiveResponse.Attach(incomingResponseProcessor) + udp.Events.Error.Attach(errorHandler) + + tcp.Events.ReceivePing.Attach(incomingPingProcessor) + tcp.Events.ReceiveRequest.Attach(incomingRequestProcessor) + tcp.Events.ReceiveResponse.Attach(incomingResponseProcessor) + tcp.Events.Error.Attach(errorHandler) +} + +func Run(plugin *node.Plugin) { + daemon.BackgroundWorker(createChosenNeighborProcessor(plugin)) + daemon.BackgroundWorker(createOutgoingPingProcessor(plugin)) +} diff --git a/plugins/autopeering/protocol/request/constants.go b/plugins/autopeering/protocol/request/constants.go index eb05a8323e745424743a52b1184cb2cab6fa2255..acd24072bc7d13e2eca0441fc4e4cddf617536e9 100644 --- a/plugins/autopeering/protocol/request/constants.go +++ b/plugins/autopeering/protocol/request/constants.go @@ -2,23 +2,19 @@ package request import ( "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" ) const ( PACKET_HEADER_SIZE = 1 ISSUER_SIZE = peer.MARSHALLED_TOTAL_SIZE - SALT_SIZE = salt.SALT_MARSHALLED_SIZE SIGNATURE_SIZE = 65 PACKET_HEADER_START = 0 ISSUER_START = PACKET_HEADER_END - SALT_START = ISSUER_END - SIGNATURE_START = SALT_END + SIGNATURE_START = ISSUER_END PACKET_HEADER_END = PACKET_HEADER_START + PACKET_HEADER_SIZE ISSUER_END = ISSUER_START + ISSUER_SIZE - SALT_END = SALT_START + SALT_SIZE SIGNATURE_END = SIGNATURE_START + SIGNATURE_SIZE MARSHALLED_TOTAL_SIZE = SIGNATURE_END diff --git a/plugins/autopeering/protocol/request/request.go b/plugins/autopeering/protocol/request/request.go index bf499741007b2193603fdeb1bfdcd5ddcb56d0a1..60eae5c9c8837fd0379132bd0ff6fc7a4efdfaeb 100644 --- a/plugins/autopeering/protocol/request/request.go +++ b/plugins/autopeering/protocol/request/request.go @@ -4,14 +4,13 @@ import ( "bytes" "github.com/iotaledger/goshimmer/packages/identity" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" "time" ) type Request struct { Issuer *peer.Peer - Salt *salt.Salt Signature [SIGNATURE_SIZE]byte } @@ -28,17 +27,11 @@ func Unmarshal(data []byte) (*Request, error) { peeringRequest.Issuer = unmarshalledPeer } - if unmarshalledSalt, err := salt.Unmarshal(data[SALT_START:SALT_END]); err != nil { - return nil, err - } else { - peeringRequest.Salt = unmarshalledSalt - } - now := time.Now() - if peeringRequest.Salt.ExpirationTime.Before(now.Add(-1 * time.Minute)) { + if peeringRequest.Issuer.Salt.ExpirationTime.Before(now.Add(-1 * time.Minute)) { return nil, ErrPublicSaltExpired } - if peeringRequest.Salt.ExpirationTime.After(now.Add(saltmanager.PUBLIC_SALT_LIFETIME + 1*time.Minute)) { + if peeringRequest.Issuer.Salt.ExpirationTime.After(now.Add(saltmanager.PUBLIC_SALT_LIFETIME + 1*time.Minute)) { return nil, ErrPublicSaltInvalidLifetime } @@ -54,6 +47,29 @@ func Unmarshal(data []byte) (*Request, error) { return peeringRequest, nil } +func (this *Request) Accept(peers []*peer.Peer) error { + if _, err := this.Issuer.Connect(); err != nil { + return err + } + + peeringResponse := &response.Response{ + Type: response.TYPE_ACCEPT, + Issuer: OUTGOING_REQUEST.Issuer, + Peers: peers, + } + peeringResponse.Sign() + + if err := this.Issuer.Send(peeringResponse.Marshal(), false); err != nil { + return err + } + + return nil +} + +func (this *Request) Reject(peers []*peer.Peer) error { + return nil +} + func (this *Request) Sign() { if signature, err := this.Issuer.Identity.Sign(this.Marshal()[:SIGNATURE_START]); err != nil { panic(err) @@ -67,7 +83,6 @@ func (this *Request) Marshal() []byte { result[PACKET_HEADER_START] = MARSHALLED_PACKET_HEADER copy(result[ISSUER_START:ISSUER_END], this.Issuer.Marshal()) - copy(result[SALT_START:SALT_END], this.Salt.Marshal()) copy(result[SIGNATURE_START:SIGNATURE_END], this.Signature[:SIGNATURE_SIZE]) return result diff --git a/plugins/autopeering/protocol/request/send.go b/plugins/autopeering/protocol/request/send.go new file mode 100644 index 0000000000000000000000000000000000000000..739eba9dcf77cd3af58834a8360b68c575a77cc6 --- /dev/null +++ b/plugins/autopeering/protocol/request/send.go @@ -0,0 +1,51 @@ +package request + +import ( + "github.com/iotaledger/goshimmer/packages/accountability" + "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types" + "github.com/iotaledger/goshimmer/plugins/autopeering/saltmanager" + "github.com/pkg/errors" + "net" +) + +var OUTGOING_REQUEST *Request + +func Send(peer *peer.Peer) (bool, error) { + var keepAlive bool + switch peer.PeeringProtocolType { + case types.PROTOCOL_TYPE_TCP: + keepAlive = true + case types.PROTOCOL_TYPE_UDP: + keepAlive = false + default: + return false, errors.New("peer uses invalid protocol") + } + + if err := peer.Send(OUTGOING_REQUEST.Marshal(), keepAlive); err != nil { + return false, err + } + + return keepAlive, nil +} + +func init() { + OUTGOING_REQUEST = &Request{ + Issuer: &peer.Peer{ + Identity: accountability.OWN_ID, + PeeringProtocolType: types.PROTOCOL_TYPE_TCP, + PeeringPort: uint16(*parameters.UDP_PORT.Value), + GossipProtocolType: types.PROTOCOL_TYPE_TCP, + GossipPort: uint16(*parameters.UDP_PORT.Value), + Address: net.IPv4(0, 0, 0, 0), + Salt: saltmanager.PUBLIC_SALT, + }, + } + OUTGOING_REQUEST.Sign() + + saltmanager.Events.UpdatePublicSalt.Attach(func(salt *salt.Salt) { + OUTGOING_REQUEST.Sign() + }) +} diff --git a/plugins/autopeering/protocol/response/constants.go b/plugins/autopeering/protocol/response/constants.go index 1934656553ab90adf5b941ad6f17b91dfad9d572..250e5c98e45d495c0beaf7a3a2d849b440fa9e9b 100644 --- a/plugins/autopeering/protocol/response/constants.go +++ b/plugins/autopeering/protocol/response/constants.go @@ -1,7 +1,7 @@ package response import ( - "github.com/iotaledger/goshimmer/plugins/autopeering/protocol" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" ) @@ -9,7 +9,7 @@ const ( TYPE_REJECT = Type(0) TYPE_ACCEPT = Type(1) - MARSHALLED_PEERS_AMOUNT = protocol.NEIGHBOR_COUNT + protocol.NEIGHBOR_COUNT*protocol.NEIGHBOR_COUNT + MARSHALLED_PEERS_AMOUNT = constants.NEIGHBOR_COUNT + constants.NEIGHBOR_COUNT * constants.NEIGHBOR_COUNT MARHSALLED_PACKET_HEADER = 0xBC MARSHALLED_PACKET_HEADER_START = 0 diff --git a/plugins/autopeering/protocol/types.go b/plugins/autopeering/protocol/types.go deleted file mode 100644 index 99e489a3b3ae8cf6ec25e9230d874c3b7f3562e2..0000000000000000000000000000000000000000 --- a/plugins/autopeering/protocol/types.go +++ /dev/null @@ -1,5 +0,0 @@ -package protocol - -type AddressType = byte - -type ProtocolType = byte diff --git a/plugins/autopeering/protocol/constants.go b/plugins/autopeering/protocol/types/constants.go similarity index 80% rename from plugins/autopeering/protocol/constants.go rename to plugins/autopeering/protocol/types/constants.go index 5405fa58a93d3e0a0eea8e5b7ab3f265d4963aaa..f17ad2320186975a0b89b78ad796202641855653 100644 --- a/plugins/autopeering/protocol/constants.go +++ b/plugins/autopeering/protocol/types/constants.go @@ -1,8 +1,6 @@ -package protocol +package types const ( - NEIGHBOR_COUNT = 8 - PROTOCOL_TYPE_TCP = ProtocolType(0) PROTOCOL_TYPE_UDP = ProtocolType(1) diff --git a/plugins/autopeering/protocol/types/types.go b/plugins/autopeering/protocol/types/types.go new file mode 100644 index 0000000000000000000000000000000000000000..fb6e24658eb3af95298e0429caa0b0c1a3fff257 --- /dev/null +++ b/plugins/autopeering/protocol/types/types.go @@ -0,0 +1,5 @@ +package types + +type AddressType = byte + +type ProtocolType = byte \ No newline at end of file diff --git a/plugins/autopeering/saltmanager/errors.go b/plugins/autopeering/saltmanager/errors.go new file mode 100644 index 0000000000000000000000000000000000000000..bb2ec0d4192a275471ac46fba282557b8f3c66f6 --- /dev/null +++ b/plugins/autopeering/saltmanager/errors.go @@ -0,0 +1,8 @@ +package saltmanager + +import "github.com/pkg/errors" + +var ( + ErrPublicSaltExpired = errors.New("expired public salt in ping") + ErrPublicSaltInvalidLifetime = errors.New("invalid public salt lifetime in ping") +) diff --git a/plugins/autopeering/saltmanager/utils.go b/plugins/autopeering/saltmanager/utils.go new file mode 100644 index 0000000000000000000000000000000000000000..bc7c2046b38ded5dfbe29b47c0cb9868338ba895 --- /dev/null +++ b/plugins/autopeering/saltmanager/utils.go @@ -0,0 +1,19 @@ +package saltmanager + +import ( + "github.com/iotadevelopment/shimmer/plugins/autopeering/saltmanager" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/salt" + "time" +) + +func CheckSalt(saltToCheck *salt.Salt) error { + now := time.Now() + if saltToCheck.ExpirationTime.Before(now.Add(-1 * time.Minute)) { + return ErrPublicSaltExpired + } + if saltToCheck.ExpirationTime.After(now.Add(saltmanager.PUBLIC_SALT_LIFETIME + 1 * time.Minute)) { + return ErrPublicSaltInvalidLifetime + } + + return nil +} diff --git a/plugins/autopeering/server/tcp/constants.go b/plugins/autopeering/server/tcp/constants.go index df04531275bf96f2c254282a40f84af277929e28..85b2f705559a1de8d8215fe7e3c24a23ffda1568 100644 --- a/plugins/autopeering/server/tcp/constants.go +++ b/plugins/autopeering/server/tcp/constants.go @@ -8,4 +8,5 @@ const ( STATE_INITIAL = byte(0) STATE_REQUEST = byte(1) STATE_RESPONSE = byte(2) + STATE_PING = byte(3) ) diff --git a/plugins/autopeering/server/tcp/events.go b/plugins/autopeering/server/tcp/events.go index fa67810d14e56bdcaeb5df3992db37defb8440e4..dedffa4f111b6c614e0db62a5d5347c7e5573d46 100644 --- a/plugins/autopeering/server/tcp/events.go +++ b/plugins/autopeering/server/tcp/events.go @@ -1,7 +1,7 @@ package tcp import ( - "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "net" @@ -9,50 +9,70 @@ import ( ) var Events = &pluginEvents{ - ReceiveRequest: &requestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, - ReceiveResponse: &responseEvent{make(map[uintptr]ConnectionPeeringResponseConsumer)}, - Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, + ReceivePing: &pingEvent{make(map[uintptr]PingConsumer)}, + ReceiveRequest: &requestEvent{make(map[uintptr]RequestConsumer)}, + ReceiveResponse: &responseEvent{make(map[uintptr]ResponseConsumer)}, + Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, } type pluginEvents struct { + ReceivePing *pingEvent ReceiveRequest *requestEvent ReceiveResponse *responseEvent Error *ipErrorEvent } +type pingEvent struct { + callbacks map[uintptr]PingConsumer +} + +func (this *pingEvent) Attach(callback PingConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *pingEvent) Detach(callback PingConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *pingEvent) Trigger(ping *ping.Ping) { + for _, callback := range this.callbacks { + callback(ping) + } +} + type requestEvent struct { - callbacks map[uintptr]ConnectionPeeringRequestConsumer + callbacks map[uintptr]RequestConsumer } -func (this *requestEvent) Attach(callback ConnectionPeeringRequestConsumer) { +func (this *requestEvent) Attach(callback RequestConsumer) { this.callbacks[reflect.ValueOf(callback).Pointer()] = callback } -func (this *requestEvent) Detach(callback ConnectionPeeringRequestConsumer) { +func (this *requestEvent) Detach(callback RequestConsumer) { delete(this.callbacks, reflect.ValueOf(callback).Pointer()) } -func (this *requestEvent) Trigger(conn *network.ManagedConnection, request *request.Request) { +func (this *requestEvent) Trigger(req *request.Request) { for _, callback := range this.callbacks { - callback(conn, request) + callback(req) } } type responseEvent struct { - callbacks map[uintptr]ConnectionPeeringResponseConsumer + callbacks map[uintptr]ResponseConsumer } -func (this *responseEvent) Attach(callback ConnectionPeeringResponseConsumer) { +func (this *responseEvent) Attach(callback ResponseConsumer) { this.callbacks[reflect.ValueOf(callback).Pointer()] = callback } -func (this *responseEvent) Detach(callback ConnectionPeeringResponseConsumer) { +func (this *responseEvent) Detach(callback ResponseConsumer) { delete(this.callbacks, reflect.ValueOf(callback).Pointer()) } -func (this *responseEvent) Trigger(conn *network.ManagedConnection, peeringResponse *response.Response) { +func (this *responseEvent) Trigger(peeringResponse *response.Response) { for _, callback := range this.callbacks { - callback(conn, peeringResponse) + callback(peeringResponse) } } diff --git a/plugins/autopeering/server/tcp/server.go b/plugins/autopeering/server/tcp/server.go index 1544c703848961e91f8a63c6584906132af36077..81127fa8daa6fa8227798fe057adf4a9eb7a268f 100644 --- a/plugins/autopeering/server/tcp/server.go +++ b/plugins/autopeering/server/tcp/server.go @@ -6,6 +6,7 @@ import ( "github.com/iotaledger/goshimmer/packages/network/tcp" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "github.com/pkg/errors" @@ -62,7 +63,7 @@ func HandleConnection(conn *network.ManagedConnection) { ProcessIncomingPacket(&connectionState, &receiveBuffer, conn, data, &offset) }) - go conn.Read(make([]byte, int(math.Max(request.MARSHALLED_TOTAL_SIZE, response.MARSHALLED_TOTAL_SIZE)))) + go conn.Read(make([]byte, int(math.Max(ping.MARSHALLED_TOTAL_SIZE, math.Max(request.MARSHALLED_TOTAL_SIZE, response.MARSHALLED_TOTAL_SIZE))))) } func ProcessIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int) { @@ -83,6 +84,8 @@ func ProcessIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *n *receiveBuffer = make([]byte, request.MARSHALLED_TOTAL_SIZE) case STATE_RESPONSE: *receiveBuffer = make([]byte, response.MARSHALLED_TOTAL_SIZE) + case STATE_PING: + *receiveBuffer = make([]byte, ping.MARSHALLED_TOTAL_SIZE) } } @@ -91,27 +94,33 @@ func ProcessIncomingPacket(connectionState *byte, receiveBuffer *[]byte, conn *n processIncomingRequestPacket(connectionState, receiveBuffer, conn, data, offset) case STATE_RESPONSE: processIncomingResponsePacket(connectionState, receiveBuffer, conn, data, offset) + case STATE_PING: + processIncomingPingPacket(connectionState, receiveBuffer, conn, data, offset) } } func parsePackageHeader(data []byte) (ConnectionState, []byte, error) { var connectionState ConnectionState - var receivedData []byte + var receiveBuffer []byte switch data[0] { case request.MARSHALLED_PACKET_HEADER: - receivedData = make([]byte, request.MARSHALLED_TOTAL_SIZE) + receiveBuffer = make([]byte, request.MARSHALLED_TOTAL_SIZE) connectionState = STATE_REQUEST case response.MARHSALLED_PACKET_HEADER: - receivedData = make([]byte, response.MARSHALLED_TOTAL_SIZE) + receiveBuffer = make([]byte, response.MARSHALLED_TOTAL_SIZE) connectionState = STATE_RESPONSE + case ping.MARSHALLED_PACKET_HEADER: + receiveBuffer = make([]byte, ping.MARSHALLED_TOTAL_SIZE) + + connectionState = STATE_PING default: return 0, nil, errors.New("invalid package header") } - return connectionState, receivedData, nil + return connectionState, receiveBuffer, nil } func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int) { @@ -122,16 +131,21 @@ func processIncomingRequestPacket(connectionState *byte, receiveBuffer *[]byte, if *offset + len(data) < request.MARSHALLED_TOTAL_SIZE { *offset += len(data) } else { - if peeringRequest, err := request.Unmarshal(*receiveBuffer); err != nil { + if req, err := request.Unmarshal(*receiveBuffer); err != nil { Events.Error.Trigger(conn.RemoteAddr().(*net.TCPAddr).IP, err) conn.Close() return } else { - peeringRequest.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + req.Issuer.Conn = conn + req.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP - Events.ReceiveRequest.Trigger(conn, peeringRequest) + req.Issuer.Conn.Events.Close.Attach(func() { + req.Issuer.Conn = nil + }) + + Events.ReceiveRequest.Trigger(req) } *connectionState = STATE_INITIAL @@ -150,16 +164,21 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, if *offset + len(data) < response.MARSHALLED_TOTAL_SIZE { *offset += len(data) } else { - if peeringResponse, err := response.Unmarshal(*receiveBuffer); err != nil { + if res, err := response.Unmarshal(*receiveBuffer); err != nil { Events.Error.Trigger(conn.RemoteAddr().(*net.TCPAddr).IP, err) conn.Close() return } else { - peeringResponse.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + res.Issuer.Conn = conn + res.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + + res.Issuer.Conn.Events.Close.Attach(func() { + res.Issuer.Conn = nil + }) - Events.ReceiveResponse.Trigger(conn, peeringResponse) + Events.ReceiveResponse.Trigger(res) } *connectionState = STATE_INITIAL @@ -169,3 +188,36 @@ func processIncomingResponsePacket(connectionState *byte, receiveBuffer *[]byte, } } } + +func processIncomingPingPacket(connectionState *byte, receiveBuffer *[]byte, conn *network.ManagedConnection, data []byte, offset *int) { + remainingCapacity := int(math.Min(float64(ping.MARSHALLED_TOTAL_SIZE - *offset), float64(len(data)))) + + copy((*receiveBuffer)[*offset:], data[:remainingCapacity]) + + if *offset + len(data) < ping.MARSHALLED_TOTAL_SIZE { + *offset += len(data) + } else { + if ping, err := ping.Unmarshal(*receiveBuffer); err != nil { + Events.Error.Trigger(conn.RemoteAddr().(*net.TCPAddr).IP, err) + + conn.Close() + + return + } else { + ping.Issuer.Conn = conn + ping.Issuer.Address = conn.RemoteAddr().(*net.TCPAddr).IP + + ping.Issuer.Conn.Events.Close.Attach(func() { + ping.Issuer.Conn = nil + }) + + Events.ReceivePing.Trigger(ping) + } + + *connectionState = STATE_INITIAL + + if *offset + len(data) > ping.MARSHALLED_TOTAL_SIZE { + ProcessIncomingPacket(connectionState, receiveBuffer, conn, data[ping.MARSHALLED_TOTAL_SIZE:], offset) + } + } +} diff --git a/plugins/autopeering/server/tcp/types.go b/plugins/autopeering/server/tcp/types.go index 4298cf0b67b7ec02213532a585807d9af9a588a4..b8dca9d4e01aa82c3e09fac8b3bfcf76ba930e47 100644 --- a/plugins/autopeering/server/tcp/types.go +++ b/plugins/autopeering/server/tcp/types.go @@ -1,15 +1,17 @@ package tcp import ( - "github.com/iotaledger/goshimmer/packages/network" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "net" ) -type ConnectionPeeringRequestConsumer = func(conn *network.ManagedConnection, request *request.Request) +type PingConsumer = func(p *ping.Ping) -type ConnectionPeeringResponseConsumer = func(conn *network.ManagedConnection, peeringResponse *response.Response) +type RequestConsumer = func(req *request.Request) + +type ResponseConsumer = func(peeringResponse *response.Response) type IPErrorConsumer = func(ip net.IP, err error) diff --git a/plugins/autopeering/server/udp/events.go b/plugins/autopeering/server/udp/events.go index d07d3cd76dcbd4ef3fd38592876ff3151744ba72..588e85b898a46a7dbe028f77c0d55a88033efb23 100644 --- a/plugins/autopeering/server/udp/events.go +++ b/plugins/autopeering/server/udp/events.go @@ -1,6 +1,7 @@ package udp import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "net" @@ -8,17 +9,37 @@ import ( ) var Events = &pluginEvents{ - ReceiveRequest: &requestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, + ReceivePing: &pingEvent{make(map[uintptr]PingConsumer)}, + ReceiveRequest: &requestEvent{make(map[uintptr]ConnectionPeeringRequestConsumer)}, ReceiveResponse: &responseEvent{make(map[uintptr]ConnectionPeeringResponseConsumer)}, - Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, + Error: &ipErrorEvent{make(map[uintptr]IPErrorConsumer)}, } type pluginEvents struct { + ReceivePing *pingEvent ReceiveRequest *requestEvent ReceiveResponse *responseEvent Error *ipErrorEvent } +type pingEvent struct { + callbacks map[uintptr]PingConsumer +} + +func (this *pingEvent) Attach(callback PingConsumer) { + this.callbacks[reflect.ValueOf(callback).Pointer()] = callback +} + +func (this *pingEvent) Detach(callback PingConsumer) { + delete(this.callbacks, reflect.ValueOf(callback).Pointer()) +} + +func (this *pingEvent) Trigger(ping *ping.Ping) { + for _, callback := range this.callbacks { + callback(ping) + } +} + type requestEvent struct { callbacks map[uintptr]ConnectionPeeringRequestConsumer } @@ -72,4 +93,3 @@ func (this *ipErrorEvent) Trigger(ip net.IP, err error) { callback(ip, err) } } - diff --git a/plugins/autopeering/server/udp/server.go b/plugins/autopeering/server/udp/server.go index c4b09ade0fc6d6896cec977084fdf605cfa66e40..46fc6e8f892af7f15ef97e6dfcbfe1c0a4e4d3c8 100644 --- a/plugins/autopeering/server/udp/server.go +++ b/plugins/autopeering/server/udp/server.go @@ -5,6 +5,7 @@ import ( "github.com/iotaledger/goshimmer/packages/network/udp" "github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters" + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "github.com/pkg/errors" @@ -72,6 +73,14 @@ func processReceivedData(addr *net.UDPAddr, data []byte) { Events.ReceiveResponse.Trigger(peeringResponse) } + case ping.MARSHALLED_PACKET_HEADER: + if ping, err := ping.Unmarshal(data); err != nil { + Events.Error.Trigger(addr.IP, err) + } else { + ping.Issuer.Address = addr.IP + + Events.ReceivePing.Trigger(ping) + } default: Events.Error.Trigger(addr.IP, errors.New("invalid UDP peering packet from " + addr.IP.String())) } diff --git a/plugins/autopeering/server/udp/types.go b/plugins/autopeering/server/udp/types.go index 48e103e92311f81c4e7c4e98182e31fd680534bf..c3d055d0ae28c19229d038541f8768e3d241ae15 100644 --- a/plugins/autopeering/server/udp/types.go +++ b/plugins/autopeering/server/udp/types.go @@ -1,11 +1,14 @@ package udp import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response" "net" ) +type PingConsumer = func(p *ping.Ping) + type ConnectionPeeringRequestConsumer = func(request *request.Request) type ConnectionPeeringResponseConsumer = func(peeringResponse *response.Response) diff --git a/plugins/gossip/neighbormanager/accepted_neighbors.go b/plugins/gossip/neighbormanager/accepted_neighbors.go new file mode 100644 index 0000000000000000000000000000000000000000..10f4ca957be93c07c9f5fc8f7fd855dc83c5db15 --- /dev/null +++ b/plugins/gossip/neighbormanager/accepted_neighbors.go @@ -0,0 +1,7 @@ +package neighbormanager + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" +) + +var ACCEPTED_NEIGHBORS = make(types.PeerRegister) diff --git a/plugins/gossip/neighbormanager/chosen_neighbors.go b/plugins/gossip/neighbormanager/chosen_neighbors.go new file mode 100644 index 0000000000000000000000000000000000000000..5e68468faa2adae1d056310b3b6b5d37c3025151 --- /dev/null +++ b/plugins/gossip/neighbormanager/chosen_neighbors.go @@ -0,0 +1,7 @@ +package neighbormanager + +import ( + "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types" +) + +var CHOSEN_NEIGHBORS = make(types.PeerRegister) diff --git a/plugins/gossip/neighbormanager/neighbormanager.go b/plugins/gossip/neighbormanager/neighbormanager.go new file mode 100644 index 0000000000000000000000000000000000000000..a664339eca9f5f45e28fbe0aeea3c37968047ea0 --- /dev/null +++ b/plugins/gossip/neighbormanager/neighbormanager.go @@ -0,0 +1,3 @@ +package neighbormanager + + diff --git a/plugins/statusscreen/ui_header_bar.go b/plugins/statusscreen/ui_header_bar.go index eb9bad6a353ffad43b6932e5041f216f5e78b47f..118ad837cb1c3a26b247bc777cc157420a012704 100644 --- a/plugins/statusscreen/ui_header_bar.go +++ b/plugins/statusscreen/ui_header_bar.go @@ -5,6 +5,7 @@ import ( "github.com/gdamore/tcell" "github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" + "github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager" "github.com/rivo/tview" "math" "strconv" @@ -62,10 +63,11 @@ func (headerBar *UIHeaderBar) Update() { fmt.Fprintln(headerBar.InfoContainer) fmt.Fprintln(headerBar.InfoContainer) fmt.Fprintln(headerBar.InfoContainer) - fmt.Fprintln(headerBar.InfoContainer) fmt.Fprintf(headerBar.InfoContainer, "[::b]Node ID: [::d]%40v ", accountability.OWN_ID.StringIdentifier) fmt.Fprintln(headerBar.InfoContainer) - fmt.Fprintf(headerBar.InfoContainer, "[::b]Known Peers: [::d]%40v ", strconv.Itoa(len(peermanager.KNOWN_PEERS.Peers))) + fmt.Fprintf(headerBar.InfoContainer, "[::b]Neighbors: [::d]%40v ", strconv.Itoa(len(neighbormanager.CHOSEN_NEIGHBORS)) + " chosen / " + strconv.Itoa(len(neighbormanager.ACCEPTED_NEIGHBORS)) + " accepted") + fmt.Fprintln(headerBar.InfoContainer) + fmt.Fprintf(headerBar.InfoContainer, "[::b]Known Peers: [::d]%40v ", strconv.Itoa(len(peermanager.KNOWN_PEERS)) + " total / " + strconv.Itoa(len(peermanager.NEIGHBORHOOD)) + " neighborhood") fmt.Fprintln(headerBar.InfoContainer) fmt.Fprintf(headerBar.InfoContainer, "[::b]Uptime: [::d]");