Skip to content
Snippets Groups Projects
Unverified Commit 76c9299d authored by Wolfgang Welz's avatar Wolfgang Welz
Browse files

connector cleanup

parent 9f8206d7
Branches
No related tags found
No related merge requests found
package client
import (
"errors"
"net"
"sync"
"time"
"github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/netutil"
"github.com/iotaledger/hive.go/network"
)
// errors returned by the Connector
var (
errNoConnection = errors.New("no connection established")
)
// time after which a failed Dial is retried
var redialInterval = 1 * time.Minute
// A Connector is a redialing Writer on the underlying connection.
type Connector struct {
network string
address string
mu sync.Mutex
conn *network.ManagedConnection
startOnce sync.Once
stopOnce sync.Once
stopping chan struct{}
}
// NewConnector creates a new Connector.
func NewConnector(network string, address string) *Connector {
return &Connector{
network: network,
address: address,
stopping: make(chan struct{}),
}
}
// Start starts the Connector.
func (c *Connector) Start() {
c.startOnce.Do(c.dial)
}
// Stop stops the Connector.
func (c *Connector) Stop() {
c.stopOnce.Do(func() {
close(c.stopping)
_ = c.Close()
})
}
// Close closes the current connection.
// If the Connector is not closed, a new redial will be triggered.
func (c *Connector) Close() (err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn != nil {
err = c.conn.Close()
}
return
}
// Write writes data to the underlying connection.
// It returns an error if currently no connection has been established.
func (c *Connector) Write(b []byte) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
return 0, errNoConnection
}
n, err := c.conn.Write(b)
// TODO: should the closing rather only happen outside?
// TODO: is the IsTemporaryError useful here?
if err != nil && !netutil.IsTemporaryError(err) {
_ = c.conn.Close()
}
return n, err
}
func (c *Connector) dial() {
c.mu.Lock()
defer c.mu.Unlock()
select {
case <-c.stopping:
return
default:
c.conn = nil
conn, err := net.Dial(c.network, c.address)
if err != nil {
log.Warnw("Dial error", "err", err)
go c.scheduleRedial()
return
}
c.conn = network.NewManagedConnection(conn)
c.conn.Events.Close.Attach(events.NewClosure(c.dial))
}
}
func (c *Connector) scheduleRedial() {
t := time.NewTimer(redialInterval)
defer t.Stop()
select {
case <-c.stopping:
return
case <-t.C:
c.dial()
}
}
package client package client
import ( import (
"errors"
"net"
"sync"
"time" "time"
"github.com/iotaledger/goshimmer/dapps/valuetransfers" "github.com/iotaledger/goshimmer/dapps/valuetransfers"
...@@ -13,8 +10,6 @@ import ( ...@@ -13,8 +10,6 @@ import (
"github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/daemon"
"github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/events"
"github.com/iotaledger/hive.go/logger" "github.com/iotaledger/hive.go/logger"
"github.com/iotaledger/hive.go/netutil"
"github.com/iotaledger/hive.go/network"
"github.com/iotaledger/hive.go/node" "github.com/iotaledger/hive.go/node"
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
) )
...@@ -37,8 +32,9 @@ func init() { ...@@ -37,8 +32,9 @@ func init() {
var ( var (
// Plugin is the plugin instance of the analysis client plugin. // Plugin is the plugin instance of the analysis client plugin.
Plugin = node.NewPlugin(PluginName, node.Enabled, run) Plugin = node.NewPlugin(PluginName, node.Enabled, run)
log *logger.Logger conn = NewConnector("tcp", config.Node.GetString(CfgServerAddress))
conn = &connector{}
log *logger.Logger
) )
func run(_ *node.Plugin) { func run(_ *node.Plugin) {
...@@ -73,73 +69,3 @@ func run(_ *node.Plugin) { ...@@ -73,73 +69,3 @@ func run(_ *node.Plugin) {
log.Panicf("Failed to start as daemon: %s", err) log.Panicf("Failed to start as daemon: %s", err)
} }
} }
type connector struct {
mu sync.Mutex
conn *network.ManagedConnection
closeOnce sync.Once
closing chan struct{}
}
func (c *connector) Start() {
c.closing = make(chan struct{})
c.new()
}
func (c *connector) Stop() {
c.closeOnce.Do(func() {
close(c.closing)
if c.conn != nil {
c.conn.Close()
}
})
}
func (c *connector) new() {
c.mu.Lock()
defer c.mu.Unlock()
select {
case _ = <-c.closing:
return
default:
c.conn = nil
tcpConn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress))
if err != nil {
time.AfterFunc(1*time.Minute, c.new)
log.Warn(err)
return
}
c.conn = network.NewManagedConnection(tcpConn)
c.conn.Events.Close.Attach(events.NewClosure(c.new))
}
}
// Close closes the current connection.
func (c *connector) Close() (err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn != nil {
err = c.conn.Close()
}
return
}
func (c *connector) Write(b []byte) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
return 0, errors.New("no connection established")
}
n, err := c.conn.Write(b)
// TODO: should the closing rather only happen outside?
// TODO: is the IsTemporaryError useful here?
if err != nil && !netutil.IsTemporaryError(err) {
c.conn.Close()
}
return n, err
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment