diff --git a/plugins/analysis/client/fpc_heartbeato.go b/plugins/analysis/client/fpc_heartbeato.go index 1cedde60d07e67715c0c61b2531169d785cf656a..582a9feebc8f6e9e8f7d10033252cdccb0a6fd0a 100644 --- a/plugins/analysis/client/fpc_heartbeato.go +++ b/plugins/analysis/client/fpc_heartbeato.go @@ -30,9 +30,6 @@ func onRoundExecuted(roundStats *vote.RoundStats) { chunks := splitFPCVoteContext(roundStats.ActiveVoteContexts) - connLock.Lock() - defer connLock.Unlock() - for _, chunk := range chunks { // abort if empty round if len(chunk) == 0 { @@ -63,7 +60,7 @@ func onRoundExecuted(roundStats *vote.RoundStats) { log.Info("Client: onRoundExecuted data size: ", len(data)) - if _, err = managedConn.Write(data); err != nil { + if _, err = conn.Write(data); err != nil { log.Debugw("Error while writing to connection", "Description", err) return } diff --git a/plugins/analysis/client/heartbeat.go b/plugins/analysis/client/heartbeat.go index 365562bc4e32170f2527f2c7454e0f95b8e05261..d719856b7dfbae70ce915093faa75e05230a1602 100644 --- a/plugins/analysis/client/heartbeat.go +++ b/plugins/analysis/client/heartbeat.go @@ -1,13 +1,13 @@ package client import ( + "io" "strings" "github.com/iotaledger/goshimmer/packages/metrics" "github.com/iotaledger/goshimmer/plugins/analysis/packet" "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/hive.go/network" "github.com/mr-tron/base58" ) @@ -17,7 +17,7 @@ type EventDispatchers struct { Heartbeat func(heartbeat *packet.Heartbeat) } -func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) { +func sendHeartbeat(w io.Writer, hb *packet.Heartbeat) { var out strings.Builder for _, value := range hb.OutboundIDs { out.WriteString(base58.Encode(value)) @@ -39,9 +39,7 @@ func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) { return } - connLock.Lock() - defer connLock.Unlock() - if _, err = conn.Write(data); err != nil { + if _, err = w.Write(data); err != nil { log.Debugw("Error while writing to connection", "Description", err) } // trigger AnalysisOutboundBytes event diff --git a/plugins/analysis/client/metric_heartbeat.go b/plugins/analysis/client/metric_heartbeat.go index 46d06ef15db4575824c12322d3d50cff4fdfbaf8..d3f2509289b4314eee2c65fa8e93c10468b61cac 100644 --- a/plugins/analysis/client/metric_heartbeat.go +++ b/plugins/analysis/client/metric_heartbeat.go @@ -1,26 +1,24 @@ package client import ( + "io" "runtime" "time" "github.com/iotaledger/goshimmer/packages/metrics" "github.com/iotaledger/goshimmer/plugins/analysis/packet" "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/hive.go/network" "github.com/shirou/gopsutil/cpu" ) -func sendMetricHeartbeat(conn *network.ManagedConnection, hb *packet.MetricHeartbeat) { +func sendMetricHeartbeat(w io.Writer, hb *packet.MetricHeartbeat) { data, err := packet.NewMetricHeartbeatMessage(hb) if err != nil { log.Info(err, " - metric heartbeat message skipped") return } - connLock.Lock() - defer connLock.Unlock() - if _, err = conn.Write(data); err != nil { + if _, err = w.Write(data); err != nil { log.Debugw("Error while writing to connection", "Description", err) } // trigger AnalysisOutboundBytes event diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index e79fc801b293b3864392f1dc98baa2d233d18d4d..736e79acf07eb0f1e23dff759eca5d3a9ba5acd8 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -34,25 +34,18 @@ func init() { var ( // Plugin is the plugin instance of the analysis client plugin. - Plugin = node.NewPlugin(PluginName, node.Enabled, run) - log *logger.Logger - managedConn *network.ManagedConnection - connLock sync.Mutex + Plugin = node.NewPlugin(PluginName, node.Enabled, run) + log *logger.Logger + conn = &connector{} ) func run(_ *node.Plugin) { finalized = make(map[string]vote.Opinion) log = logger.NewLogger(PluginName) - conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress)) - if err != nil { - log.Debugf("Could not connect to reporting server: %s", err.Error()) - return - } - - managedConn = network.NewManagedConnection(conn) - if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { + conn.Start() + defer conn.Stop() onFinalizedClosure := events.NewClosure(onFinalized) valuetransfers.Voter().Events().Finalized.Attach(onFinalizedClosure) @@ -70,11 +63,61 @@ func run(_ *node.Plugin) { return case <-ticker.C: - sendHeartbeat(managedConn, createHeartbeat()) - sendMetricHeartbeat(managedConn, createMetricHeartbeat()) + sendHeartbeat(conn, createHeartbeat()) + sendMetricHeartbeat(conn, createMetricHeartbeat()) } } }, shutdown.PriorityAnalysis); err != nil { log.Panicf("Failed to start as daemon: %s", err) } } + +type connector struct { + mu sync.Mutex + + conn *network.ManagedConnection + + closeOnce sync.Once + closing chan struct{} +} + +func (c *connector) Start() { + c.closing = make(chan struct{}) + c.new() +} + +func (c *connector) Stop() { + c.closeOnce.Do(func() { + close(c.closing) + if c.conn != nil { + c.conn.Close() + } + }) +} + +func (c *connector) new() { + select { + case _ = <-c.closing: + return + default: + c.mu.Lock() + defer c.mu.Unlock() + + conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress)) + if err != nil { + time.AfterFunc(1*time.Minute, c.new) + log.Warn(err) + return + } + c.conn = network.NewManagedConnection(conn) + c.conn.Events.Close.Attach(events.NewClosure(c.new)) + } +} + +func (c *connector) Write(b []byte) (int, error) { + // TODO: check that start was called + // TODO: check that Stop was not called + c.mu.Lock() + defer c.mu.Unlock() + return c.conn.Write(b) +}