diff --git a/go.mod b/go.mod index b802194bc7ac0740b862b922f4e60abfbbbca264..a201dffd6d7524d0deec39462ea1d34aca745e22 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/golang/protobuf v1.3.5 github.com/google/go-cmp v0.4.0 github.com/gorilla/websocket v1.4.1 - github.com/iotaledger/hive.go v0.0.0-20200617164933-c48b4401b814 + github.com/iotaledger/hive.go v0.0.0-20200618111525-99e5fb050f12 github.com/iotaledger/iota.go v1.0.0-beta.14 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/gommon v0.3.0 diff --git a/go.sum b/go.sum index d56d14b5eb88dbea5dd9a357508507a3089efc06..6d1a6f85347c69a0cc388ef5aed9353a5ebb6f84 100644 --- a/go.sum +++ b/go.sum @@ -155,10 +155,8 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/iotaledger/hive.go v0.0.0-20200615181744-3ab0cddc15cf h1:JW1kB2q+arYv4KgRIvyGUj7Svf20EUQRswDe3+8sYSw= -github.com/iotaledger/hive.go v0.0.0-20200615181744-3ab0cddc15cf/go.mod h1:zwZhaE4ZeglpTrbmbwdnVPMI5XdRu2RmByi3Qn0ztmU= -github.com/iotaledger/hive.go v0.0.0-20200617164933-c48b4401b814 h1:9Cg6q13ngg3/UxBPQjdDREYr+792HO8SN8KzI1t7xFY= -github.com/iotaledger/hive.go v0.0.0-20200617164933-c48b4401b814/go.mod h1:zwZhaE4ZeglpTrbmbwdnVPMI5XdRu2RmByi3Qn0ztmU= +github.com/iotaledger/hive.go v0.0.0-20200618111525-99e5fb050f12 h1:rWYQqY7Pm2qOPtYO7LnerrrlfCWvpkAcCGxQHqz8q9Y= +github.com/iotaledger/hive.go v0.0.0-20200618111525-99e5fb050f12/go.mod h1:zwZhaE4ZeglpTrbmbwdnVPMI5XdRu2RmByi3Qn0ztmU= github.com/iotaledger/iota.go v1.0.0-beta.9/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= github.com/iotaledger/iota.go v1.0.0-beta.14 h1:Oeb28MfBuJEeXcGrLhTCJFtbsnc8y1u7xidsAmiOD5A= github.com/iotaledger/iota.go v1.0.0-beta.14/go.mod h1:F6WBmYd98mVjAmmPVYhnxg8NNIWCjjH8VWT9qvv3Rc8= diff --git a/plugins/analysis/client/connector.go b/plugins/analysis/client/connector.go new file mode 100644 index 0000000000000000000000000000000000000000..cd9da66a1b43d1a5b1f6b43c9c45fc63384d3e6d --- /dev/null +++ b/plugins/analysis/client/connector.go @@ -0,0 +1,115 @@ +package client + +import ( + "errors" + "net" + "sync" + "time" + + "github.com/iotaledger/hive.go/events" + "github.com/iotaledger/hive.go/netutil" + "github.com/iotaledger/hive.go/network" +) + +// errors returned by the Connector +var ( + errNoConnection = errors.New("no connection established") +) + +// time after which a failed Dial is retried +var redialInterval = 1 * time.Minute + +// A Connector is a redialing Writer on the underlying connection. +type Connector struct { + network string + address string + + mu sync.Mutex + conn *network.ManagedConnection + + startOnce sync.Once + stopOnce sync.Once + stopping chan struct{} +} + +// NewConnector creates a new Connector. +func NewConnector(network string, address string) *Connector { + return &Connector{ + network: network, + address: address, + stopping: make(chan struct{}), + } +} + +// Start starts the Connector. +func (c *Connector) Start() { + c.startOnce.Do(c.dial) +} + +// Stop stops the Connector. +func (c *Connector) Stop() { + c.stopOnce.Do(func() { + close(c.stopping) + _ = c.Close() + }) +} + +// Close closes the current connection. +// If the Connector is not closed, a new redial will be triggered. +func (c *Connector) Close() (err error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn != nil { + err = c.conn.Close() + } + return +} + +// Write writes data to the underlying connection. +// It returns an error if currently no connection has been established. +func (c *Connector) Write(b []byte) (int, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn == nil { + return 0, errNoConnection + } + n, err := c.conn.Write(b) + // TODO: should the closing rather only happen outside? + // TODO: is the IsTemporaryError useful here? + if err != nil && !netutil.IsTemporaryError(err) { + _ = c.conn.Close() + } + return n, err +} + +func (c *Connector) dial() { + c.mu.Lock() + defer c.mu.Unlock() + + select { + case <-c.stopping: + return + default: + c.conn = nil + conn, err := net.Dial(c.network, c.address) + if err != nil { + go c.scheduleRedial() + return + } + c.conn = network.NewManagedConnection(conn) + c.conn.Events.Close.Attach(events.NewClosure(c.dial)) + } +} + +func (c *Connector) scheduleRedial() { + t := time.NewTimer(redialInterval) + defer t.Stop() + select { + case <-c.stopping: + return + case <-t.C: + c.dial() + } +} diff --git a/plugins/analysis/client/fpc_heartbeato.go b/plugins/analysis/client/fpc_heartbeato.go index 1cedde60d07e67715c0c61b2531169d785cf656a..582a9feebc8f6e9e8f7d10033252cdccb0a6fd0a 100644 --- a/plugins/analysis/client/fpc_heartbeato.go +++ b/plugins/analysis/client/fpc_heartbeato.go @@ -30,9 +30,6 @@ func onRoundExecuted(roundStats *vote.RoundStats) { chunks := splitFPCVoteContext(roundStats.ActiveVoteContexts) - connLock.Lock() - defer connLock.Unlock() - for _, chunk := range chunks { // abort if empty round if len(chunk) == 0 { @@ -63,7 +60,7 @@ func onRoundExecuted(roundStats *vote.RoundStats) { log.Info("Client: onRoundExecuted data size: ", len(data)) - if _, err = managedConn.Write(data); err != nil { + if _, err = conn.Write(data); err != nil { log.Debugw("Error while writing to connection", "Description", err) return } diff --git a/plugins/analysis/client/heartbeat.go b/plugins/analysis/client/heartbeat.go index 365562bc4e32170f2527f2c7454e0f95b8e05261..d719856b7dfbae70ce915093faa75e05230a1602 100644 --- a/plugins/analysis/client/heartbeat.go +++ b/plugins/analysis/client/heartbeat.go @@ -1,13 +1,13 @@ package client import ( + "io" "strings" "github.com/iotaledger/goshimmer/packages/metrics" "github.com/iotaledger/goshimmer/plugins/analysis/packet" "github.com/iotaledger/goshimmer/plugins/autopeering" "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/hive.go/network" "github.com/mr-tron/base58" ) @@ -17,7 +17,7 @@ type EventDispatchers struct { Heartbeat func(heartbeat *packet.Heartbeat) } -func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) { +func sendHeartbeat(w io.Writer, hb *packet.Heartbeat) { var out strings.Builder for _, value := range hb.OutboundIDs { out.WriteString(base58.Encode(value)) @@ -39,9 +39,7 @@ func sendHeartbeat(conn *network.ManagedConnection, hb *packet.Heartbeat) { return } - connLock.Lock() - defer connLock.Unlock() - if _, err = conn.Write(data); err != nil { + if _, err = w.Write(data); err != nil { log.Debugw("Error while writing to connection", "Description", err) } // trigger AnalysisOutboundBytes event diff --git a/plugins/analysis/client/metric_heartbeat.go b/plugins/analysis/client/metric_heartbeat.go index 46d06ef15db4575824c12322d3d50cff4fdfbaf8..d3f2509289b4314eee2c65fa8e93c10468b61cac 100644 --- a/plugins/analysis/client/metric_heartbeat.go +++ b/plugins/analysis/client/metric_heartbeat.go @@ -1,26 +1,24 @@ package client import ( + "io" "runtime" "time" "github.com/iotaledger/goshimmer/packages/metrics" "github.com/iotaledger/goshimmer/plugins/analysis/packet" "github.com/iotaledger/goshimmer/plugins/autopeering/local" - "github.com/iotaledger/hive.go/network" "github.com/shirou/gopsutil/cpu" ) -func sendMetricHeartbeat(conn *network.ManagedConnection, hb *packet.MetricHeartbeat) { +func sendMetricHeartbeat(w io.Writer, hb *packet.MetricHeartbeat) { data, err := packet.NewMetricHeartbeatMessage(hb) if err != nil { log.Info(err, " - metric heartbeat message skipped") return } - connLock.Lock() - defer connLock.Unlock() - if _, err = conn.Write(data); err != nil { + if _, err = w.Write(data); err != nil { log.Debugw("Error while writing to connection", "Description", err) } // trigger AnalysisOutboundBytes event diff --git a/plugins/analysis/client/plugin.go b/plugins/analysis/client/plugin.go index e79fc801b293b3864392f1dc98baa2d233d18d4d..fd10a9269d2317773faf36ee30b9bbd9f0970a72 100644 --- a/plugins/analysis/client/plugin.go +++ b/plugins/analysis/client/plugin.go @@ -1,8 +1,6 @@ package client import ( - "net" - "sync" "time" "github.com/iotaledger/goshimmer/dapps/valuetransfers" @@ -12,7 +10,6 @@ import ( "github.com/iotaledger/hive.go/daemon" "github.com/iotaledger/hive.go/events" "github.com/iotaledger/hive.go/logger" - "github.com/iotaledger/hive.go/network" "github.com/iotaledger/hive.go/node" flag "github.com/spf13/pflag" ) @@ -34,25 +31,19 @@ func init() { var ( // Plugin is the plugin instance of the analysis client plugin. - Plugin = node.NewPlugin(PluginName, node.Enabled, run) - log *logger.Logger - managedConn *network.ManagedConnection - connLock sync.Mutex + Plugin = node.NewPlugin(PluginName, node.Enabled, run) + conn = NewConnector("tcp", config.Node.GetString(CfgServerAddress)) + + log *logger.Logger ) func run(_ *node.Plugin) { finalized = make(map[string]vote.Opinion) log = logger.NewLogger(PluginName) - conn, err := net.Dial("tcp", config.Node.GetString(CfgServerAddress)) - if err != nil { - log.Debugf("Could not connect to reporting server: %s", err.Error()) - return - } - - managedConn = network.NewManagedConnection(conn) - if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { + conn.Start() + defer conn.Stop() onFinalizedClosure := events.NewClosure(onFinalized) valuetransfers.Voter().Events().Finalized.Attach(onFinalizedClosure) @@ -70,8 +61,8 @@ func run(_ *node.Plugin) { return case <-ticker.C: - sendHeartbeat(managedConn, createHeartbeat()) - sendMetricHeartbeat(managedConn, createMetricHeartbeat()) + sendHeartbeat(conn, createHeartbeat()) + sendMetricHeartbeat(conn, createMetricHeartbeat()) } } }, shutdown.PriorityAnalysis); err != nil { diff --git a/plugins/analysis/server/plugin.go b/plugins/analysis/server/plugin.go index 30e317a223e9e75949af8727de436c4086b13f17..12821d3331c980618f4d303ca13a6ab39ff0293e 100644 --- a/plugins/analysis/server/plugin.go +++ b/plugins/analysis/server/plugin.go @@ -1,8 +1,10 @@ package server import ( + "io" "net" "strconv" + "strings" "time" "github.com/iotaledger/goshimmer/packages/shutdown" @@ -26,7 +28,7 @@ const ( CfgAnalysisServerBindAddress = "analysis.server.bindAddress" // IdleTimeout defines the idle timeout. - IdleTimeout = 10 * time.Second + IdleTimeout = 1 * time.Minute ) func init() { @@ -37,6 +39,7 @@ var ( // Plugin is the plugin instance of the analysis server plugin. Plugin = node.NewPlugin(PluginName, node.Disabled, configure, run) server *tcp.TCPServer + prot *protocol.Protocol log *logger.Logger ) @@ -64,7 +67,13 @@ func run(_ *node.Plugin) { if err := daemon.BackgroundWorker(PluginName, func(shutdownSignal <-chan struct{}) { log.Infof("%s started, bind-address=%s", PluginName, bindAddr) defer log.Infof("Stopping %s ... done", PluginName) + + // connect protocol events to processors + prot = protocol.New(packet.AnalysisMsgRegistry) + wireUp(prot) + go server.Listen(addr, port) + <-shutdownSignal log.Info("Stopping Server ...") server.Shutdown() @@ -75,76 +84,70 @@ func run(_ *node.Plugin) { // HandleConnection handles the given connection. func HandleConnection(conn *network.ManagedConnection) { - err := conn.SetTimeout(IdleTimeout) + err := conn.SetReadTimeout(IdleTimeout) if err != nil { - log.Errorf(err.Error()) + log.Debugw("Error setting read timeout; closing connection", "err", err) + _ = conn.Close() + return } - - // create new protocol instance - p := protocol.New(conn, packet.AnalysisMsgRegistry) - onReceiveData := events.NewClosure(func(data []byte) { - // process incoming data in protocol.Receive() - p.Receive(data) + _, err := prot.Read(data) + if err != nil { + log.Debugw("Invalid message received; closing connection", "err", err) + _ = conn.Close() + } }) - - var onDisconnect *events.Closure - onDisconnect = events.NewClosure(func() { - conn.Events.ReceiveData.Detach(onReceiveData) - conn.Events.Close.Detach(onDisconnect) - }) - conn.Events.ReceiveData.Attach(onReceiveData) - conn.Events.Close.Attach(onDisconnect) - - // connect protocol events to processors - wireUp(p) - // starts the protocol and reads from its connection - go p.Start() + go func() { + buffer := make([]byte, 2048) + _, err := conn.Read(buffer) + if err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { + log.Debugw("Read error", "err", err) + } + // always close the connection when we've stopped reading from it + _ = conn.Close() + }() } // wireUp connects the Received events of the protocol to the packet specific processor func wireUp(p *protocol.Protocol) { p.Events.Received[packet.MessageTypeHeartbeat].Attach(events.NewClosure(func(data []byte) { - processHeartbeatPacket(data, p) + processHeartbeatPacket(data) })) p.Events.Received[packet.MessageTypeFPCHeartbeat].Attach(events.NewClosure(func(data []byte) { - processFPCHeartbeatPacket(data, p) + processFPCHeartbeatPacket(data) })) p.Events.Received[packet.MessageTypeMetricHeartbeat].Attach(events.NewClosure(func(data []byte) { - processMetricHeartbeatPacket(data, p) + processMetricHeartbeatPacket(data) })) } // processHeartbeatPacket parses the serialized data into a Heartbeat packet and triggers its event -func processHeartbeatPacket(data []byte, p *protocol.Protocol) { +func processHeartbeatPacket(data []byte) { heartbeatPacket, err := packet.ParseHeartbeat(data) if err != nil { Events.Error.Trigger(err) - p.CloseConnection() return } Events.Heartbeat.Trigger(heartbeatPacket) } // processHeartbeatPacket parses the serialized data into a FPC Heartbeat packet and triggers its event -func processFPCHeartbeatPacket(data []byte, p *protocol.Protocol) { +func processFPCHeartbeatPacket(data []byte) { hb, err := packet.ParseFPCHeartbeat(data) if err != nil { Events.Error.Trigger(err) - p.CloseConnection() return } Events.FPCHeartbeat.Trigger(hb) } -// processMetricHeartbeatPacket parses the serialized data into a MEtric Heartbeat packet and triggers its event -func processMetricHeartbeatPacket(data []byte, p *protocol.Protocol) { +// processMetricHeartbeatPacket parses the serialized data into a Metric Heartbeat packet and triggers its event +func processMetricHeartbeatPacket(data []byte) { hb, err := packet.ParseMetricHeartbeat(data) if err != nil { Events.Error.Trigger(err) - p.CloseConnection() return } Events.MetricHeartbeat.Trigger(hb)