neighbors.go 6.94 KiB
package gossip
import (
"math"
"net"
"strconv"
"sync"
"time"
"github.com/iotaledger/goshimmer/packages/accountability"
"github.com/iotaledger/goshimmer/packages/daemon"
"github.com/iotaledger/goshimmer/packages/errors"
"github.com/iotaledger/goshimmer/packages/events"
"github.com/iotaledger/goshimmer/packages/identity"
"github.com/iotaledger/goshimmer/packages/network"
"github.com/iotaledger/goshimmer/packages/node"
)
// configureNeighbors registers log handlers for the neighbor lifecycle events
// (added, updated, removed) so that every change is reported through the
// plugin's logger.
func configureNeighbors(plugin *node.Plugin) {
	// describe renders a neighbor as "<identifier>@<address>:<port>".
	describe := func(peer *Neighbor) string {
		return peer.Identity.StringIdentifier + "@" + peer.Address.String() + ":" + strconv.Itoa(int(peer.Port))
	}

	Events.AddNeighbor.Attach(events.NewClosure(func(peer *Neighbor) {
		plugin.LogSuccess("new neighbor added " + describe(peer))
	}))

	Events.UpdateNeighbor.Attach(events.NewClosure(func(peer *Neighbor) {
		plugin.LogSuccess("existing neighbor updated " + describe(peer))
	}))

	Events.RemoveNeighbor.Attach(events.NewClosure(func(peer *Neighbor) {
		plugin.LogSuccess("existing neighbor removed " + describe(peer))
	}))
}
// runNeighbors starts a connection manager for every neighbor that is already
// known and registers a handler so that neighbors added later get one too.
func runNeighbors(plugin *node.Plugin) {
	plugin.LogInfo("Starting Neighbor Connection Manager ...")

	// GetNeighbors acquires the read lock itself and returns a copy of the map,
	// so the copy is iterated without holding neighborLock. Wrapping the call in
	// another RLock would be a recursive read lock, which sync.RWMutex forbids:
	// it deadlocks as soon as a writer is queued between the two acquisitions.
	for _, neighbor := range GetNeighbors() {
		manageConnection(plugin, neighbor)
	}

	Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
		manageConnection(plugin, neighbor)
	}))

	plugin.LogSuccess("Starting Neighbor Connection Manager ... done")
}
// manageConnection starts a background worker that keeps a connection to the
// given neighbor established.
//
// The worker repeatedly calls neighbor.Connect. After a failed attempt it waits
// CONNECTION_BASE_TIMEOUT * 2^(failures-1) before retrying; a successful
// connection resets the failure counter and the worker then blocks until the
// connection closes. Once CONNECTION_MAX_ATTEMPTS consecutive attempts have
// failed, the neighbor is removed. The worker exits without removing anything
// when the daemon shuts down or when the neighbor is no longer registered.
func manageConnection(plugin *node.Plugin, neighbor *Neighbor) {
	daemon.BackgroundWorker(func() {
		failedConnectionAttempts := 0

		for failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS {
			// Look the neighbor up before every attempt: it may have been removed
			// while we were connected or waiting for the next retry. A lookup in
			// the loop's init clause would only run once.
			if _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); !exists {
				return
			}

			protocol, dialed, err := neighbor.Connect()
			if err != nil {
				failedConnectionAttempts++

				plugin.LogFailure("connection attempt [" + strconv.Itoa(failedConnectionAttempts) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error())

				// Give up right away after the last allowed attempt instead of
				// sleeping through one more back-off period; this also guarantees
				// that the code below never runs with a nil protocol.
				if failedConnectionAttempts >= CONNECTION_MAX_ATTEMPTS {
					break
				}

				// exponential back-off: 1x, 2x, 4x, ... the base timeout
				backoff := time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT

				select {
				case <-daemon.ShutdownSignal:
					return

				case <-time.After(backoff):
					continue
				}
			}

			failedConnectionAttempts = 0

			// closed by the connection's Close event to wake up the select below
			disconnectSignal := make(chan int, 1)
			protocol.Conn.Events.Close.Attach(events.NewClosure(func() {
				close(disconnectSignal)
			}))

			// only a freshly dialed connection needs its protocol started here
			if dialed {
				go protocol.Init()
			}

			// wait for shutdown or for the connection to drop (then reconnect)
			select {
			case <-daemon.ShutdownSignal:
				return

			case <-disconnectSignal:
				continue
			}
		}

		RemoveNeighbor(neighbor.Identity.StringIdentifier)
	})
}
// Neighbor describes a gossip peer together with the protocol instances of the
// connections that exist to it.
type Neighbor struct {
// Identity identifies the peer; its StringIdentifier is the key under which
// the neighbor is stored in the package-level neighbors map.
Identity *identity.Identity
// Address and Port are the TCP endpoint that Connect dials.
Address net.IP
Port uint16
// InitiatedProtocol is set by Connect after a successful dial and reset to
// nil when that connection's Close event fires.
InitiatedProtocol *protocol
// AcceptedProtocol is only read in this file; presumably it is set by the
// code that accepts incoming connections — TODO confirm.
AcceptedProtocol *protocol
// Events holds the per-neighbor events; ProtocolConnectionEstablished is
// triggered from the handshake handler installed by Connect.
Events neighborEvents
// initiatedProtocolMutex guards InitiatedProtocol.
initiatedProtocolMutex sync.RWMutex
// acceptedProtocolMutex guards AcceptedProtocol.
acceptedProtocolMutex sync.RWMutex
}
// NewNeighbor creates a Neighbor for the given identity and TCP endpoint and
// initializes its ProtocolConnectionEstablished event. No connection is opened.
func NewNeighbor(identity *identity.Identity, address net.IP, port uint16) *Neighbor {
	created := new(Neighbor)

	created.Identity = identity
	created.Address = address
	created.Port = port
	created.Events.ProtocolConnectionEstablished = events.NewEvent(protocolCaller)

	return created
}
// UnmarshalPeer is currently a placeholder: it ignores data and always returns
// a zero-valued Neighbor together with a nil error.
func UnmarshalPeer(data []byte) (*Neighbor, error) {
	return new(Neighbor), nil
}
// Connect returns a protocol instance for talking to the neighbor.
//
// An already initiated connection is preferred, then an already accepted one;
// only if neither exists is a new TCP connection dialed. The boolean result is
// true exactly when this call dialed a new connection, in which case the caller
// is expected to start the returned protocol. If dialing fails, a nil protocol
// and an error derived from ErrConnectionFailed are returned.
func (neighbor *Neighbor) Connect() (*protocol, bool, errors.IdentifiableError) {
	neighbor.initiatedProtocolMutex.Lock()
	defer neighbor.initiatedProtocolMutex.Unlock()

	// return existing connections first
	if neighbor.InitiatedProtocol != nil {
		return neighbor.InitiatedProtocol, false, nil
	}

	// if we already have an accepted connection -> use it instead
	// (AcceptedProtocol is only ever read while holding its mutex)
	neighbor.acceptedProtocolMutex.RLock()
	acceptedProtocol := neighbor.AcceptedProtocol
	neighbor.acceptedProtocolMutex.RUnlock()
	if acceptedProtocol != nil {
		return acceptedProtocol, false, nil
	}

	// otherwise try to dial; JoinHostPort adds the brackets an IPv6 literal
	// needs, which plain "address:port" concatenation does not
	dialAddress := net.JoinHostPort(neighbor.Address.String(), strconv.Itoa(int(neighbor.Port)))
	conn, err := net.Dial("tcp", dialAddress)
	if err != nil {
		return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
			neighbor.Identity.StringIdentifier+"@"+neighbor.Address.String()+":"+strconv.Itoa(int(neighbor.Port)))
	}

	// Keep the new protocol in a local so the handlers below refer to exactly
	// this connection instead of re-reading the mutex-protected field later.
	initiatedProtocol := newProtocol(network.NewManagedConnection(conn))
	neighbor.InitiatedProtocol = initiatedProtocol

	// forget the connection once it is closed so the next Connect dials again
	initiatedProtocol.Conn.Events.Close.Attach(events.NewClosure(func() {
		neighbor.initiatedProtocolMutex.Lock()
		defer neighbor.initiatedProtocolMutex.Unlock()

		neighbor.InitiatedProtocol = nil
	}))

	// drop the "secondary" connection upon successful handshake
	initiatedProtocol.Events.HandshakeCompleted.Attach(events.NewClosure(func() {
		// the identifier comparison decides whether this side closes the
		// accepted connection in favor of the one it initiated
		if accountability.OwnId().StringIdentifier <= neighbor.Identity.StringIdentifier {
			neighbor.acceptedProtocolMutex.Lock()
			var acceptedProtocolConn *network.ManagedConnection
			if neighbor.AcceptedProtocol != nil {
				acceptedProtocolConn = neighbor.AcceptedProtocol.Conn
			}
			neighbor.acceptedProtocolMutex.Unlock()

			// close outside the lock; the error is ignored on purpose because
			// the connection is being discarded anyway
			if acceptedProtocolConn != nil {
				_ = acceptedProtocolConn.Close()
			}
		}

		neighbor.Events.ProtocolConnectionEstablished.Trigger(initiatedProtocol)
	}))

	return initiatedProtocol, true, nil
}
// Marshal is currently a placeholder: it produces no serialized form and
// always returns a nil byte slice.
func (neighbor *Neighbor) Marshal() []byte {
	var serialized []byte
	return serialized
}
// Equals reports whether both neighbors have the same string identifier, the
// same port and the same textual address representation.
func (neighbor *Neighbor) Equals(other *Neighbor) bool {
	if neighbor.Identity.StringIdentifier != other.Identity.StringIdentifier {
		return false
	}
	if neighbor.Port != other.Port {
		return false
	}

	return neighbor.Address.String() == other.Address.String()
}
// AddNeighbor registers newNeighbor under its string identifier and triggers
// Events.AddNeighbor. If a neighbor with that identifier is already registered
// and differs from newNeighbor, the existing entry's identity, port and address
// are overwritten and Events.UpdateNeighbor is triggered instead; an identical
// entry is left untouched. Events are triggered while neighborLock is held.
func AddNeighbor(newNeighbor *Neighbor) {
	neighborLock.Lock()
	defer neighborLock.Unlock()

	id := newNeighbor.Identity.StringIdentifier

	known, found := neighbors[id]
	if !found {
		neighbors[id] = newNeighbor
		Events.AddNeighbor.Trigger(newNeighbor)
		return
	}

	if known.Equals(newNeighbor) {
		return
	}

	known.Identity = newNeighbor.Identity
	known.Port = newNeighbor.Port
	known.Address = newNeighbor.Address
	Events.UpdateNeighbor.Trigger(known)
}
func RemoveNeighbor(identifier string) {
if _, exists := neighbors[identifier]; exists {
neighborLock.Lock()
defer neighborLock.Unlock()
if neighbor, exists := neighbors[identifier]; exists {
delete(neighbors, identifier)
Events.RemoveNeighbor.Trigger(neighbor)
}
}
}
// GetNeighbor looks up the neighbor registered under identifier. The boolean
// result reports whether such a neighbor exists.
func GetNeighbor(identifier string) (*Neighbor, bool) {
	neighborLock.RLock()
	found, ok := neighbors[identifier]
	neighborLock.RUnlock()

	return found, ok
}
// GetNeighbors returns a snapshot of all registered neighbors keyed by their
// identifier. The returned map is a copy, so callers may iterate or modify it
// without holding neighborLock; the Neighbor values themselves are shared.
func GetNeighbors() map[string]*Neighbor {
	neighborLock.RLock()
	defer neighborLock.RUnlock()

	snapshot := make(map[string]*Neighbor, len(neighbors))
	for identifier, peer := range neighbors {
		snapshot[identifier] = peer
	}

	return snapshot
}
// Retry parameters used by manageConnection.
const (
// CONNECTION_MAX_ATTEMPTS bounds the number of consecutive failed connection
// attempts; manageConnection's retry loop ends once this count is reached.
CONNECTION_MAX_ATTEMPTS = 5
// CONNECTION_BASE_TIMEOUT is the wait after the first failed attempt; it is
// multiplied by 2^(failures-1) for each further consecutive failure.
CONNECTION_BASE_TIMEOUT = 10 * time.Second
)
var (
	// neighbors holds all registered neighbors keyed by the string identifier
	// of their identity.
	neighbors = make(map[string]*Neighbor)

	// neighborLock guards access to neighbors.
	neighborLock sync.RWMutex
)