Skip to content
Snippets Groups Projects
server.go 10.3 KiB
Newer Older
package server
capossele's avatar
capossele committed

import (
	"bytes"
	"container/list"
capossele's avatar
capossele committed
	"fmt"
	"io"
capossele's avatar
capossele committed
	"net"
capossele's avatar
capossele committed
	"strings"
capossele's avatar
capossele committed
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/iotaledger/goshimmer/packages/autopeering/peer"
	"github.com/iotaledger/goshimmer/packages/autopeering/peer/service"
	pb "github.com/iotaledger/goshimmer/packages/autopeering/server/proto"
capossele's avatar
capossele committed
	"github.com/pkg/errors"
capossele's avatar
capossele committed
	"go.uber.org/zap"
)

// Sentinel errors returned by the TCP server.
var (
	// ErrTimeout is returned when an expected incoming connection was not received in time.
	ErrTimeout = errors.New("accept timeout")
	// ErrClosed means that the server was shut down before a response could be received.
	ErrClosed = errors.New("server closed")
	// ErrInvalidHandshake is returned when no correct handshake could be established.
	ErrInvalidHandshake = errors.New("invalid handshake")
	// ErrNoGossip means that the given peer does not support the gossip service.
	ErrNoGossip = errors.New("peer does not have a gossip service")
)

// connection timeouts
const (
	// acceptTimeout bounds the outgoing dial performed by DialPeer.
	acceptTimeout = 1000 * time.Millisecond
	// handshakeTimeout bounds each single handshake read or write.
	handshakeTimeout = 500 * time.Millisecond
	// connectionTimeout is how long a registered accept matcher is kept
	// before it expires with ErrTimeout.
	connectionTimeout = acceptTimeout + handshakeTimeout

	// maxHandshakePacketSize is the maximum size in bytes of a marshaled
	// handshake packet, for both sending and receiving.
	maxHandshakePacketSize = 256
)

capossele's avatar
capossele committed
// TCP establishes verified incoming and outgoing TCP connections to other peers.
type TCP struct {
Wolfgang Welz's avatar
Wolfgang Welz committed
	local      *peer.Local
	publicAddr net.Addr
	listener   *net.TCPListener
	log        *zap.SugaredLogger
capossele's avatar
capossele committed

	addAcceptMatcher chan *acceptMatcher
	acceptReceived   chan accept

	closeOnce sync.Once
	wg        sync.WaitGroup
	closing   chan struct{} // if this channel gets closed all pending waits should terminate
}

// connect contains the result of an incoming connection.
type connect struct {
capossele's avatar
capossele committed
	err error
}

// acceptMatcher describes a pending accept: a peer from which an incoming
// connection is expected, together with the channel on which the outcome is
// reported. The deadline is assigned by the run loop when the matcher is added.
type acceptMatcher struct {
	peer      *peer.Peer   // connecting peer
	deadline  time.Time    // deadline for the incoming call
	connected chan connect // result of the connection is signaled here
}

// accept is an incoming connection whose handshake request has been read by
// the listen loop and that still has to be matched against a pending accept.
type accept struct {
	fromID peer.ID  // ID of the connecting peer
	req    []byte   // raw data of the handshake request
	conn   net.Conn // the actual network connection
}

// ListenTCP creates the object and starts listening for incoming connections.
func ListenTCP(local *peer.Local, log *zap.SugaredLogger) (*TCP, error) {
capossele's avatar
capossele committed
	t := &TCP{
capossele's avatar
capossele committed
		local:            local,
		log:              log,
		addAcceptMatcher: make(chan *acceptMatcher),
		acceptReceived:   make(chan accept),
		closing:          make(chan struct{}),
	}

Wolfgang Welz's avatar
Wolfgang Welz committed
	t.publicAddr = local.Services().Get(service.GossipKey)
	if t.publicAddr == nil {
capossele's avatar
capossele committed
		return nil, ErrNoGossip
	}
Wolfgang Welz's avatar
Wolfgang Welz committed
	tcpAddr, err := net.ResolveTCPAddr(t.publicAddr.Network(), t.publicAddr.String())
capossele's avatar
capossele committed
	if err != nil {
		return nil, err
	}
	// if the ip is an external ip, set it to unspecified
Wolfgang Welz's avatar
Wolfgang Welz committed
	if tcpAddr.IP.IsGlobalUnicast() {
Wolfgang Welz's avatar
Wolfgang Welz committed
		if tcpAddr.IP.To4() != nil {
Wolfgang Welz's avatar
Wolfgang Welz committed
			tcpAddr.IP = net.IPv4zero
Wolfgang Welz's avatar
Wolfgang Welz committed
		} else {
			tcpAddr.IP = net.IPv6unspecified
Wolfgang Welz's avatar
Wolfgang Welz committed
		}
	}

	listener, err := net.ListenTCP(t.publicAddr.Network(), tcpAddr)
capossele's avatar
capossele committed
	if err != nil {
		return nil, err
	}
	t.listener = listener
Wolfgang Welz's avatar
Wolfgang Welz committed
	t.log.Debugw("listening started",
		"network", listener.Addr().Network(),
		"address", listener.Addr().String(),
	)
capossele's avatar
capossele committed

	t.wg.Add(2)
	go t.run()
	go t.listenLoop()

	return t, nil
}

// Close stops listening on the gossip address.
capossele's avatar
capossele committed
func (t *TCP) Close() {
capossele's avatar
capossele committed
	t.closeOnce.Do(func() {
		close(t.closing)
		if err := t.listener.Close(); err != nil {
			t.log.Warnw("close error", "err", err)
		}
		t.wg.Wait()
	})
}

// LocalAddr returns the listener's network address,
capossele's avatar
capossele committed
func (t *TCP) LocalAddr() net.Addr {
capossele's avatar
capossele committed
	return t.listener.Addr()
}

// DialPeer establishes a gossip connection to the given peer.
// If the peer does not accept the connection or the handshake fails, an error is returned.
func (t *TCP) DialPeer(p *peer.Peer) (net.Conn, error) {
capossele's avatar
capossele committed
	gossipAddr := p.Services().Get(service.GossipKey)
	if gossipAddr == nil {
		return nil, ErrNoGossip
	}

	conn, err := net.DialTimeout(gossipAddr.Network(), gossipAddr.String(), acceptTimeout)
	if err != nil {
		return nil, errors.Wrap(err, "dial peer failed")
capossele's avatar
capossele committed
	}

	err = t.doHandshake(p.PublicKey(), gossipAddr.String(), conn)
	if err != nil {
		return nil, errors.Wrap(err, "outgoing handshake failed")
capossele's avatar
capossele committed
	}

	t.log.Debugw("outgoing connection established",
		"id", p.ID(),
		"addr", conn.RemoteAddr(),
	)
capossele's avatar
capossele committed
}

// AcceptPeer awaits an incoming connection from the given peer.
// If the peer does not establish the connection or the handshake fails, an error is returned.
func (t *TCP) AcceptPeer(p *peer.Peer) (net.Conn, error) {
capossele's avatar
capossele committed
	if p.Services().Get(service.GossipKey) == nil {
		return nil, ErrNoGossip
	}
capossele's avatar
capossele committed
	// wait for the connection
	connected := <-t.acceptPeer(p)
	if connected.err != nil {
		return nil, errors.Wrap(connected.err, "accept peer failed")
capossele's avatar
capossele committed
	}
	t.log.Debugw("incoming connection established",
		"id", p.ID(),
		"addr", connected.c.RemoteAddr(),
	)
capossele's avatar
capossele committed
	return connected.c, nil
}

capossele's avatar
capossele committed
func (t *TCP) acceptPeer(p *peer.Peer) <-chan connect {
capossele's avatar
capossele committed
	connected := make(chan connect, 1)
	// add the matcher
	select {
	case t.addAcceptMatcher <- &acceptMatcher{peer: p, connected: connected}:
	case <-t.closing:
		connected <- connect{nil, ErrClosed}
	}
	return connected
}

capossele's avatar
capossele committed
func (t *TCP) closeConnection(c net.Conn) {
capossele's avatar
capossele committed
	if err := c.Close(); err != nil {
		t.log.Warnw("close error", "err", err)
	}
}

capossele's avatar
capossele committed
func (t *TCP) run() {
capossele's avatar
capossele committed
	defer t.wg.Done()

	var (
capossele's avatar
capossele committed
		matcherList = list.New()
		timeout     = time.NewTimer(0)
capossele's avatar
capossele committed
	)
	defer timeout.Stop()

	<-timeout.C // ignore first timeout

	for {

		// Set the timer so that it fires when the next accept expires
capossele's avatar
capossele committed
		if e := matcherList.Front(); e != nil {
capossele's avatar
capossele committed
			// the first element always has the closest deadline
capossele's avatar
capossele committed
			m := e.Value.(*acceptMatcher)
capossele's avatar
capossele committed
			timeout.Reset(time.Until(m.deadline))
		} else {
			timeout.Stop()
		}

		select {

		// add a new matcher to the list
		case m := <-t.addAcceptMatcher:
			m.deadline = time.Now().Add(connectionTimeout)
capossele's avatar
capossele committed
			matcherList.PushBack(m)
capossele's avatar
capossele committed

		// on accept received, check all matchers for a fit
		case a := <-t.acceptReceived:
			matched := false
capossele's avatar
capossele committed
			for e := matcherList.Front(); e != nil; e = e.Next() {
				m := e.Value.(*acceptMatcher)
capossele's avatar
capossele committed
				if m.peer.ID() == a.fromID {
					matched = true
capossele's avatar
capossele committed
					matcherList.Remove(e)
capossele's avatar
capossele committed
					// finish the handshake
					go t.matchAccept(m, a.req, a.conn)
				}
			}
			// close the connection if not matched
			if !matched {
				t.log.Debugw("unexpected connection", "id", a.fromID, "addr", a.conn.RemoteAddr())
				t.closeConnection(a.conn)
			}

		// on timeout, check for expired matchers
		case <-timeout.C:
			now := time.Now()

			// notify and remove any expired matchers
capossele's avatar
capossele committed
			for e := matcherList.Front(); e != nil; e = e.Next() {
				m := e.Value.(*acceptMatcher)
capossele's avatar
capossele committed
				if now.After(m.deadline) || now.Equal(m.deadline) {
					m.connected <- connect{nil, ErrTimeout}
capossele's avatar
capossele committed
					matcherList.Remove(e)
capossele's avatar
capossele committed
				}
			}

		// on close, notify all the matchers
		case <-t.closing:
capossele's avatar
capossele committed
			for e := matcherList.Front(); e != nil; e = e.Next() {
				e.Value.(*acceptMatcher).connected <- connect{nil, ErrClosed}
capossele's avatar
capossele committed
func (t *TCP) matchAccept(m *acceptMatcher, req []byte, conn net.Conn) {
capossele's avatar
capossele committed
	t.wg.Add(1)
	defer t.wg.Done()

	if err := t.writeHandshakeResponse(req, conn); err != nil {
		m.connected <- connect{nil, errors.Wrap(err, "incoming handshake failed")}
capossele's avatar
capossele committed
		t.closeConnection(conn)
		return
	}
	m.connected <- connect{conn, nil}
capossele's avatar
capossele committed
}

capossele's avatar
capossele committed
func (t *TCP) listenLoop() {
capossele's avatar
capossele committed
	defer t.wg.Done()

	for {
		conn, err := t.listener.AcceptTCP()
		if err, ok := err.(net.Error); ok && err.Temporary() {
			t.log.Debugw("temporary read error", "err", err)
			continue
		} else if err != nil {
			// return from the loop on all other errors
capossele's avatar
capossele committed
			if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
				t.log.Warnw("listen error", "err", err)
			}
			t.log.Debug("listening stopped")
capossele's avatar
capossele committed
			return
		}

		key, req, err := t.readHandshakeRequest(conn)
		if err != nil {
			t.log.Warnw("failed handshake", "addr", conn.RemoteAddr(), "err", err)
			t.closeConnection(conn)
			continue
		}

		select {
		case t.acceptReceived <- accept{
			fromID: key.ID(),
			req:    req,
			conn:   conn,
		}:
		case <-t.closing:
			t.closeConnection(conn)
			return
		}
	}
}

capossele's avatar
capossele committed
func (t *TCP) doHandshake(key peer.PublicKey, remoteAddr string, conn net.Conn) error {
Wolfgang Welz's avatar
Wolfgang Welz committed
	reqData, err := newHandshakeRequest(remoteAddr)
capossele's avatar
capossele committed
	if err != nil {
		return err
	}

	pkt := &pb.Packet{
		PublicKey: t.local.PublicKey(),
		Signature: t.local.Sign(reqData),
		Data:      reqData,
	}
	b, err := proto.Marshal(pkt)
	if err != nil {
		return err
capossele's avatar
capossele committed
	}
	if l := len(b); l > maxHandshakePacketSize {
		return fmt.Errorf("handshake size too large: %d, max %d", l, maxHandshakePacketSize)
capossele's avatar
capossele committed
	}

	err = conn.SetWriteDeadline(time.Now().Add(handshakeTimeout))
	if err != nil {
capossele's avatar
capossele committed
		return err
	}
	_, err = conn.Write(b)
	if err != nil {
		return err
	}

	err = conn.SetReadDeadline(time.Now().Add(handshakeTimeout))
	if err != nil {
capossele's avatar
capossele committed
		return err
	}
capossele's avatar
capossele committed
	b = make([]byte, maxHandshakePacketSize)
capossele's avatar
capossele committed
	n, err := conn.Read(b)
	if err != nil {
		return err
	}

	pkt = &pb.Packet{}
	err = proto.Unmarshal(b[:n], pkt)
	if err != nil {
		return err
capossele's avatar
capossele committed
	}

	signer, err := peer.RecoverKeyFromSignedData(pkt)
capossele's avatar
capossele committed
	if err != nil || !bytes.Equal(key, signer) {
		return ErrInvalidHandshake
capossele's avatar
capossele committed
	}
	if !t.validateHandshakeResponse(pkt.GetData(), reqData) {
		return ErrInvalidHandshake
	}

	return nil
}

capossele's avatar
capossele committed
func (t *TCP) readHandshakeRequest(conn net.Conn) (peer.PublicKey, []byte, error) {
capossele's avatar
capossele committed
	if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
		return nil, nil, err
	}
capossele's avatar
capossele committed
	b := make([]byte, maxHandshakePacketSize)
capossele's avatar
capossele committed
	n, err := conn.Read(b)
	if err != nil {
capossele's avatar
capossele committed
		return nil, nil, errors.Wrap(err, ErrInvalidHandshake.Error())
capossele's avatar
capossele committed
	}

	pkt := &pb.Packet{}
	err = proto.Unmarshal(b[:n], pkt)
	if err != nil {
capossele's avatar
capossele committed
		return nil, nil, err
	}

	key, err := peer.RecoverKeyFromSignedData(pkt)
	if err != nil {
		return nil, nil, err
	}

Wolfgang Welz's avatar
Wolfgang Welz committed
	if !t.validateHandshakeRequest(pkt.GetData()) {
capossele's avatar
capossele committed
		return nil, nil, ErrInvalidHandshake
	}

	return key, pkt.GetData(), nil
}

capossele's avatar
capossele committed
func (t *TCP) writeHandshakeResponse(reqData []byte, conn net.Conn) error {
capossele's avatar
capossele committed
	data, err := newHandshakeResponse(reqData)
	if err != nil {
		return err
	}

	pkt := &pb.Packet{
		PublicKey: t.local.PublicKey(),
		Signature: t.local.Sign(data),
		Data:      data,
	}
	b, err := proto.Marshal(pkt)
	if err != nil {
		return err
	}
capossele's avatar
capossele committed
	if l := len(b); l > maxHandshakePacketSize {
		return fmt.Errorf("handshake size too large: %d, max %d", l, maxHandshakePacketSize)
	}
capossele's avatar
capossele committed

	err = conn.SetWriteDeadline(time.Now().Add(handshakeTimeout))
	if err != nil {
capossele's avatar
capossele committed
		return err
	}
	_, err = conn.Write(b)
	if err != nil {
		return err
	}

	return nil
}