Skip to content
Snippets Groups Projects
Unverified Commit 6d399a65 authored by Angelo Capossele's avatar Angelo Capossele Committed by GitHub
Browse files

Fix graceful shutdown (#527)

* :bug: Fix grpc server race condition

* :bug: Fix graceful shutdown

* :ok_hand: Add nil check

* :recycle:

 Refactor FPC gRPC server

* stop worker on server issues

Co-authored-by: default avatarWolfgang Welz <welzwo@gmail.com>
parent fb989c77
No related branches found
No related tags found
No related merge requests found
...@@ -101,7 +101,10 @@ func configureFPC() { ...@@ -101,7 +101,10 @@ func configureFPC() {
} }
func runFPC() { func runFPC() {
if err := daemon.BackgroundWorker("FPCVoterServer", func(shutdownSignal <-chan struct{}) { const ServerWorkerName = "FPCVoterServer"
if err := daemon.BackgroundWorker(ServerWorkerName, func(shutdownSignal <-chan struct{}) {
stopped := make(chan struct{})
bindAddr := config.Node.GetString(CfgFPCBindAddress)
voterServer = votenet.New(Voter(), func(id string) vote.Opinion { voterServer = votenet.New(Voter(), func(id string) vote.Opinion {
branchID, err := branchmanager.BranchIDFromBase58(id) branchID, err := branchmanager.BranchIDFromBase58(id)
if err != nil { if err != nil {
...@@ -123,21 +126,29 @@ func runFPC() { ...@@ -123,21 +126,29 @@ func runFPC() {
} }
return vote.Like return vote.Like
}, config.Node.GetString(CfgFPCBindAddress), }, bindAddr,
metrics.Events().FPCInboundBytes, metrics.Events().FPCInboundBytes,
metrics.Events().FPCOutboundBytes, metrics.Events().FPCOutboundBytes,
metrics.Events().QueryReceived) metrics.Events().QueryReceived,
)
go func() { go func() {
log.Infof("%s started, bind-address=%s", ServerWorkerName, bindAddr)
if err := voterServer.Run(); err != nil { if err := voterServer.Run(); err != nil {
log.Error(err) log.Errorf("Error serving: %s", err)
} }
close(stopped)
}() }()
log.Infof("Started vote server on %s", config.Node.GetString(CfgFPCBindAddress)) // stop if we are shutting down or the server could not be started
<-shutdownSignal select {
case <-shutdownSignal:
case <-stopped:
}
log.Infof("Stopping %s ...", ServerWorkerName)
voterServer.Shutdown() voterServer.Shutdown()
log.Info("Stopped vote server") log.Infof("Stopping %s ... done", ServerWorkerName)
}, shutdown.PriorityFPC); err != nil { }, shutdown.PriorityFPC); err != nil {
log.Panicf("Failed to start as daemon: %s", err) log.Panicf("Failed to start as daemon: %s", err)
} }
......
...@@ -21,6 +21,7 @@ func New(voter vote.Voter, opnRetriever OpinionRetriever, bindAddr string, netRx ...@@ -21,6 +21,7 @@ func New(voter vote.Voter, opnRetriever OpinionRetriever, bindAddr string, netRx
voter: voter, voter: voter,
opnRetriever: opnRetriever, opnRetriever: opnRetriever,
bindAddr: bindAddr, bindAddr: bindAddr,
grpcServer: grpc.NewServer(),
netRxEvent: netRxEvent, netRxEvent: netRxEvent,
netTxEvent: netTxEvent, netTxEvent: netTxEvent,
queryReceivedEvent: queryReceivedEvent, queryReceivedEvent: queryReceivedEvent,
...@@ -71,9 +72,7 @@ func (vs *VoterServer) Run() error { ...@@ -71,9 +72,7 @@ func (vs *VoterServer) Run() error {
return err return err
} }
vs.grpcServer = grpc.NewServer()
RegisterVoterQueryServer(vs.grpcServer, vs) RegisterVoterQueryServer(vs.grpcServer, vs)
return vs.grpcServer.Serve(listener) return vs.grpcServer.Serve(listener)
} }
......
...@@ -93,7 +93,7 @@ func (c *Connector) dial() { ...@@ -93,7 +93,7 @@ func (c *Connector) dial() {
return return
default: default:
c.conn = nil c.conn = nil
conn, err := net.Dial(c.network, c.address) conn, err := net.DialTimeout(c.network, c.address, 5*time.Second)
if err != nil { if err != nil {
go c.scheduleRedial() go c.scheduleRedial()
return return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment