Skip to content
Snippets Groups Projects
neighbors.go 6.94 KiB
package gossip

import (
	"math"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/iotaledger/goshimmer/packages/accountability"
	"github.com/iotaledger/goshimmer/packages/daemon"
	"github.com/iotaledger/goshimmer/packages/errors"
	"github.com/iotaledger/goshimmer/packages/events"
	"github.com/iotaledger/goshimmer/packages/identity"
	"github.com/iotaledger/goshimmer/packages/network"
	"github.com/iotaledger/goshimmer/packages/node"
)

func configureNeighbors(plugin *node.Plugin) {
	Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
		plugin.LogSuccess("new neighbor added " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
	}))

	Events.UpdateNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
		plugin.LogSuccess("existing neighbor updated " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
	}))

	Events.RemoveNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
		plugin.LogSuccess("existing neighbor removed " + neighbor.Identity.StringIdentifier + "@" + neighbor.Address.String() + ":" + strconv.Itoa(int(neighbor.Port)))
	}))
}

func runNeighbors(plugin *node.Plugin) {
	plugin.LogInfo("Starting Neighbor Connection Manager ...")

	neighborLock.RLock()
	for _, neighbor := range GetNeighbors() {
		manageConnection(plugin, neighbor)
	}
	neighborLock.RUnlock()

	Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
		manageConnection(plugin, neighbor)
	}))

	plugin.LogSuccess("Starting Neighbor Connection Manager ... done")
}

func manageConnection(plugin *node.Plugin, neighbor *Neighbor) {
	daemon.BackgroundWorker(func() {
		failedConnectionAttempts := 0

		for _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); exists && failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS; {
			protocol, dialed, err := neighbor.Connect()
			if err != nil {
				failedConnectionAttempts++

				plugin.LogFailure("connection attempt [" + strconv.Itoa(int(failedConnectionAttempts)) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error())

				if failedConnectionAttempts <= CONNECTION_MAX_ATTEMPTS {
					select {
					case <-daemon.ShutdownSignal:
						return

					case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT):
						continue
					}
				}
			}
			failedConnectionAttempts = 0

			disconnectSignal := make(chan int, 1)
			protocol.Conn.Events.Close.Attach(events.NewClosure(func() {
				close(disconnectSignal)
			}))

			if dialed {
				go protocol.Init()
			}

			// wait for shutdown or
			select {
			case <-daemon.ShutdownSignal:
				return

			case <-disconnectSignal:
				continue
			}
		}

		RemoveNeighbor(neighbor.Identity.StringIdentifier)
	})
}

type Neighbor struct {
	Identity               *identity.Identity
	Address                net.IP
	Port                   uint16
	InitiatedProtocol      *protocol
	AcceptedProtocol       *protocol
	Events                 neighborEvents
	initiatedProtocolMutex sync.RWMutex
	acceptedProtocolMutex  sync.RWMutex
}

func NewNeighbor(identity *identity.Identity, address net.IP, port uint16) *Neighbor {
	return &Neighbor{
		Identity: identity,
		Address:  address,
		Port:     port,
		Events: neighborEvents{
			ProtocolConnectionEstablished: events.NewEvent(protocolCaller),
		},
	}
}

func UnmarshalPeer(data []byte) (*Neighbor, error) {
	return &Neighbor{}, nil
}

func (neighbor *Neighbor) Connect() (*protocol, bool, errors.IdentifiableError) {
	neighbor.initiatedProtocolMutex.Lock()
	defer neighbor.initiatedProtocolMutex.Unlock()

	// return existing connections first
	if neighbor.InitiatedProtocol != nil {
		return neighbor.InitiatedProtocol, false, nil
	}

	// if we already have an accepted connection -> use it instead
	if neighbor.AcceptedProtocol != nil {
		neighbor.acceptedProtocolMutex.RLock()
		if neighbor.AcceptedProtocol != nil {
			defer neighbor.acceptedProtocolMutex.RUnlock()

			return neighbor.AcceptedProtocol, false, nil
		}
		neighbor.acceptedProtocolMutex.RUnlock()
	}

	// otherwise try to dial
	conn, err := net.Dial("tcp", neighbor.Address.String()+":"+strconv.Itoa(int(neighbor.Port)))
	if err != nil {
		return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
			neighbor.Identity.StringIdentifier+"@"+neighbor.Address.String()+":"+strconv.Itoa(int(neighbor.Port)))
	}

	neighbor.InitiatedProtocol = newProtocol(network.NewManagedConnection(conn))

	neighbor.InitiatedProtocol.Conn.Events.Close.Attach(events.NewClosure(func() {
		neighbor.initiatedProtocolMutex.Lock()
		defer neighbor.initiatedProtocolMutex.Unlock()

		neighbor.InitiatedProtocol = nil
	}))

	// drop the "secondary" connection upon successful handshake
	neighbor.InitiatedProtocol.Events.HandshakeCompleted.Attach(events.NewClosure(func() {
		if accountability.OwnId().StringIdentifier <= neighbor.Identity.StringIdentifier {
			neighbor.acceptedProtocolMutex.Lock()
			var acceptedProtocolConn *network.ManagedConnection
			if neighbor.AcceptedProtocol != nil {
				acceptedProtocolConn = neighbor.AcceptedProtocol.Conn
			}
			neighbor.acceptedProtocolMutex.Unlock()

			if acceptedProtocolConn != nil {
				_ = acceptedProtocolConn.Close()
			}
		}

		neighbor.Events.ProtocolConnectionEstablished.Trigger(neighbor.InitiatedProtocol)
	}))

	return neighbor.InitiatedProtocol, true, nil
}

func (neighbor *Neighbor) Marshal() []byte {
	return nil
}

func (neighbor *Neighbor) Equals(other *Neighbor) bool {
	return neighbor.Identity.StringIdentifier == other.Identity.StringIdentifier &&
		neighbor.Port == other.Port && neighbor.Address.String() == other.Address.String()
}

func AddNeighbor(newNeighbor *Neighbor) {
	neighborLock.Lock()
	defer neighborLock.Unlock()

	if neighbor, exists := neighbors[newNeighbor.Identity.StringIdentifier]; !exists {
		neighbors[newNeighbor.Identity.StringIdentifier] = newNeighbor

		Events.AddNeighbor.Trigger(newNeighbor)
	} else {
		if !neighbor.Equals(newNeighbor) {
			neighbor.Identity = newNeighbor.Identity
			neighbor.Port = newNeighbor.Port
			neighbor.Address = newNeighbor.Address

			Events.UpdateNeighbor.Trigger(neighbor)
		}
	}
}

func RemoveNeighbor(identifier string) {
	if _, exists := neighbors[identifier]; exists {
		neighborLock.Lock()
		defer neighborLock.Unlock()

		if neighbor, exists := neighbors[identifier]; exists {
			delete(neighbors, identifier)

			Events.RemoveNeighbor.Trigger(neighbor)
		}
	}
}

func GetNeighbor(identifier string) (*Neighbor, bool) {
	neighborLock.RLock()
	defer neighborLock.RUnlock()

	neighbor, exists := neighbors[identifier]

	return neighbor, exists
}

func GetNeighbors() map[string]*Neighbor {
	neighborLock.RLock()
	defer neighborLock.RUnlock()

	result := make(map[string]*Neighbor)
	for id, neighbor := range neighbors {
		result[id] = neighbor
	}

	return result
}

const (
	CONNECTION_MAX_ATTEMPTS = 5
	CONNECTION_BASE_TIMEOUT = 10 * time.Second
)

var neighbors = make(map[string]*Neighbor)

var neighborLock sync.RWMutex