Skip to content
Snippets Groups Projects
Commit 92393769 authored by Hans Moog's avatar Hans Moog
Browse files

Refactor: added ping messages for auto peering and refactored code

parent d93335f2
Branches
Tags
No related merge requests found
Showing
with 495 additions and 281 deletions
package timeutil
import (
"github.com/iotaledger/goshimmer/packages/daemon"
"time"
)
func Ticker(handler func(), interval time.Duration) {
ticker := time.NewTicker(interval)
ticker:
for {
select {
case <- daemon.ShutdownSignal:
break ticker
case <- ticker.C:
handler()
}
}
}
...@@ -4,7 +4,7 @@ import "github.com/iotaledger/goshimmer/packages/parameter" ...@@ -4,7 +4,7 @@ import "github.com/iotaledger/goshimmer/packages/parameter"
var ( var (
ADDRESS = parameter.AddString("AUTOPEERING/ADDRESS", "0.0.0.0", "address to bind for incoming peering requests") ADDRESS = parameter.AddString("AUTOPEERING/ADDRESS", "0.0.0.0", "address to bind for incoming peering requests")
ENTRY_NODES = parameter.AddString("AUTOPEERING/ENTRY_NODES", "tcp://82.165.29.179:14626", "list of trusted entry nodes for auto peering") ENTRY_NODES = parameter.AddString("AUTOPEERING/ENTRY_NODES", "tcp://0d828930890386f036eb77982cc067c5429f7b8f@82.165.29.179:14626", "list of trusted entry nodes for auto peering")
TCP_PORT = parameter.AddInt("AUTOPEERING/TCP_PORT", 14626, "tcp port for incoming peering requests") TCP_PORT = parameter.AddInt("AUTOPEERING/TCP_PORT", 14626, "tcp port for incoming peering requests")
UDP_PORT = parameter.AddInt("AUTOPEERING/UDP_PORT", 14626, "udp port for incoming peering requests") UDP_PORT = parameter.AddInt("AUTOPEERING/UDP_PORT", 14626, "udp port for incoming peering requests")
) )
package peermanager
import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
var ACCEPTED_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)}
package peermanager
import (
"github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request"
)
var CHOSEN_NEIGHBOR_CANDIDATES types.PeerList
var CHOSEN_NEIGHBOR_DISTANCE = func(req *request.Request) func(p *peer.Peer) float64 {
return func(p *peer.Peer) float64 {
return 1
}
}
func init() {
updateNeighborCandidates()
Events.UpdateNeighborhood.Attach(updateNeighborCandidates)
}
func updateNeighborCandidates() {
CHOSEN_NEIGHBOR_CANDIDATES = NEIGHBORHOOD.List().Sort(CHOSEN_NEIGHBOR_DISTANCE(request.OUTGOING_REQUEST))
}
package peermanager
import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
var CHOSEN_NEIGHBORS = &PeerList{make(map[string]*peer.Peer)}
package peermanager
import (
"time"
)
const (
FIND_NEIGHBOR_INTERVAL = 5 * time.Second
)
package peermanager package peermanager
import ( import (
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/plugins/autopeering/parameters" "github.com/iotaledger/goshimmer/plugins/autopeering/parameters"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol" peermanagerTypes "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/types"
"net" "net"
"strconv" "strconv"
"strings" "strings"
...@@ -11,8 +13,8 @@ import ( ...@@ -11,8 +13,8 @@ import (
var ENTRY_NODES = parseEntryNodes() var ENTRY_NODES = parseEntryNodes()
func parseEntryNodes() []*peer.Peer { func parseEntryNodes() peermanagerTypes.PeerList {
result := make([]*peer.Peer, 0) result := make(peermanagerTypes.PeerList, 0)
for _, entryNodeDefinition := range strings.Fields(*parameters.ENTRY_NODES.Value) { for _, entryNodeDefinition := range strings.Fields(*parameters.ENTRY_NODES.Value) {
if entryNodeDefinition == "" { if entryNodeDefinition == "" {
...@@ -29,12 +31,20 @@ func parseEntryNodes() []*peer.Peer { ...@@ -29,12 +31,20 @@ func parseEntryNodes() []*peer.Peer {
} }
switch protocolBits[0] { switch protocolBits[0] {
case "tcp": case "tcp":
entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_TCP entryNode.PeeringProtocolType = types.PROTOCOL_TYPE_TCP
case "udp": case "udp":
entryNode.PeeringProtocolType = protocol.PROTOCOL_TYPE_UDP entryNode.PeeringProtocolType = types.PROTOCOL_TYPE_UDP
} }
addressBits := strings.Split(protocolBits[1], ":") identityBits := strings.Split(protocolBits[1], "@")
if len(identityBits) != 2 {
panic("error while parsing identity of entry node: " + entryNodeDefinition)
}
entryNode.Identity = &identity.Identity{
StringIdentifier: identityBits[0],
}
addressBits := strings.Split(identityBits[1], ":")
switch len(addressBits) { switch len(addressBits) {
case 2: case 2:
host := addressBits[0] host := addressBits[0]
......
package peermanager
import "reflect"
var Events = moduleEvents{
UpdateNeighborhood: &callbackEvent{make(map[uintptr]Callback)},
}
type moduleEvents struct {
UpdateNeighborhood *callbackEvent
}
type callbackEvent struct {
callbacks map[uintptr]Callback
}
func (this *callbackEvent) Attach(callback Callback) {
this.callbacks[reflect.ValueOf(callback).Pointer()] = callback
}
func (this *callbackEvent) Detach(callback Callback) {
delete(this.callbacks, reflect.ValueOf(callback).Pointer())
}
func (this *callbackEvent) Trigger() {
for _, callback := range this.callbacks {
callback()
}
}
type Callback = func()
\ No newline at end of file
package peermanager package peermanager
import "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" import (
"github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types"
)
var KNOWN_PEERS = &PeerList{make(map[string]*peer.Peer)} var KNOWN_PEERS = initKnownPeers()
func initKnownPeers() types.PeerRegister {
knownPeers := make(types.PeerRegister)
for _, entryNode := range ENTRY_NODES {
knownPeers.AddOrUpdate(entryNode)
}
return knownPeers
}
\ No newline at end of file
package peermanager
import (
"github.com/iotaledger/goshimmer/packages/timeutil"
"github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request"
"time"
)
var NEIGHBORHOOD types.PeerRegister
var NEIGHBORHOOD_LIST types.PeerList
// Selects a fixed neighborhood from all known peers - this allows nodes to "stay in the same circles" that share their
// view on the ledger an is a preparation for economic clustering
var NEIGHBORHOOD_SELECTOR = func(this types.PeerRegister, req *request.Request) types.PeerRegister {
filteredPeers := make(types.PeerRegister)
for id, peer := range this {
filteredPeers[id] = peer
}
return filteredPeers
}
var lastUpdate = time.Now()
func init() {
updateNeighborHood()
go timeutil.Ticker(updateNeighborHood, 1 * time.Second)
}
func updateNeighborHood() {
if float64(len(NEIGHBORHOOD)) * 1.2 <= float64(len(KNOWN_PEERS)) || lastUpdate.Before(time.Now().Add(-300 * time.Second)) {
NEIGHBORHOOD = KNOWN_PEERS.Filter(NEIGHBORHOOD_SELECTOR, request.OUTGOING_REQUEST)
NEIGHBORHOOD_LIST = NEIGHBORHOOD.List()
lastUpdate = time.Now()
Events.UpdateNeighborhood.Trigger()
}
}
\ No newline at end of file
package peermanager
import (
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response"
"github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp"
"github.com/iotaledger/goshimmer/plugins/autopeering/server/udp"
"net"
"strconv"
"time"
)
func Configure(plugin *node.Plugin) {
configurePeeringRequest()
// setup processing of peering requests
udp.Events.ReceiveRequest.Attach(func(peeringRequest *request.Request) {
processPeeringRequest(plugin, peeringRequest, nil)
})
tcp.Events.ReceiveRequest.Attach(func(conn *network.ManagedConnection, peeringRequest *request.Request) {
processPeeringRequest(plugin, peeringRequest, conn)
})
// setup processing of peering responses
udp.Events.ReceiveResponse.Attach(func(peeringResponse *response.Response) {
processPeeringResponse(plugin, peeringResponse, nil)
})
tcp.Events.ReceiveResponse.Attach(func(conn *network.ManagedConnection, peeringResponse *response.Response) {
processPeeringResponse(plugin, peeringResponse, conn)
})
udp.Events.Error.Attach(func(ip net.IP, err error) {
plugin.LogDebug("error when communicating with " + ip.String() + ": " + err.Error())
})
tcp.Events.Error.Attach(func(ip net.IP, err error) {
plugin.LogDebug("invalid peering request from " + ip.String() + ": " + err.Error())
})
}
func Run(plugin *node.Plugin) {
// setup background worker that contacts "chosen neighbors"
daemon.BackgroundWorker(func() {
plugin.LogInfo("Starting Peer Manager ...")
plugin.LogSuccess("Starting Peer Manager ... done")
sendPeeringRequests(plugin)
ticker := time.NewTicker(FIND_NEIGHBOR_INTERVAL)
for {
select {
case <-daemon.ShutdownSignal:
return
case <-ticker.C:
sendPeeringRequests(plugin)
}
}
})
}
func Shutdown(plugin *node.Plugin) {
plugin.LogInfo("Stopping Peer Manager ...")
plugin.LogSuccess("Stopping Peer Manager ... done")
}
func processPeeringRequest(plugin *node.Plugin, peeringRequest *request.Request, conn net.Conn) {
if KNOWN_PEERS.Update(peeringRequest.Issuer) {
plugin.LogInfo("new peer detected: " + peeringRequest.Issuer.Address.String() + " / " + peeringRequest.Issuer.Identity.StringIdentifier)
}
if conn == nil {
plugin.LogDebug("received UDP peering request from " + peeringRequest.Issuer.Identity.StringIdentifier)
var protocolString string
switch peeringRequest.Issuer.PeeringProtocolType {
case protocol.PROTOCOL_TYPE_TCP:
protocolString = "tcp"
case protocol.PROTOCOL_TYPE_UDP:
protocolString = "udp"
default:
plugin.LogFailure("unsupported peering protocol in request from " + peeringRequest.Issuer.Address.String())
return
}
var err error
conn, err = net.Dial(protocolString, peeringRequest.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringRequest.Issuer.PeeringPort)))
if err != nil {
plugin.LogDebug("error when connecting to " + peeringRequest.Issuer.Address.String() + " during peering process: " + err.Error())
return
}
} else {
plugin.LogDebug("received TCP peering request from " + peeringRequest.Issuer.Identity.StringIdentifier)
}
sendFittingPeeringResponse(conn)
}
func processPeeringResponse(plugin *node.Plugin, peeringResponse *response.Response, conn *network.ManagedConnection) {
if KNOWN_PEERS.Update(peeringResponse.Issuer) {
plugin.LogInfo("new peer detected: " + peeringResponse.Issuer.Address.String() + " / " + peeringResponse.Issuer.Identity.StringIdentifier)
}
for _, peer := range peeringResponse.Peers {
if KNOWN_PEERS.Update(peer) {
plugin.LogInfo("new peer detected: " + peer.Address.String() + " / " + peer.Identity.StringIdentifier)
}
}
if conn == nil {
plugin.LogDebug("received UDP peering response from " + peeringResponse.Issuer.Identity.StringIdentifier)
} else {
plugin.LogDebug("received TCP peering response from " + peeringResponse.Issuer.Identity.StringIdentifier)
conn.Close()
}
switch peeringResponse.Type {
case response.TYPE_ACCEPT:
CHOSEN_NEIGHBORS.Update(peeringResponse.Issuer)
case response.TYPE_REJECT:
default:
plugin.LogDebug("invalid response type in peering response of " + peeringResponse.Issuer.Address.String() + ":" + strconv.Itoa(int(peeringResponse.Issuer.PeeringPort)))
}
}
func sendFittingPeeringResponse(conn net.Conn) {
var peeringResponse *response.Response
if len(ACCEPTED_NEIGHBORS.Peers) < protocol.NEIGHBOR_COUNT/2 {
peeringResponse = generateAcceptResponse()
} else {
peeringResponse = generateRejectResponse()
}
peeringResponse.Sign()
conn.Write(peeringResponse.Marshal())
conn.Close()
}
func generateAcceptResponse() *response.Response {
peeringResponse := &response.Response{
Type: response.TYPE_ACCEPT,
Issuer: PEERING_REQUEST.Issuer,
Peers: generateProposedNodeCandidates(),
}
return peeringResponse
}
func generateRejectResponse() *response.Response {
peeringResponse := &response.Response{
Type: response.TYPE_REJECT,
Issuer: PEERING_REQUEST.Issuer,
Peers: generateProposedNodeCandidates(),
}
return peeringResponse
}
func generateProposedNodeCandidates() []*peer.Peer {
peers := make([]*peer.Peer, 0)
for _, peer := range KNOWN_PEERS.Peers {
peers = append(peers, peer)
}
return peers
}
//region PEERING REQUEST RELATED METHODS ///////////////////////////////////////////////////////////////////////////////
func sendPeeringRequests(plugin *node.Plugin) {
for _, peer := range getChosenNeighborCandidates() {
sendPeeringRequest(plugin, peer)
}
}
func sendPeeringRequest(plugin *node.Plugin, peer *peer.Peer) {
switch peer.PeeringProtocolType {
case protocol.PROTOCOL_TYPE_TCP:
sendTCPPeeringRequest(plugin, peer)
case protocol.PROTOCOL_TYPE_UDP:
sendUDPPeeringRequest(plugin, peer)
default:
panic("invalid protocol in known peers")
}
}
func sendTCPPeeringRequest(plugin *node.Plugin, peer *peer.Peer) {
go func() {
tcpConnection, err := net.Dial("tcp", peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)))
if err != nil {
plugin.LogDebug("error while trying to send TCP peering request to " + peer.String() + ": " + err.Error())
} else {
mConn := network.NewManagedConnection(tcpConnection)
plugin.LogDebug("sending TCP peering request to " + peer.String())
if _, err := mConn.Write(PEERING_REQUEST.Marshal()); err != nil {
plugin.LogDebug("error while trying to send TCP peering request to " + peer.String() + ": " + err.Error())
return
}
tcp.HandleConnection(mConn)
}
}()
}
func sendUDPPeeringRequest(plugin *node.Plugin, peer *peer.Peer) {
go func() {
udpConnection, err := net.Dial("udp", peer.Address.String()+":"+strconv.Itoa(int(peer.PeeringPort)))
if err != nil {
plugin.LogDebug("error while trying to send peering request to " + peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + " / " + peer.Identity.StringIdentifier + ": " + err.Error())
} else {
mConn := network.NewManagedConnection(udpConnection)
if _, err := mConn.Write(PEERING_REQUEST.Marshal()); err != nil {
plugin.LogDebug("error while trying to send peering request to " + peer.Address.String() + ":" + strconv.Itoa(int(peer.PeeringPort)) + " / " + peer.Identity.StringIdentifier + ": " + err.Error())
return
}
// setup listener for incoming responses
}
}()
}
func getChosenNeighborCandidates() []*peer.Peer {
result := make([]*peer.Peer, 0)
for _, peer := range KNOWN_PEERS.Peers {
result = append(result, peer)
}
for _, peer := range ENTRY_NODES {
result = append(result, peer)
}
return result
}
//endregion ////////////////////////////////////////////////////////////////////////////////////////////////////////////
\ No newline at end of file
package types
import (
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"sort"
)
type PeerList []*peer.Peer
func (this PeerList) Filter(predicate func(p *peer.Peer) bool) PeerList {
peerList := make(PeerList, len(this))
counter := 0
for _, peer := range this {
if predicate(peer) {
peerList[counter] = peer
counter++
}
}
return peerList[:counter]
}
// Sorts the PeerRegister by their distance to an anchor.
func (this PeerList) Sort(distance func(p *peer.Peer) float64) PeerList {
sort.Slice(this, func(i, j int) bool {
return distance(this[i]) < distance(this[j])
})
return this
}
package peermanager package types
import ( import (
"bytes" "bytes"
"github.com/iotaledger/goshimmer/packages/accountability" "github.com/iotaledger/goshimmer/packages/accountability"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer" "github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request"
) )
type PeerList struct { type PeerRegister map[string]*peer.Peer
Peers map[string]*peer.Peer
}
func (this *PeerList) Update(peer *peer.Peer) bool { // returns true if a new entry was added
func (this PeerRegister) AddOrUpdate(peer *peer.Peer) bool {
if peer.Identity == nil || bytes.Equal(peer.Identity.Identifier, accountability.OWN_ID.Identifier) { if peer.Identity == nil || bytes.Equal(peer.Identity.Identifier, accountability.OWN_ID.Identifier) {
return false return false
} }
if existingPeer, exists := this.Peers[peer.Identity.StringIdentifier]; exists { if existingPeer, exists := this[peer.Identity.StringIdentifier]; exists {
existingPeer.Address = peer.Address existingPeer.Address = peer.Address
existingPeer.GossipPort = peer.GossipPort existingPeer.GossipPort = peer.GossipPort
existingPeer.PeeringPort = peer.PeeringPort existingPeer.PeeringPort = peer.PeeringPort
...@@ -24,7 +24,7 @@ func (this *PeerList) Update(peer *peer.Peer) bool { ...@@ -24,7 +24,7 @@ func (this *PeerList) Update(peer *peer.Peer) bool {
return false return false
} else { } else {
this.Peers[peer.Identity.StringIdentifier] = peer this[peer.Identity.StringIdentifier] = peer
// trigger add peer // trigger add peer
...@@ -32,14 +32,26 @@ func (this *PeerList) Update(peer *peer.Peer) bool { ...@@ -32,14 +32,26 @@ func (this *PeerList) Update(peer *peer.Peer) bool {
} }
} }
func (this *PeerList) Add(peer *peer.Peer) { func (this PeerRegister) Contains(key string) bool {
this.Peers[peer.Identity.StringIdentifier] = peer if _, exists := this[key]; exists {
}
func (this *PeerList) Contains(key string) bool {
if _, exists := this.Peers[key]; exists {
return true return true
} else { } else {
return false return false
} }
} }
func (this PeerRegister) Filter(filterFn func(this PeerRegister, req *request.Request) PeerRegister, req *request.Request) PeerRegister {
return filterFn(this, req)
}
func (this PeerRegister) List() PeerList {
peerList := make(PeerList, len(this))
counter := 0
for _, currentPeer := range this {
peerList[counter] = currentPeer
counter++
}
return peerList
}
...@@ -4,22 +4,44 @@ import ( ...@@ -4,22 +4,44 @@ import (
"github.com/iotaledger/goshimmer/packages/daemon" "github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/node" "github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/autopeering/peermanager" "github.com/iotaledger/goshimmer/plugins/autopeering/peermanager"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response"
"github.com/iotaledger/goshimmer/plugins/autopeering/server" "github.com/iotaledger/goshimmer/plugins/autopeering/server"
"github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager"
) )
func configure(plugin *node.Plugin) { func configure(plugin *node.Plugin) {
server.Configure(plugin) server.Configure(plugin)
peermanager.Configure(plugin) protocol.Configure(plugin)
daemon.Events.Shutdown.Attach(func() { daemon.Events.Shutdown.Attach(func() {
server.Shutdown(plugin) server.Shutdown(plugin)
peermanager.Shutdown(plugin) })
protocol.Events.IncomingRequestAccepted.Attach(func(req *request.Request) {
if neighbormanager.ACCEPTED_NEIGHBORS.AddOrUpdate(req.Issuer) {
plugin.LogSuccess("new neighbor accepted: " + req.Issuer.Address.String() + " / " + req.Issuer.Identity.StringIdentifier)
}
})
protocol.Events.OutgoingRequestAccepted.Attach(func(res *response.Response) {
if neighbormanager.CHOSEN_NEIGHBORS.AddOrUpdate(res.Issuer) {
plugin.LogSuccess("new neighbor chosen: " + res.Issuer.Address.String() + " / " + res.Issuer.Identity.StringIdentifier)
}
})
protocol.Events.DiscoverPeer.Attach(func(p *peer.Peer) {
if peermanager.KNOWN_PEERS.AddOrUpdate(p) {
plugin.LogInfo("new peer detected: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
}
}) })
} }
func run(plugin *node.Plugin) { func run(plugin *node.Plugin) {
server.Run(plugin) server.Run(plugin)
peermanager.Run(plugin) protocol.Run(plugin)
} }
var PLUGIN = node.NewPlugin("Auto Peering", configure, run) var PLUGIN = node.NewPlugin("Auto Peering", configure, run)
package protocol
import (
"github.com/iotaledger/goshimmer/packages/accountability"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/autopeering/peermanager"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request"
"github.com/iotaledger/goshimmer/plugins/autopeering/server/tcp"
"github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager"
"time"
)
func createChosenNeighborProcessor(plugin *node.Plugin) func() {
return func() {
plugin.LogInfo("Starting Chosen Neighbor Processor ...")
plugin.LogSuccess("Starting Chosen Neighbor Processor ... done")
chooseNeighbors(plugin)
ticker := time.NewTicker(constants.FIND_NEIGHBOR_INTERVAL)
ticker:
for {
select {
case <- daemon.ShutdownSignal:
plugin.LogInfo("Stopping Chosen Neighbor Processor ...")
break ticker
case <- ticker.C:
chooseNeighbors(plugin)
}
}
plugin.LogSuccess("Stopping Chosen Neighbor Processor ... done")
}
}
func chooseNeighbors(plugin *node.Plugin) {
for _, chosenNeighborCandidate := range peermanager.CHOSEN_NEIGHBOR_CANDIDATES {
go func(peer *peer.Peer) {
nodeId := peer.Identity.StringIdentifier
if !neighbormanager.ACCEPTED_NEIGHBORS.Contains(nodeId) &&
!neighbormanager.CHOSEN_NEIGHBORS.Contains(nodeId) &&
accountability.OWN_ID.StringIdentifier != nodeId {
if connectionAlive, err := request.Send(peer); err != nil {
plugin.LogDebug(err.Error())
} else if connectionAlive {
plugin.LogDebug("sent TCP peering request to " + peer.String())
tcp.HandleConnection(peer.Conn)
} else {
plugin.LogDebug("sent UDP peering request to " + peer.String())
}
}
}(chosenNeighborCandidate)
}
}
package constants
import "time"
const (
NEIGHBOR_COUNT = 8
FIND_NEIGHBOR_INTERVAL = 5 * time.Second
PING_RANDOM_PEERS_INTERVAL = 1 * time.Second
PING_RANDOM_PEERS_COUNT = 3
)
package protocol
import (
"github.com/iotaledger/goshimmer/packages/node"
"net"
)
func createErrorHandler(plugin *node.Plugin) func(ip net.IP, err error) {
return func(ip net.IP, err error) {
plugin.LogDebug("error when communicating with " + ip.String() + ": " + err.Error())
}
}
package protocol
import (
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/response"
"reflect"
)
var Events = protocolEvents{
DiscoverPeer: &peerEvent{make(map[uintptr]PeerConsumer)},
IncomingRequestAccepted: &requestEvent{make(map[uintptr]RequestConsumer)},
IncomingRequestRejected: &requestEvent{make(map[uintptr]RequestConsumer)},
OutgoingRequestAccepted: &responseEvent{make(map[uintptr]ResponseConsumer)},
OutgoingRequestRejected: &responseEvent{make(map[uintptr]ResponseConsumer)},
}
type protocolEvents struct {
DiscoverPeer *peerEvent
IncomingRequestAccepted *requestEvent
IncomingRequestRejected *requestEvent
OutgoingRequestAccepted *responseEvent
OutgoingRequestRejected *responseEvent
}
type peerEvent struct {
callbacks map[uintptr]PeerConsumer
}
func (this *peerEvent) Attach(callback PeerConsumer) {
this.callbacks[reflect.ValueOf(callback).Pointer()] = callback
}
func (this *peerEvent) Detach(callback PeerConsumer) {
delete(this.callbacks, reflect.ValueOf(callback).Pointer())
}
func (this *peerEvent) Trigger(p *peer.Peer) {
for _, callback := range this.callbacks {
callback(p)
}
}
type requestEvent struct {
callbacks map[uintptr]RequestConsumer
}
func (this *requestEvent) Attach(callback RequestConsumer) {
this.callbacks[reflect.ValueOf(callback).Pointer()] = callback
}
func (this *requestEvent) Detach(callback RequestConsumer) {
delete(this.callbacks, reflect.ValueOf(callback).Pointer())
}
func (this *requestEvent) Trigger(req *request.Request) {
for _, callback := range this.callbacks {
callback(req)
}
}
type responseEvent struct {
callbacks map[uintptr]ResponseConsumer
}
func (this *responseEvent) Attach(callback ResponseConsumer) {
this.callbacks[reflect.ValueOf(callback).Pointer()] = callback
}
func (this *responseEvent) Detach(callback ResponseConsumer) {
delete(this.callbacks, reflect.ValueOf(callback).Pointer())
}
func (this *responseEvent) Trigger(res *response.Response) {
for _, callback := range this.callbacks {
callback(res)
}
}
type PeerConsumer = func(p *peer.Peer)
type RequestConsumer = func(req *request.Request)
type ResponseConsumer = func(res *response.Response)
package protocol
import (
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/ping"
)
func createIncomingPingProcessor(plugin *node.Plugin) func(p *ping.Ping) {
return func(p *ping.Ping) {
if p.Issuer.Conn != nil {
plugin.LogDebug("received TCP ping from " + p.Issuer.String())
} else {
plugin.LogDebug("received UDP ping from " + p.Issuer.String())
}
Events.DiscoverPeer.Trigger(p.Issuer)
}
}
package protocol
import (
"github.com/iotaledger/goshimmer/packages/node"
"github.com/iotaledger/goshimmer/plugins/autopeering/peermanager"
"github.com/iotaledger/goshimmer/plugins/autopeering/peermanager/types"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/constants"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/peer"
"github.com/iotaledger/goshimmer/plugins/autopeering/protocol/request"
"github.com/iotaledger/goshimmer/plugins/gossip/neighbormanager"
)
func createIncomingRequestProcessor(plugin *node.Plugin) func(req *request.Request) {
return func(req *request.Request) {
Events.DiscoverPeer.Trigger(req.Issuer)
if req.Issuer.Conn != nil {
plugin.LogDebug("received TCP peering request from " + req.Issuer.String())
} else {
plugin.LogDebug("received UDP peering request from " + req.Issuer.String())
}
if len(neighbormanager.ACCEPTED_NEIGHBORS) <= constants.NEIGHBOR_COUNT / 2 {
if err := req.Accept(proposedPeeringCandidates(req)); err != nil {
plugin.LogDebug("error when sending response to" + req.Issuer.String())
}
plugin.LogDebug("sent positive peering response to " + req.Issuer.String())
Events.IncomingRequestAccepted.Trigger(req)
} else {
if err := req.Reject(proposedPeeringCandidates(req)); err != nil {
plugin.LogDebug("error when sending response to" + req.Issuer.String())
}
plugin.LogDebug("sent negative peering response to " + req.Issuer.String())
Events.IncomingRequestRejected.Trigger(req)
}
}
}
func proposedPeeringCandidates(req *request.Request) types.PeerList {
return peermanager.NEIGHBORHOOD.List().Filter(func(p *peer.Peer) bool {
return p.Identity.PublicKey != nil
}).Sort(peermanager.CHOSEN_NEIGHBOR_DISTANCE(req))
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment