Skip to content
Snippets Groups Projects
Select Git revision
  • ba49caf7663c65a2d25ae866ea2b9acd0a5673f3
  • without_tipselection default
  • develop protected
  • fix/grafana-local-dashboard
  • wasp
  • fix/dashboard-explorer-freeze
  • master
  • feat/timerqueue
  • test/sync_debug_and_650
  • feat/sync_revamp_inv
  • wip/sync
  • tool/db-recovery
  • portcheck/fix
  • fix/synchronization
  • feat/new-dashboard-analysis
  • feat/refactored-analysis-dashboard
  • feat/new-analysis-dashboard
  • test/demo-prometheus-fpc
  • prometheus_metrics
  • wip/analysis-server
  • merge/fpc-test-value-transfer
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.3
  • v0.1.2
  • v0.1.1
  • v0.1.0
28 results

message.go

Blame
  • neighbors.go 8.20 KiB
    package gossip
    
    import (
    	"math"
    	"net"
    	"sync"
    	"time"
    
    	"github.com/iotaledger/autopeering-sim/peer"
    	"github.com/iotaledger/goshimmer/packages/errors"
    	"github.com/iotaledger/goshimmer/packages/identity"
    	"github.com/iotaledger/goshimmer/packages/network"
    	"github.com/iotaledger/goshimmer/plugins/autopeering/local"
    	"github.com/iotaledger/hive.go/daemon"
    	"github.com/iotaledger/hive.go/events"
    	"github.com/iotaledger/hive.go/node"
    )
    
    func configureNeighbors(plugin *node.Plugin) {
    	Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
    		log.Info("new neighbor added " + neighbor.GetIdentity().StringIdentifier + "@" + neighbor.GetAddress().String() + ":" + neighbor.GetPort())
    		//plugin.LogSuccess("new neighbor added " + hex.EncodeToString(neighbor.Peer.ID().Bytes()) + "@" + neighbor.GetAddress().String() + ":" + neighbor.GetPort())
    	}))
    
    	Events.UpdateNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
    		log.Info("existing neighbor updated " + neighbor.GetIdentity().StringIdentifier + "@" + neighbor.GetAddress().String() + ":" + neighbor.GetPort())
    	}))
    
    	Events.RemoveNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
    		log.Info("existing neighbor removed " + neighbor.GetIdentity().StringIdentifier + "@" + neighbor.GetAddress().String() + ":" + neighbor.GetPort())
    	}))
    }
    
    func runNeighbors(plugin *node.Plugin) {
    	log.Info("Starting Neighbor Connection Manager ...")
    
    	neighborLock.RLock()
    	for _, neighbor := range neighbors.GetMap() {
    		manageConnection(plugin, neighbor)
    	}
    	neighborLock.RUnlock()
    
    	Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
    		manageConnection(plugin, neighbor)
    	}))
    
    	log.Info("Starting Neighbor Connection Manager ... done")
    }
    
    func manageConnection(plugin *node.Plugin, neighbor *Neighbor) {
    	daemon.BackgroundWorker("Connection Manager ("+neighbor.GetIdentity().StringIdentifier+")", func() {
    		failedConnectionAttempts := 0
    
    		for _, exists := neighbors.Load(neighbor.GetIdentity().StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; {
    			protocol, dialed, err := neighbor.Connect()
    			if err != nil {
    				failedConnectionAttempts++
    
    				log.Errorf("connection attempt [%d / %d] %s", failedConnectionAttempts, CONNECTION_MAX_ATTEMPTS, err.Error())
    
    				if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS {
    					select {
    					case <-daemon.ShutdownSignal:
    						return
    
    					case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT):
    						continue
    					}
    				}
    			}
    
    			failedConnectionAttempts = 0
    
    			disconnectSignal := make(chan int, 1)
    			protocol.Conn.Events.Close.Attach(events.NewClosure(func() {
    				close(disconnectSignal)
    			}))
    
    			if dialed {
    				go protocol.Init()
    			}
    
    			// wait for shutdown or
    			select {
    			case <-daemon.ShutdownSignal:
    				return
    
    			case <-disconnectSignal:
    				continue
    			}
    		}
    
    		RemoveNeighbor(neighbor.GetIdentity().StringIdentifier)
    	})
    }
    
    type Neighbor struct {
    	identity               *identity.Identity
    	identityMutex          sync.RWMutex
    	address                net.IP
    	addressMutex           sync.RWMutex
    	port                   string
    	portMutex              sync.RWMutex
    	initiatedProtocol      *protocol
    	initiatedProtocolMutex sync.RWMutex
    	acceptedProtocol       *protocol
    	Events                 neighborEvents
    	acceptedProtocolMutex  sync.RWMutex
    	Peer                   *peer.Peer
    }
    
    func NewNeighbor(peer *peer.Peer, address, port string) *Neighbor {
    	return &Neighbor{
    		identity: identity.NewPublicIdentity(peer.ToProto().GetPublicKey()),
    		address:  net.ParseIP(address),
    		port:     port,
    		Peer:     peer,
    		Events: neighborEvents{
    			ProtocolConnectionEstablished: events.NewEvent(protocolCaller),
    		},
    	}
    }
    
    func (neighbor *Neighbor) GetIdentity() (result *identity.Identity) {
    	neighbor.identityMutex.RLock()
    	result = neighbor.identity
    	neighbor.identityMutex.RUnlock()
    
    	return result
    }
    
    func (neighbor *Neighbor) SetIdentity(identity *identity.Identity) {
    	neighbor.identityMutex.Lock()
    	neighbor.identity = identity
    	neighbor.identityMutex.Unlock()
    }
    
    func (neighbor *Neighbor) GetAddress() (result net.IP) {
    	neighbor.addressMutex.RLock()
    	result = neighbor.address
    	neighbor.addressMutex.RUnlock()
    
    	return result
    }
    
    func (neighbor *Neighbor) SetAddress(address net.IP) {
    	neighbor.addressMutex.Lock()
    	neighbor.address = address
    	neighbor.addressMutex.Unlock()
    }
    
    func (neighbor *Neighbor) GetPort() (result string) {
    	neighbor.portMutex.RLock()
    	result = neighbor.port
    	neighbor.portMutex.RUnlock()
    
    	return result
    }
    
    func (neighbor *Neighbor) SetPort(port string) {
    	neighbor.portMutex.Lock()
    	neighbor.port = port
    	neighbor.portMutex.Unlock()
    }
    
    func (neighbor *Neighbor) GetInitiatedProtocol() (result *protocol) {
    	neighbor.initiatedProtocolMutex.RLock()
    	result = neighbor.initiatedProtocol
    	neighbor.initiatedProtocolMutex.RUnlock()
    
    	return result
    }
    
    func (neighbor *Neighbor) SetInitiatedProtocol(p *protocol) {
    	neighbor.initiatedProtocolMutex.Lock()
    	neighbor.initiatedProtocol = p
    	neighbor.initiatedProtocolMutex.Unlock()
    }
    
    func (neighbor *Neighbor) GetAcceptedProtocol() (result *protocol) {
    	neighbor.acceptedProtocolMutex.RLock()
    	result = neighbor.acceptedProtocol
    	neighbor.acceptedProtocolMutex.RUnlock()
    
    	return result
    }
    
    func (neighbor *Neighbor) SetAcceptedProtocol(p *protocol) {
    	neighbor.acceptedProtocolMutex.Lock()
    	neighbor.acceptedProtocol = p
    	neighbor.acceptedProtocolMutex.Unlock()
    }
    
    func UnmarshalPeer(data []byte) (*Neighbor, error) {
    	return &Neighbor{}, nil
    }
    
    func (neighbor *Neighbor) Connect() (*protocol, bool, errors.IdentifiableError) {
    	// return existing connections first
    	if neighbor.GetInitiatedProtocol() != nil {
    		return neighbor.GetInitiatedProtocol(), false, nil
    	}
    
    	// if we already have an accepted connection -> use it instead
    	if neighbor.GetAcceptedProtocol() != nil {
    		return neighbor.GetAcceptedProtocol(), false, nil
    	}
    
    	// otherwise try to dial
    	conn, err := net.Dial("tcp", neighbor.GetAddress().String()+":"+neighbor.GetPort())
    	if err != nil {
    		return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
    			neighbor.GetIdentity().StringIdentifier+"@"+neighbor.GetAddress().String()+":"+neighbor.GetPort())
    	}
    
    	neighbor.SetInitiatedProtocol(newProtocol(network.NewManagedConnection(conn)))
    
    	neighbor.GetInitiatedProtocol().Conn.Events.Close.Attach(events.NewClosure(func() {
    		neighbor.SetInitiatedProtocol(nil)
    	}))
    
    	// drop the "secondary" connection upon successful handshake
    	neighbor.GetInitiatedProtocol().Events.HandshakeCompleted.Attach(events.NewClosure(func() {
    		if local.INSTANCE.ID().String() <= neighbor.Peer.ID().String() {
    			var acceptedProtocolConn *network.ManagedConnection
    			if neighbor.GetAcceptedProtocol() != nil {
    				acceptedProtocolConn = neighbor.GetAcceptedProtocol().Conn
    			}
    
    			if acceptedProtocolConn != nil {
    				_ = acceptedProtocolConn.Close()
    			}
    		}
    
    		neighbor.Events.ProtocolConnectionEstablished.Trigger(neighbor.GetInitiatedProtocol())
    	}))
    
    	return neighbor.GetInitiatedProtocol(), true, nil
    }
    
    func (neighbor *Neighbor) Marshal() []byte {
    	return nil
    }
    
    func (neighbor *Neighbor) Equals(other *Neighbor) bool {
    	return neighbor.GetIdentity().StringIdentifier == other.GetIdentity().StringIdentifier &&
    		neighbor.GetPort() == other.GetPort() && neighbor.GetAddress().String() == other.GetAddress().String()
    }
    
    func AddNeighbor(newNeighbor *Neighbor) {
    	if neighbor, exists := neighbors.Load(newNeighbor.GetIdentity().StringIdentifier); !exists {
    		neighbors.Store(newNeighbor.GetIdentity().StringIdentifier, newNeighbor)
    		Events.AddNeighbor.Trigger(newNeighbor)
    	} else {
    		if !neighbor.Equals(newNeighbor) {
    			neighbor.SetIdentity(newNeighbor.GetIdentity())
    			neighbor.SetPort(newNeighbor.GetPort())
    			neighbor.SetAddress(newNeighbor.GetAddress())
    
    			Events.UpdateNeighbor.Trigger(neighbor)
    		}
    	}
    }
    
    func RemoveNeighbor(identifier string) {
    	if neighbor, exists := neighbors.Delete(identifier); exists {
    		Events.RemoveNeighbor.Trigger(neighbor)
    	}
    }
    
    func GetNeighbor(identifier string) (*Neighbor, bool) {
    	return neighbors.Load(identifier)
    }
    
    func GetNeighbors() map[string]*Neighbor {
    	return neighbors.GetMap()
    }
    
    const (
    	CONNECTION_MAX_ATTEMPTS = 5
    	CONNECTION_BASE_TIMEOUT = 10 * time.Second
    )
    
    var neighbors = NewNeighborMap()
    
    var neighborLock sync.RWMutex