Skip to content
Snippets Groups Projects
Unverified Commit 1c3f0760 authored by Wolfgang Welz's avatar Wolfgang Welz
Browse files

wip: redial on lost connection

parent 0bf4e5a2
No related branches found
No related tags found
No related merge requests found
......@@ -30,9 +30,6 @@ func onRoundExecuted(roundStats *vote.RoundStats) {
chunks := splitFPCVoteContext(roundStats.ActiveVoteContexts)
connLock.Lock()
defer connLock.Unlock()
for _, chunk := range chunks {
// abort if empty round
if len(chunk) == 0 {
......@@ -63,7 +60,7 @@ func onRoundExecuted(roundStats *vote.RoundStats) {
log.Info("Client: onRoundExecuted data size: ", len(data))
if _, err = managedConn.Write(data); err != nil {
if _, err = conn.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
return
}
......
package client
import (
"io"
"strings"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/iotaledger/goshimmer/plugins/autopeering"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/hive.go/network"
"github.com/mr-tron/base58"
)
......@@ -17,7 +17,7 @@ type EventDispatchers struct {
Heartbeat func(heartbeat *packet.Heartbeat)
}
func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) {
func sendHeartbeat(w io.Writer, hb *packet.Heartbeat) {
var out strings.Builder
for _, value := range hb.OutboundIDs {
out.WriteString(base58.Encode(value))
......@@ -39,9 +39,7 @@ func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) {
return
}
connLock.Lock()
defer connLock.Unlock()
if _, err = conn.Write(data); err != nil {
if _, err = w.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
}
// trigger AnalysisOutboundBytes event
......
package client
import (
"io"
"runtime"
"time"
"github.com/iotaledger/goshimmer/packages/metrics"
"github.com/iotaledger/goshimmer/plugins/analysis/packet"
"github.com/iotaledger/goshimmer/plugins/autopeering/local"
"github.com/iotaledger/hive.go/network"
"github.com/shirou/gopsutil/cpu"
)
func sendMetricHeartbeat(conn *network.ManagedConnection, hb *packet.MetricHeartbeat) {
func sendMetricHeartbeat(w io.Writer, hb *packet.MetricHeartbeat) {
data, err := packet.NewMetricHeartbeatMessage(hb)
if err != nil {
log.Info(err, " - metric heartbeat message skipped")
return
}
connLock.Lock()
defer connLock.Unlock()
if _, err = conn.Write(data); err != nil {
if _, err = w.Write(data); err != nil {
log.Debugw("Error while writing to connection", "Description", err)
}
// trigger AnalysisOutboundBytes event
......
......@@ -34,25 +34,18 @@ func init() {
var (
// Plugin is the plugin instance of the analysis client plugin.
Plugin = node.NewPlugin(PluginName, node.Enabled, run)
log *logger.Logger
managedConn *network.ManagedConnection
connLock sync.Mutex
Plugin = node.NewPlugin(PluginName, node.Enabled, run)
log *logger.Logger
conn = &connector{}
)
func run(_ *node.Plugin) {
finalized = make(map[string]vote.Opinion)
log = logger.NewLogger(PluginName)
conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress))
if err != nil {
log.Debugf("Could not connect to reporting server: %s", err.Error())
return
}
managedConn = network.NewManagedConnection(conn)
if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) {
conn.Start()
defer conn.Stop()
onFinalizedClosure := events.NewClosure(onFinalized)
valuetransfers.Voter().Events().Finalized.Attach(onFinalizedClosure)
......@@ -70,11 +63,61 @@ func run(_ *node.Plugin) {
return
case <-ticker.C:
sendHeartbeat(managedConn, createHeartbeat())
sendMetricHeartbeat(managedConn, createMetricHeartbeat())
sendHeartbeat(conn, createHeartbeat())
sendMetricHeartbeat(conn, createMetricHeartbeat())
}
}
}, shutdown.PriorityAnalysis); err != nil {
log.Panicf("Failed to start as daemon: %s", err)
}
}
type connector struct {
mu sync.Mutex
conn *network.ManagedConnection
closeOnce sync.Once
closing chan struct{}
}
func (c *connector) Start() {
c.closing = make(chan struct{})
c.new()
}
func (c *connector) Stop() {
c.closeOnce.Do(func() {
close(c.closing)
if c.conn != nil {
c.conn.Close()
}
})
}
func (c *connector) new() {
select {
case _ = <-c.closing:
return
default:
c.mu.Lock()
defer c.mu.Unlock()
conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress))
if err != nil {
time.AfterFunc(1*time.Minute, c.new)
log.Warn(err)
return
}
c.conn = network.NewManagedConnection(conn)
c.conn.Events.Close.Attach(events.NewClosure(c.new))
}
}
func (c *connector) Write(b []byte) (int, error) {
// TODO: check that start was called
// TODO: check that Stop was not called
c.mu.Lock()
defer c.mu.Unlock()
return c.conn.Write(b)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment