diff --git a/plugins/analysis/client/connector.go b/plugins/analysis/client/connector.go new file mode 100644 index 0000000000000000000000000000000000000000..c8e07092ec29d7003ac66dd8eab2c570afee7b5c --- /dev/null +++ b/plugins/analysis/client/connector.go @@ -0,0 +1,116 @@ +package client + +import ( + "errors" + "net" + "sync" + "time" + + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/netutil" + "github.com/iotaledger/hive.go/network" +) + +// errors returned by the Connector +var ( + errNoConnection = errors.New("no connection established") +) + +// time after which a failed Dial is retried +var redialInterval = 1 * time.Minute + +// A Connector is a redialing Writer on the underlying connection. +type Connector struct { + network string + address string + + mu sync.Mutex + conn *network.ManagedConnection + + startOnce sync.Once + stopOnce sync.Once + stopping chan struct{} +} + +// NewConnector creates a new Connector. +func NewConnector(network string, address string) *Connector { + return &Connector{ + network: network, + address: address, + stopping: make(chan struct{}), + } +} + +// Start starts the Connector. +func (c *Connector) Start() { + c.startOnce.Do(c.dial) +} + +// Stop stops the Connector. +func (c *Connector) Stop() { + c.stopOnce.Do(func() { + close(c.stopping) + _ = c.Close() + }) +} + +// Close closes the current connection. +// If the Connector is not closed, a new redial will be triggered. +func (c *Connector) Close() (err error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn != nil { + err = c.conn.Close() + } + return +} + +// Write writes data to the underlying connection. +// It returns an error if currently no connection has been established. +func (c *Connector) Write(b []byte) (int, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn == nil { + return 0, errNoConnection + } + n, err := c.conn.Write(b) + // TODO: should the closing rather only happen outside? + // TODO: is the IsTemporaryError useful here? + if err != nil && !netutil.IsTemporaryError(err) { + _ = c.conn.Close() + } + return n, err +} + +func (c *Connector) dial() { + c.mu.Lock() + defer c.mu.Unlock() + + select { + case <-c.stopping: + return + default: + c.conn = nil + conn, err := net.Dial(c.network, c.address) + if err != nil { + log.Warnw("Dial error", "err", err) + go c.scheduleRedial() + return + } + c.conn = network.NewManagedConnection(conn) + c.conn.Events.Close.Attach(events.NewClosure(c.dial)) + } +} + +func (c *Connector) scheduleRedial() { + t := time.NewTimer(redialInterval) + defer t.Stop() + select { + case <-c.stopping: + return + case <-t.C: + c.dial() + } +} diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index 3bb144f3c10172e173759d42b0b65a18d058e5f7..fd10a9269d2317773faf36ee30b9bbd9f0970a72 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -1,9 +1,6 @@ package client import ( - "errors" - "net" - "sync" "time" "github.com/iotaledger/goshimmer/dapps/valuetransfers" @@ -13,8 +10,6 @@ import ( "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" - "github.com/iotaledger/hive.go/netutil" - "github.com/iotaledger/hive.go/network" "github.com/iotaledger/hive.go/node" flag "github.com/spf13/pflag" ) @@ -37,8 +32,9 @@ func init() { var ( // Plugin is the plugin instance of the analysis client plugin. Plugin = node.NewPlugin(PluginName, node.Enabled, run) - log *logger.Logger - conn = &connector{} + conn = NewConnector("tcp", config.Node.GetString(CfgServerAddress)) + + log *logger.Logger ) func run(_ *node.Plugin) { @@ -73,73 +69,3 @@ func run(_ *node.Plugin) { log.Panicf("Failed to start as daemon: %s", err) } } - -type connector struct { - mu sync.Mutex - - conn *network.ManagedConnection - - closeOnce sync.Once - closing chan struct{} -} - -func (c *connector) Start() { - c.closing = make(chan struct{}) - c.new() -} - -func (c *connector) Stop() { - c.closeOnce.Do(func() { - close(c.closing) - if c.conn != nil { - c.conn.Close() - } - }) -} - -func (c *connector) new() { - c.mu.Lock() - defer c.mu.Unlock() - - select { - case _ = <-c.closing: - return - default: - c.conn = nil - tcpConn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress)) - if err != nil { - time.AfterFunc(1*time.Minute, c.new) - log.Warn(err) - return - } - c.conn = network.NewManagedConnection(tcpConn) - c.conn.Events.Close.Attach(events.NewClosure(c.new)) - } -} - -// Close closes the current connection. -func (c *connector) Close() (err error) { - c.mu.Lock() - defer c.mu.Unlock() - - if c.conn != nil { - err = c.conn.Close() - } - return -} - -func (c *connector) Write(b []byte) (int, error) { - c.mu.Lock() - defer c.mu.Unlock() - - if c.conn == nil { - return 0, errors.New("no connection established") - } - n, err := c.conn.Write(b) - // TODO: should the closing rather only happen outside? - // TODO: is the IsTemporaryError useful here? - if err != nil && !netutil.IsTemporaryError(err) { - c.conn.Close() - } - return n, err -}