Skip to content
Snippets Groups Projects
Unverified Commit b3f3ae19 authored by Wolfgang Welz's avatar Wolfgang Welz Committed by GitHub
Browse files

Merge pull request #75 from iotaledger/fix/autopeering-mutex

Fix/autopeering mutex
parents 80563867 f6f5e41a
Branches
Tags
No related merge requests found
Showing
with 149 additions and 107 deletions
......@@ -3,20 +3,30 @@ package tcp
import (
"net"
"strconv"
"sync"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/network"
)
type Server struct {
Socket net.Listener
socket net.Listener
socketMutex sync.RWMutex
Events serverEvents
}
func (this *Server) GetSocket() net.Listener {
this.socketMutex.RLock()
defer this.socketMutex.RUnlock()
return this.socket
}
func (this *Server) Shutdown() {
if this.Socket != nil {
socket := this.Socket
this.Socket = nil
this.socketMutex.Lock()
defer this.socketMutex.Unlock()
if this.socket != nil {
socket := this.socket
this.socket = nil
socket.Close()
}
......@@ -29,15 +39,17 @@ func (this *Server) Listen(port int) *Server {
return this
} else {
this.Socket = socket
this.socketMutex.Lock()
this.socket = socket
this.socketMutex.Unlock()
}
this.Events.Start.Trigger()
defer this.Events.Shutdown.Trigger()
for this.Socket != nil {
if socket, err := this.Socket.Accept(); err != nil {
if this.Socket != nil {
for this.GetSocket() != nil {
if socket, err := this.GetSocket().Accept(); err != nil {
if this.GetSocket() != nil {
this.Events.Error.Trigger(err)
}
} else {
......
......@@ -3,20 +3,30 @@ package udp
import (
"net"
"strconv"
"sync"
"github.com/iotaledger/goshimmer/packages/events"
)
type Server struct {
Socket net.PacketConn
socket net.PacketConn
socketMutex sync.RWMutex
ReceiveBufferSize int
Events serverEvents
}
func (this *Server) GetSocket() net.PacketConn {
this.socketMutex.RLock()
defer this.socketMutex.RUnlock()
return this.socket
}
func (this *Server) Shutdown() {
if this.Socket != nil {
socket := this.Socket
this.Socket = nil
this.socketMutex.Lock()
defer this.socketMutex.Unlock()
if this.socket != nil {
socket := this.socket
this.socket = nil
socket.Close()
}
......@@ -28,16 +38,18 @@ func (this *Server) Listen(address string, port int) {
return
} else {
this.Socket = socket
this.socketMutex.Lock()
this.socket = socket
this.socketMutex.Unlock()
}
this.Events.Start.Trigger()
defer this.Events.Shutdown.Trigger()
buf := make([]byte, this.ReceiveBufferSize)
for this.Socket != nil {
if bytesRead, addr, err := this.Socket.ReadFrom(buf); err != nil {
if this.Socket != nil {
for this.GetSocket() != nil {
if bytesRead, addr, err := this.GetSocket().ReadFrom(buf); err != nil {
if this.GetSocket() != nil {
this.Events.Error.Trigger(err)
}
} else {
......
package node
import "fmt"
import (
"fmt"
"sync"
)
type Logger struct {
Enabled bool
enabled bool
enabledMutex sync.RWMutex
LogInfo func(pluginName string, message string)
LogSuccess func(pluginName string, message string)
LogWarning func(pluginName string, message string)
......@@ -11,6 +15,19 @@ type Logger struct {
LogDebug func(pluginName string, message string)
}
func (logger *Logger) SetEnabled(value bool) {
logger.enabledMutex.Lock()
logger.enabled = value
logger.enabledMutex.Unlock()
}
func (logger *Logger) GetEnabled() (result bool) {
logger.enabledMutex.RLock()
result = logger.enabled
logger.enabledMutex.RUnlock()
return
}
func pluginPrefix(pluginName string) string {
var pluginPrefix string
if pluginName == "Node" {
......@@ -23,7 +40,7 @@ func pluginPrefix(pluginName string) string {
}
var DEFAULT_LOGGER = &Logger{
Enabled: true,
enabled: true,
LogSuccess: func(pluginName string, message string) {
fmt.Println("[ OK ] " + pluginPrefix(pluginName) + message)
},
......
......@@ -55,7 +55,7 @@ func (node *Node) AddLogger(logger *Logger) {
func (node *Node) LogSuccess(pluginName string, message string) {
if *LOG_LEVEL.Value >= LOG_LEVEL_SUCCESS {
for _, logger := range node.loggers {
if logger.Enabled {
if logger.GetEnabled() {
logger.LogSuccess(pluginName, message)
}
}
......@@ -65,7 +65,7 @@ func (node *Node) LogSuccess(pluginName string, message string) {
func (node *Node) LogInfo(pluginName string, message string) {
if *LOG_LEVEL.Value >= LOG_LEVEL_INFO {
for _, logger := range node.loggers {
if logger.Enabled {
if logger.GetEnabled() {
logger.LogInfo(pluginName, message)
}
}
......@@ -75,7 +75,7 @@ func (node *Node) LogInfo(pluginName string, message string) {
func (node *Node) LogDebug(pluginName string, message string) {
if *LOG_LEVEL.Value >= LOG_LEVEL_DEBUG {
for _, logger := range node.loggers {
if logger.Enabled {
if logger.GetEnabled() {
logger.LogDebug(pluginName, message)
}
}
......@@ -85,7 +85,7 @@ func (node *Node) LogDebug(pluginName string, message string) {
func (node *Node) LogWarning(pluginName string, message string) {
if *LOG_LEVEL.Value >= LOG_LEVEL_WARNING {
for _, logger := range node.loggers {
if logger.Enabled {
if logger.GetEnabled() {
logger.LogWarning(pluginName, message)
}
}
......@@ -95,7 +95,7 @@ func (node *Node) LogWarning(pluginName string, message string) {
func (node *Node) LogFailure(pluginName string, message string) {
if *LOG_LEVEL.Value >= LOG_LEVEL_FAILURE {
for _, logger := range node.loggers {
if logger.Enabled {
if logger.GetEnabled() {
logger.LogFailure(pluginName, message)
}
}
......
......@@ -72,23 +72,23 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch
// define hooks ////////////////////////////////////////////////////////////////////////////////////////////////////
onDiscoverPeer := events.NewClosure(func(p *peer.Peer) {
go eventDispatchers.AddNode(p.Identity.Identifier)
go eventDispatchers.AddNode(p.GetIdentity().Identifier)
})
onAddAcceptedNeighbor := events.NewClosure(func(p *peer.Peer) {
eventDispatchers.ConnectNodes(p.Identity.Identifier, accountability.OwnId().Identifier)
eventDispatchers.ConnectNodes(p.GetIdentity().Identifier, accountability.OwnId().Identifier)
})
onRemoveAcceptedNeighbor := events.NewClosure(func(p *peer.Peer) {
eventDispatchers.DisconnectNodes(p.Identity.Identifier, accountability.OwnId().Identifier)
eventDispatchers.DisconnectNodes(p.GetIdentity().Identifier, accountability.OwnId().Identifier)
})
onAddChosenNeighbor := events.NewClosure(func(p *peer.Peer) {
eventDispatchers.ConnectNodes(accountability.OwnId().Identifier, p.Identity.Identifier)
eventDispatchers.ConnectNodes(accountability.OwnId().Identifier, p.GetIdentity().Identifier)
})
onRemoveChosenNeighbor := events.NewClosure(func(p *peer.Peer) {
eventDispatchers.DisconnectNodes(accountability.OwnId().Identifier, p.Identity.Identifier)
eventDispatchers.DisconnectNodes(accountability.OwnId().Identifier, p.GetIdentity().Identifier)
})
// setup hooks /////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -115,11 +115,11 @@ func setupHooks(conn *network.ManagedConnection, eventDispatchers *EventDispatch
}
func reportChosenNeighbors(dispatchers *EventDispatchers) {
for _, chosenNeighbor := range chosenneighbors.INSTANCE.Peers {
dispatchers.AddNode(chosenNeighbor.Identity.Identifier)
for _, chosenNeighbor := range chosenneighbors.INSTANCE.Peers.GetMap() {
dispatchers.AddNode(chosenNeighbor.GetIdentity().Identifier)
}
for _, chosenNeighbor := range chosenneighbors.INSTANCE.Peers {
dispatchers.ConnectNodes(accountability.OwnId().Identifier, chosenNeighbor.Identity.Identifier)
for _, chosenNeighbor := range chosenneighbors.INSTANCE.Peers.GetMap() {
dispatchers.ConnectNodes(accountability.OwnId().Identifier, chosenNeighbor.GetIdentity().Identifier)
}
}
......
......@@ -10,11 +10,11 @@ import (
var DISTANCE = func(anchor *peer.Peer) func(p *peer.Peer) uint64 {
return func(p *peer.Peer) uint64 {
saltedIdentifier := make([]byte, len(anchor.Identity.Identifier)+len(saltmanager.PRIVATE_SALT.Bytes))
copy(saltedIdentifier[0:], anchor.Identity.Identifier)
copy(saltedIdentifier[len(anchor.Identity.Identifier):], saltmanager.PRIVATE_SALT.Bytes)
saltedIdentifier := make([]byte, len(anchor.GetIdentity().Identifier)+len(saltmanager.PRIVATE_SALT.GetBytes()))
copy(saltedIdentifier[0:], anchor.GetIdentity().Identifier)
copy(saltedIdentifier[len(anchor.GetIdentity().Identifier):], saltmanager.PRIVATE_SALT.GetBytes())
return hash(saltedIdentifier) ^ hash(p.Identity.Identifier)
return hash(saltedIdentifier) ^ hash(p.GetIdentity().Identifier)
}
}
......
......@@ -25,11 +25,11 @@ func configureFurthestNeighbor() {
FurthestNeighborLock.Lock()
defer FurthestNeighborLock.Unlock()
if p.Identity.StringIdentifier == FURTHEST_NEIGHBOR.Identity.StringIdentifier {
if p.GetIdentity().StringIdentifier == FURTHEST_NEIGHBOR.GetIdentity().StringIdentifier {
FURTHEST_NEIGHBOR_DISTANCE = uint64(0)
FURTHEST_NEIGHBOR = nil
for _, furthestNeighborCandidate := range INSTANCE.Peers {
for _, furthestNeighborCandidate := range INSTANCE.Peers.GetMap() {
updateFurthestNeighbor(furthestNeighborCandidate)
}
}
......
......@@ -6,14 +6,15 @@ import (
"github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist"
)
var CANDIDATES peerlist.PeerList
var CANDIDATES *peerlist.PeerList
func configureCandidates() {
CANDIDATES = peerlist.NewPeerList()
updateNeighborCandidates()
neighborhood.Events.Update.Attach(updateNeighborCandidates)
}
func updateNeighborCandidates() {
CANDIDATES = neighborhood.LIST_INSTANCE.Sort(DISTANCE(ownpeer.INSTANCE))
CANDIDATES.Update(neighborhood.LIST_INSTANCE.Sort(DISTANCE(ownpeer.INSTANCE)).GetPeers())
}
......@@ -9,11 +9,11 @@ import (
var DISTANCE = func(anchor *peer.Peer) func(p *peer.Peer) uint64 {
return func(p *peer.Peer) uint64 {
saltedIdentifier := make([]byte, len(anchor.Identity.Identifier)+len(anchor.Salt.Bytes))
copy(saltedIdentifier[0:], anchor.Identity.Identifier)
copy(saltedIdentifier[len(anchor.Identity.Identifier):], anchor.Salt.Bytes)
saltedIdentifier := make([]byte, len(anchor.GetIdentity().Identifier)+len(anchor.GetSalt().GetBytes()))
copy(saltedIdentifier[0:], anchor.GetIdentity().Identifier)
copy(saltedIdentifier[len(anchor.GetIdentity().Identifier):], anchor.GetSalt().GetBytes())
return hash(anchor.Identity.Identifier) ^ hash(p.Identity.Identifier)
return hash(anchor.GetIdentity().Identifier) ^ hash(p.GetIdentity().Identifier)
}
}
......
......@@ -33,7 +33,7 @@ func configureFurthestNeighbor() {
FURTHEST_NEIGHBOR_DISTANCE = uint64(0)
FURTHEST_NEIGHBOR = nil
for _, furthestNeighborCandidate := range INSTANCE.Peers {
for _, furthestNeighborCandidate := range INSTANCE.Peers.GetMap() {
distance := OWN_DISTANCE(furthestNeighborCandidate)
if distance > FURTHEST_NEIGHBOR_DISTANCE {
FURTHEST_NEIGHBOR = furthestNeighborCandidate
......
......@@ -13,23 +13,21 @@ import (
"github.com/iotaledger/goshimmer/plugins/autopeering/types/peerlist"
)
var INSTANCE peerlist.PeerList
var INSTANCE *peerlist.PeerList
func Configure(node *node.Plugin) {
INSTANCE = parseEntryNodes()
}
func parseEntryNodes() peerlist.PeerList {
result := make(peerlist.PeerList, 0)
func parseEntryNodes() *peerlist.PeerList {
result := peerlist.NewPeerList()
for _, entryNodeDefinition := range strings.Fields(*parameters.ENTRY_NODES.Value) {
if entryNodeDefinition == "" {
continue
}
entryNode := &peer.Peer{
Identity: nil,
}
entryNode := &peer.Peer{}
identityBits := strings.Split(entryNodeDefinition, "@")
if len(identityBits) != 2 {
......@@ -38,10 +36,10 @@ func parseEntryNodes() peerlist.PeerList {
if decodedIdentifier, err := hex.DecodeString(identityBits[0]); err != nil {
panic("error while parsing identity of entry node: " + entryNodeDefinition)
} else {
entryNode.Identity = &identity.Identity{
entryNode.SetIdentity(&identity.Identity{
Identifier: decodedIdentifier,
StringIdentifier: identityBits[0],
}
})
}
addressBits := strings.Split(identityBits[1], ":")
......@@ -58,8 +56,8 @@ func parseEntryNodes() peerlist.PeerList {
panic("error while parsing ip of entry in list of entry nodes")
}
entryNode.Address = ip
entryNode.PeeringPort = uint16(port)
entryNode.SetAddress(ip)
entryNode.SetPeeringPort(uint16(port))
case 6:
host := strings.Join(addressBits[:5], ":")
port, err := strconv.Atoi(addressBits[5])
......@@ -72,13 +70,13 @@ func parseEntryNodes() peerlist.PeerList {
panic("error while parsing ip of entry in list of entry nodes")
}
entryNode.Address = ip
entryNode.PeeringPort = uint16(port)
entryNode.SetAddress(ip)
entryNode.SetPeeringPort(uint16(port))
default:
panic("invalid entry in list of trusted entry nodes: " + entryNodeDefinition)
}
result = append(result, entryNode)
result.AddPeer(entryNode)
}
return result
......
......@@ -14,7 +14,7 @@ func Configure(plugin *node.Plugin) {
func initKnownPeers() *peerregister.PeerRegister {
knownPeers := peerregister.New()
for _, entryNode := range entrynodes.INSTANCE {
for _, entryNode := range entrynodes.INSTANCE.GetPeers() {
knownPeers.AddOrUpdate(entryNode)
}
......
......@@ -15,14 +15,14 @@ import (
var INSTANCE *peerregister.PeerRegister
var LIST_INSTANCE peerlist.PeerList
var LIST_INSTANCE *peerlist.PeerList
// Selects a fixed neighborhood from all known peers - this allows nodes to "stay in the same circles" that share their
// view on the ledger an is a preparation for economic clustering
var NEIGHBORHOOD_SELECTOR = func(this *peerregister.PeerRegister, req *request.Request) *peerregister.PeerRegister {
filteredPeers := peerregister.New()
for id, peer := range this.Peers {
filteredPeers.Peers[id] = peer
for id, peer := range this.Peers.GetMap() {
filteredPeers.Peers.Store(id, peer)
}
return filteredPeers
......@@ -31,6 +31,7 @@ var NEIGHBORHOOD_SELECTOR = func(this *peerregister.PeerRegister, req *request.R
var lastUpdate = time.Now()
func Configure(plugin *node.Plugin) {
LIST_INSTANCE = peerlist.NewPeerList()
updateNeighborHood()
}
......@@ -41,9 +42,10 @@ func Run(plugin *node.Plugin) {
}
func updateNeighborHood() {
if INSTANCE == nil || float64(len(INSTANCE.Peers))*1.2 <= float64(len(knownpeers.INSTANCE.Peers)) || lastUpdate.Before(time.Now().Add(-300*time.Second)) {
if INSTANCE == nil || float64(INSTANCE.Peers.Len())*1.2 <= float64(knownpeers.INSTANCE.Peers.Len()) || lastUpdate.Before(time.Now().Add(-300*time.Second)) {
INSTANCE = knownpeers.INSTANCE.Filter(NEIGHBORHOOD_SELECTOR, outgoingrequest.INSTANCE)
LIST_INSTANCE = INSTANCE.List()
LIST_INSTANCE.Update(INSTANCE.List())
lastUpdate = time.Now()
......
......@@ -14,11 +14,11 @@ import (
var INSTANCE *peer.Peer
func Configure(plugin *node.Plugin) {
INSTANCE = &peer.Peer{
Identity: accountability.OwnId(),
PeeringPort: uint16(*parameters.PORT.Value),
GossipPort: uint16(*gossip.PORT.Value),
Address: net.IPv4(0, 0, 0, 0),
Salt: saltmanager.PUBLIC_SALT,
}
INSTANCE = &peer.Peer{}
INSTANCE.SetIdentity(accountability.OwnId())
INSTANCE.SetPeeringPort(uint16(*parameters.PORT.Value))
INSTANCE.SetGossipPort(uint16(*gossip.PORT.Value))
INSTANCE.SetAddress(net.IPv4(0, 0, 0, 0))
INSTANCE.SetSalt(saltmanager.PUBLIC_SALT)
}
......@@ -33,14 +33,14 @@ func getDb() database.Database {
}
func storePeer(p *peer.Peer) {
err := getDb().Set(p.Identity.Identifier, p.Marshal())
err := getDb().Set(p.GetIdentity().Identifier, p.Marshal())
if err != nil {
panic(err)
}
}
func removePeer(p *peer.Peer) {
err := getDb().Delete(p.Identity.Identifier)
err := getDb().Delete(p.GetIdentity().Identifier)
if err != nil {
panic(err)
}
......@@ -55,13 +55,13 @@ func loadPeers(plugin *node.Plugin) {
panic(err)
}
// the peers are stored by identifier in the db
if !bytes.Equal(key, peer.Identity.Identifier) {
if !bytes.Equal(key, peer.GetIdentity().Identifier) {
panic("Invalid item in '" + peerDbName + "' database")
}
knownpeers.INSTANCE.AddOrUpdate(peer)
count++
plugin.LogDebug("Added stored peer: " + peer.Address.String() + " / " + peer.Identity.StringIdentifier)
plugin.LogDebug("Added stored peer: " + peer.GetAddress().String() + " / " + peer.GetIdentity().StringIdentifier)
})
if err != nil {
panic(err)
......
......@@ -40,44 +40,44 @@ func run(plugin *node.Plugin) {
func configureLogging(plugin *node.Plugin) {
gossip.Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *gossip.Neighbor) {
chosenneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
acceptedneighbors.INSTANCE.Remove(peer.Identity.StringIdentifier)
chosenneighbors.INSTANCE.Remove(peer.GetIdentity().StringIdentifier)
acceptedneighbors.INSTANCE.Remove(peer.GetIdentity().StringIdentifier)
}))
acceptedneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("accepted neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogDebug("accepted neighbor added: " + p.GetAddress().String() + " / " + p.GetIdentity().StringIdentifier)
gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
gossip.AddNeighbor(gossip.NewNeighbor(p.GetIdentity(), p.GetAddress(), p.GetGossipPort()))
}))
acceptedneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("accepted neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogDebug("accepted neighbor removed: " + p.GetAddress().String() + " / " + p.GetIdentity().StringIdentifier)
gossip.RemoveNeighbor(p.Identity.StringIdentifier)
gossip.RemoveNeighbor(p.GetIdentity().StringIdentifier)
}))
chosenneighbors.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("chosen neighbor added: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogDebug("chosen neighbor added: " + p.GetAddress().String() + " / " + p.GetIdentity().StringIdentifier)
gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
gossip.AddNeighbor(gossip.NewNeighbor(p.GetIdentity(), p.GetAddress(), p.GetGossipPort()))
}))
chosenneighbors.INSTANCE.Events.Remove.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("chosen neighbor removed: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogDebug("chosen neighbor removed: " + p.GetAddress().String() + " / " + p.GetIdentity().StringIdentifier)
gossip.RemoveNeighbor(p.Identity.StringIdentifier)
gossip.RemoveNeighbor(p.GetIdentity().StringIdentifier)
}))
knownpeers.INSTANCE.Events.Add.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogInfo("new peer discovered: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogInfo("new peer discovered: " + p.GetAddress().String() + " / " + p.GetIdentity().StringIdentifier)
if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists {
gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
if _, exists := gossip.GetNeighbor(p.GetIdentity().StringIdentifier); exists {
gossip.AddNeighbor(gossip.NewNeighbor(p.GetIdentity(), p.GetAddress(), p.GetGossipPort()))
}
}))
knownpeers.INSTANCE.Events.Update.Attach(events.NewClosure(func(p *peer.Peer) {
plugin.LogDebug("peer updated: " + p.Address.String() + " / " + p.Identity.StringIdentifier)
plugin.LogDebug("peer updated: " + p.GetAddress().String() + " / " + p.GetIdentity().StringIdentifier)
if _, exists := gossip.GetNeighbor(p.Identity.StringIdentifier); exists {
gossip.AddNeighbor(gossip.NewNeighbor(p.Identity, p.Address, p.GossipPort))
if _, exists := gossip.GetNeighbor(p.GetIdentity().StringIdentifier); exists {
gossip.AddNeighbor(gossip.NewNeighbor(p.GetIdentity(), p.GetAddress(), p.GetGossipPort()))
}
}))
}
......@@ -15,9 +15,9 @@ import (
func createAcceptedNeighborDropper(plugin *node.Plugin) func() {
return func() {
timeutil.Ticker(func() {
if len(acceptedneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT/2 {
if acceptedneighbors.INSTANCE.Peers.Len() > constants.NEIGHBOR_COUNT/2 {
defer acceptedneighbors.INSTANCE.Lock()()
for len(acceptedneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT/2 {
for acceptedneighbors.INSTANCE.Peers.Len() > constants.NEIGHBOR_COUNT/2 {
acceptedneighbors.FurthestNeighborLock.RLock()
furthestNeighbor := acceptedneighbors.FURTHEST_NEIGHBOR
acceptedneighbors.FurthestNeighborLock.RUnlock()
......@@ -26,7 +26,7 @@ func createAcceptedNeighborDropper(plugin *node.Plugin) func() {
dropMessage := &drop.Drop{Issuer: ownpeer.INSTANCE}
dropMessage.Sign()
acceptedneighbors.INSTANCE.Remove(furthestNeighbor.Identity.StringIdentifier, false)
acceptedneighbors.INSTANCE.Remove(furthestNeighbor.GetIdentity().StringIdentifier)
go func() {
if _, err := furthestNeighbor.Send(dropMessage.Marshal(), types.PROTOCOL_TYPE_UDP, false); err != nil {
plugin.LogDebug("error when sending drop message to" + acceptedneighbors.FURTHEST_NEIGHBOR.String())
......
......@@ -15,9 +15,9 @@ import (
func createChosenNeighborDropper(plugin *node.Plugin) func() {
return func() {
timeutil.Ticker(func() {
if len(chosenneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT/2 {
if chosenneighbors.INSTANCE.Peers.Len() > constants.NEIGHBOR_COUNT/2 {
defer chosenneighbors.INSTANCE.Lock()()
for len(chosenneighbors.INSTANCE.Peers) > constants.NEIGHBOR_COUNT/2 {
for chosenneighbors.INSTANCE.Peers.Len() > constants.NEIGHBOR_COUNT/2 {
chosenneighbors.FurthestNeighborLock.RLock()
furthestNeighbor := chosenneighbors.FURTHEST_NEIGHBOR
chosenneighbors.FurthestNeighborLock.RUnlock()
......@@ -26,7 +26,7 @@ func createChosenNeighborDropper(plugin *node.Plugin) func() {
dropMessage := &drop.Drop{Issuer: ownpeer.INSTANCE}
dropMessage.Sign()
chosenneighbors.INSTANCE.Remove(furthestNeighbor.Identity.StringIdentifier, false)
chosenneighbors.INSTANCE.Remove(furthestNeighbor.GetIdentity().StringIdentifier)
go func() {
if _, err := furthestNeighbor.Send(dropMessage.Marshal(), types.PROTOCOL_TYPE_UDP, false); err != nil {
plugin.LogDebug("error when sending drop message to" + chosenneighbors.FURTHEST_NEIGHBOR.String())
......
......@@ -12,7 +12,7 @@ func createIncomingDropProcessor(plugin *node.Plugin) *events.Closure {
return events.NewClosure(func(drop *drop.Drop) {
plugin.LogDebug("received drop message from " + drop.Issuer.String())
chosenneighbors.INSTANCE.Remove(drop.Issuer.Identity.StringIdentifier)
acceptedneighbors.INSTANCE.Remove(drop.Issuer.Identity.StringIdentifier)
chosenneighbors.INSTANCE.Remove(drop.Issuer.GetIdentity().StringIdentifier)
acceptedneighbors.INSTANCE.Remove(drop.Issuer.GetIdentity().StringIdentifier)
})
}
......@@ -12,7 +12,7 @@ func createIncomingPingProcessor(plugin *node.Plugin) *events.Closure {
plugin.LogDebug("received ping from " + ping.Issuer.String())
knownpeers.INSTANCE.AddOrUpdate(ping.Issuer)
for _, neighbor := range ping.Neighbors {
for _, neighbor := range ping.Neighbors.GetPeers() {
knownpeers.INSTANCE.AddOrUpdate(neighbor)
}
})
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment