Skip to content
Snippets Groups Projects
neighbors.go 7.86 KiB
package gossip

import (
    "github.com/iotaledger/goshimmer/packages/accountability"
    "github.com/iotaledger/goshimmer/packages/daemon"
    "github.com/iotaledger/goshimmer/packages/errors"
    "github.com/iotaledger/goshimmer/packages/events"
    "github.com/iotaledger/goshimmer/packages/identity"
    "github.com/iotaledger/goshimmer/packages/network"
    "github.com/iotaledger/goshimmer/packages/node"
    "math"
    "net"
    "strconv"
    "sync"
    "time"
)

// configureNeighbors attaches logging handlers to the AddNeighbor,
// UpdateNeighbor and RemoveNeighbor events. Each handler reports the affected
// neighbor as "<identifier>@<address>:<port>" through plugin.LogSuccess.
func configureNeighbors(plugin *node.Plugin) {
    // describe renders the neighbor in the "<identifier>@<address>:<port>" form.
    describe := func(n *Neighbor) string {
        return n.Identity.StringIdentifier + "@" + n.Address.String() + ":" + strconv.Itoa(int(n.Port))
    }

    // announce builds an event handler that logs prefix followed by the neighbor.
    announce := func(prefix string) func(*Neighbor) {
        return func(n *Neighbor) {
            plugin.LogSuccess(prefix + describe(n))
        }
    }

    Events.AddNeighbor.Attach(events.NewClosure(announce("new neighbor added ")))
    Events.UpdateNeighbor.Attach(events.NewClosure(announce("existing neighbor updated ")))
    Events.RemoveNeighbor.Attach(events.NewClosure(announce("existing neighbor removed ")))
}

// runNeighbors starts a connection manager for every currently known neighbor
// and registers a handler so that neighbors added later get one as well.
func runNeighbors(plugin *node.Plugin) {
    plugin.LogInfo("Starting Neighbor Connection Manager ...")

    // GetNeighbors acquires neighborLock itself and returns a copy of the map,
    // so no additional locking is needed here. Taking the read lock around the
    // call would be a recursive read lock, which sync.RWMutex prohibits because
    // it can deadlock as soon as a writer is waiting for the lock.
    for _, neighbor := range GetNeighbors() {
        manageConnection(plugin, neighbor)
    }

    Events.AddNeighbor.Attach(events.NewClosure(func(neighbor *Neighbor) {
        manageConnection(plugin, neighbor)
    }))

    plugin.LogSuccess("Starting Neighbor Connection Manager ... done")
}

// manageConnection starts a background worker that keeps a connection to the
// given neighbor alive.
//
// The worker loops while the neighbor is still registered: it connects, waits
// until the connection is closed and then connects again. A failed attempt is
// retried after an exponentially growing delay (CONNECTION_BASE_TIMEOUT times
// 1, 2, 4, ...). After CONNECTION_MAX_ATTEMPTS consecutive failures the
// neighbor is removed. The worker returns without removing the neighbor when
// daemon.ShutdownSignal fires.
func manageConnection(plugin *node.Plugin, neighbor *Neighbor) {
    daemon.BackgroundWorker(func() {
        failedConnectionAttempts := 0

        for failedConnectionAttempts < CONNECTION_MAX_ATTEMPTS {
            // Re-check on every iteration: the neighbor may have been removed
            // while we were connected or waiting for the next retry.
            if _, exists := GetNeighbor(neighbor.Identity.StringIdentifier); !exists {
                break
            }

            proto, dialed, err := neighbor.Connect()
            if err != nil {
                failedConnectionAttempts++

                plugin.LogFailure("connection attempt [" + strconv.Itoa(failedConnectionAttempts) + "/" + strconv.Itoa(CONNECTION_MAX_ATTEMPTS) + "] " + err.Error())

                // No point in backing off after the final attempt; give up
                // right away. This also guarantees that the code below never
                // runs with a nil protocol.
                if failedConnectionAttempts >= CONNECTION_MAX_ATTEMPTS {
                    break
                }

                select {
                case <-daemon.ShutdownSignal:
                    return

                case <-time.After(time.Duration(int(math.Pow(2, float64(failedConnectionAttempts-1)))) * CONNECTION_BASE_TIMEOUT):
                    continue
                }
            }

            failedConnectionAttempts = 0

            disconnectSignal := make(chan int, 1)
            proto.Conn.Events.Close.Attach(events.NewClosure(func() {
                close(disconnectSignal)
            }))

            // Only a freshly dialed connection needs to be initialized; an
            // existing one returned by Connect is reused as it is.
            if dialed {
                go proto.Init()
            }

            // wait for shutdown or for the connection to be closed
            select {
            case <-daemon.ShutdownSignal:
                return

            case <-disconnectSignal:
                // connection lost -> next loop iteration reconnects
            }
        }

        RemoveNeighbor(neighbor.Identity.StringIdentifier)
    })
}

// Neighbor describes a gossip peer together with the protocol instances of the
// connections that exist to it.
type Neighbor struct {
    // Identity identifies the peer; its StringIdentifier is the key under which
    // the neighbor is stored in the package-level neighbors map.
    Identity               *identity.Identity
    // Address and Port form the TCP endpoint that Connect dials.
    Address                net.IP
    Port                   uint16
    // InitiatedProtocol is set by Connect after a successful dial and reset to
    // nil when that connection is closed.
    InitiatedProtocol      *protocol
    // AcceptedProtocol is reused by Connect when it is non-nil.
    // NOTE(review): it is never assigned in this file - presumably it is set by
    // the code that accepts incoming connections; confirm.
    AcceptedProtocol       *protocol
    // Events holds the events of this neighbor (ProtocolConnectionEstablished).
    Events                 neighborEvents
    // initiatedProtocolMutex guards InitiatedProtocol.
    initiatedProtocolMutex sync.RWMutex
    // acceptedProtocolMutex guards AcceptedProtocol.
    acceptedProtocolMutex  sync.RWMutex
}

// NewNeighbor creates a Neighbor for the given identity and TCP endpoint. The
// ProtocolConnectionEstablished event is initialized; no connection is opened.
func NewNeighbor(identity *identity.Identity, address net.IP, port uint16) *Neighbor {
    created := &Neighbor{Identity: identity, Address: address, Port: port}
    created.Events.ProtocolConnectionEstablished = events.NewEvent(protocolCaller)

    return created
}

// UnmarshalPeer is currently a stub: it ignores data and always returns an
// empty Neighbor together with a nil error.
func UnmarshalPeer(data []byte) (*Neighbor, error) {
    return new(Neighbor), nil
}

// Connect returns a protocol instance for talking to the neighbor.
//
// An existing initiated connection is returned first, then an existing
// accepted connection; only if neither exists a new TCP connection is dialed.
// The boolean result is true only when a new connection was dialed by this
// call. On a dial failure the error is derived from ErrConnectionFailed and
// the protocol result is nil.
func (neighbor *Neighbor) Connect() (*protocol, bool, errors.IdentifiableError) {
    neighbor.initiatedProtocolMutex.Lock()
    defer neighbor.initiatedProtocolMutex.Unlock()

    // return existing connections first
    if neighbor.InitiatedProtocol != nil {
        return neighbor.InitiatedProtocol, false, nil
    }

    // if we already have an accepted connection -> use it instead
    // (read under its mutex so the access does not race with writers)
    neighbor.acceptedProtocolMutex.RLock()
    accepted := neighbor.AcceptedProtocol
    neighbor.acceptedProtocolMutex.RUnlock()
    if accepted != nil {
        return accepted, false, nil
    }

    // otherwise try to dial; JoinHostPort brackets IPv6 literals so that the
    // resulting address is valid for both IPv4 and IPv6 neighbors
    conn, err := net.Dial("tcp", net.JoinHostPort(neighbor.Address.String(), strconv.Itoa(int(neighbor.Port))))
    if err != nil {
        return nil, false, ErrConnectionFailed.Derive(err, "error when connecting to neighbor "+
            neighbor.Identity.StringIdentifier+"@"+neighbor.Address.String()+":"+strconv.Itoa(int(neighbor.Port)))
    }

    // Keep the new protocol in a local variable: the closures below must refer
    // to this connection even after neighbor.InitiatedProtocol was reset.
    initiated := newProtocol(network.NewManagedConnection(conn))
    neighbor.InitiatedProtocol = initiated

    initiated.Conn.Events.Close.Attach(events.NewClosure(func() {
        neighbor.initiatedProtocolMutex.Lock()
        defer neighbor.initiatedProtocolMutex.Unlock()

        // only forget the connection that was actually closed
        if neighbor.InitiatedProtocol == initiated {
            neighbor.InitiatedProtocol = nil
        }
    }))

    // drop the "secondary" connection upon successful handshake
    initiated.Events.HandshakeCompleted.Attach(events.NewClosure(func() {
        if accountability.OWN_ID.StringIdentifier <= neighbor.Identity.StringIdentifier {
            neighbor.acceptedProtocolMutex.Lock()
            var acceptedProtocolConn *network.ManagedConnection
            if neighbor.AcceptedProtocol != nil {
                acceptedProtocolConn = neighbor.AcceptedProtocol.Conn
            }
            neighbor.acceptedProtocolMutex.Unlock()

            // close outside of the lock; the result is deliberately ignored
            // because the connection is being discarded anyway
            if acceptedProtocolConn != nil {
                _ = acceptedProtocolConn.Close()
            }
        }

        neighbor.Events.ProtocolConnectionEstablished.Trigger(initiated)
    }))

    return initiated, true, nil
}

// Marshal is currently a stub: no serialization is implemented and the result
// is always a nil slice.
func (neighbor *Neighbor) Marshal() []byte {
    var serialized []byte

    return serialized
}

// Equals reports whether other describes the same peer: same identity
// identifier, same port and same IP address.
func (neighbor *Neighbor) Equals(other *Neighbor) bool {
    // The identifier has to be compared against other, not against the
    // receiver itself (which would always be true).
    return neighbor.Identity.StringIdentifier == other.Identity.StringIdentifier &&
        neighbor.Port == other.Port && neighbor.Address.Equal(other.Address)
}

// AddNeighbor registers newNeighbor under its identity identifier.
//
// If no neighbor with that identifier is known, newNeighbor is stored and the
// AddNeighbor event is triggered. If one is known and differs from newNeighbor
// (see Equals), the stored neighbor takes over the identity, port and address
// of newNeighbor and the UpdateNeighbor event is triggered. Both events are
// triggered while neighborLock is held for writing.
func AddNeighbor(newNeighbor *Neighbor) {
    neighborLock.Lock()
    defer neighborLock.Unlock()

    identifier := newNeighbor.Identity.StringIdentifier

    neighbor, exists := neighbors[identifier]
    if !exists {
        neighbors[identifier] = newNeighbor

        Events.AddNeighbor.Trigger(newNeighbor)

        return
    }

    if newNeighbor.Equals(neighbor) {
        return
    }

    // copy the new values into the stored neighbor
    neighbor.Identity = newNeighbor.Identity
    neighbor.Port = newNeighbor.Port
    neighbor.Address = newNeighbor.Address

    Events.UpdateNeighbor.Trigger(newNeighbor)
}

func RemoveNeighbor(identifier string) {
    if _, exists := neighbors[identifier]; exists {
        neighborLock.Lock()
        defer neighborLock.Unlock()
        if neighbor, exists := neighbors[identifier]; exists {
            delete(neighbors, identifier)

            Events.RemoveNeighbor.Trigger(neighbor)
        }
    }
}

// GetNeighbor looks up the neighbor registered under identifier. The boolean
// result reports whether such a neighbor exists. The lookup is done while
// holding neighborLock for reading.
func GetNeighbor(identifier string) (*Neighbor, bool) {
    neighborLock.RLock()
    found, ok := neighbors[identifier]
    neighborLock.RUnlock()

    return found, ok
}

// GetNeighbors returns a copy of the neighbor map, keyed by identifier. The
// copy is made while holding neighborLock for reading; the returned map can be
// iterated without holding the lock. The Neighbor values themselves are shared
// with the registry, not copied.
func GetNeighbors() map[string]*Neighbor {
    neighborLock.RLock()
    defer neighborLock.RUnlock()

    snapshot := make(map[string]*Neighbor, len(neighbors))
    for identifier := range neighbors {
        snapshot[identifier] = neighbors[identifier]
    }

    return snapshot
}

const (
    // CONNECTION_MAX_ATTEMPTS bounds the number of consecutive failed
    // connection attempts in manageConnection before the neighbor is removed.
    CONNECTION_MAX_ATTEMPTS = 5
    // CONNECTION_BASE_TIMEOUT is the base delay between connection attempts;
    // manageConnection multiplies it by 2^(failed attempts - 1).
    CONNECTION_BASE_TIMEOUT = 10 * time.Second
)

var (
    // neighbors holds all registered neighbors, keyed by the StringIdentifier
    // of their identity.
    neighbors = make(map[string]*Neighbor)

    // neighborLock guards neighbors.
    neighborLock sync.RWMutex
)