Skip to content

Commit 47b5ad3

Browse files
committed
feat: graceful shutdown with drain signal and browser cleanup
1 parent 0d4e2da commit 47b5ad3

5 files changed

Lines changed: 234 additions & 13 deletions

File tree

cmd/serve.go

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ package cmd
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
8+
"os/signal"
79
"strings"
810
"sync"
11+
"syscall"
912
"time"
1013

1114
"github.com/karust/openserp/baidu"
@@ -96,7 +99,7 @@ func serve(cmd *cobra.Command, args []string) {
9699
&rawEngine{name: "yandex"},
97100
&rawEngine{name: "baidu"},
98101
)
99-
if err := serv.Listen(); err != nil {
102+
if err := listenWithGracefulShutdown(serv, nil); err != nil {
100103
logrus.Error(err)
101104
}
102105
return
@@ -117,15 +120,15 @@ func serve(cmd *cobra.Command, args []string) {
117120
baseOpts.IsHeadless = false
118121
}
119122

120-
engines, err := buildBrowserEngines(baseOpts, proxyCfg)
123+
engines, closeBrowsers, err := buildBrowserEngines(baseOpts, proxyCfg)
121124
if err != nil {
122125
logrus.Error(err)
123126
return
124127
}
125128

126129
serverOpts := buildServerOptions(corsCfg, proxyCfg)
127130
serv := core.NewServerWithOptions(config.Server.Host, config.Server.Port, serverOpts, engines...)
128-
if err := serv.Listen(); err != nil {
131+
if err := listenWithGracefulShutdown(serv, closeBrowsers); err != nil {
129132
logrus.Error(err)
130133
}
131134
}
@@ -154,6 +157,71 @@ func buildServerOptions(corsCfg core.CORSConfig, proxyCfg core.ProxyConfig) core
154157
}
155158
}
156159

160+
// gracefulShutdownTimeout is the drain window passed to
// Server.ShutdownWithTimeout once a shutdown signal has been received.
const gracefulShutdownTimeout = 30 * time.Second
161+
162+
func listenWithGracefulShutdown(serv *core.Server, onShutdown func() error) error {
163+
listenErrCh := make(chan error, 1)
164+
go func() {
165+
listenErrCh <- serv.Listen()
166+
}()
167+
168+
sigCh := make(chan os.Signal, 1)
169+
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
170+
defer signal.Stop(sigCh)
171+
172+
select {
173+
case err := <-listenErrCh:
174+
return err
175+
case sig := <-sigCh:
176+
logrus.WithField("signal", sig.String()).Info("Shutdown signal received, draining traffic")
177+
}
178+
179+
serv.SetDraining(true)
180+
181+
shutdownErr := serv.ShutdownWithTimeout(gracefulShutdownTimeout)
182+
if isServerNotRunningError(shutdownErr) {
183+
shutdownErr = nil
184+
}
185+
186+
if onShutdown != nil {
187+
resourceErr := onShutdown()
188+
if resourceErr != nil {
189+
shutdownErr = errors.Join(shutdownErr, resourceErr)
190+
}
191+
}
192+
193+
if listenErr := waitForListenExit(listenErrCh); listenErr != nil && !isExpectedListenShutdownError(listenErr) {
194+
shutdownErr = errors.Join(shutdownErr, listenErr)
195+
}
196+
197+
return shutdownErr
198+
}
199+
200+
// waitForListenExit gives the listener goroutine a short grace period to
// report its exit status after shutdown has been requested.
//
// It returns the value received from listenErrCh, or nil when nothing
// arrives within the grace period.
func waitForListenExit(listenErrCh <-chan error) error {
	// listenExitGrace bounds how long shutdown waits for the listener result.
	const listenExitGrace = time.Second

	// An explicit timer (instead of time.After) is released as soon as the
	// listener result arrives rather than lingering until it fires.
	timer := time.NewTimer(listenExitGrace)
	defer timer.Stop()

	select {
	case err := <-listenErrCh:
		return err
	case <-timer.C:
		return nil
	}
}
208+
209+
// isExpectedListenShutdownError reports whether err is the kind of error a
// listener returns as a normal consequence of being shut down. A nil error
// counts as expected. Matching is done on the lower-cased error text.
func isExpectedListenShutdownError(err error) bool {
	if err == nil {
		return true
	}

	lowered := strings.ToLower(err.Error())
	for _, marker := range []string{"server closed", "closed network connection"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
217+
218+
// isServerNotRunningError reports whether err's text (case-insensitively)
// says the server is not running. A nil error yields false.
func isServerNotRunningError(err error) bool {
	return err != nil &&
		strings.Contains(strings.ToLower(err.Error()), "server is not running")
}
224+
157225
type browserPool struct {
158226
mu sync.Mutex
159227
base core.BrowserOpts
@@ -193,6 +261,27 @@ func (p *browserPool) get(proxyURL string) (*core.Browser, error) {
193261
return b, nil
194262
}
195263

264+
func (p *browserPool) close() error {
265+
p.mu.Lock()
266+
browsers := make([]*core.Browser, 0, len(p.browser))
267+
for key, b := range p.browser {
268+
browsers = append(browsers, b)
269+
delete(p.browser, key)
270+
}
271+
p.mu.Unlock()
272+
273+
var closeErr error
274+
for _, browser := range browsers {
275+
if browser == nil {
276+
continue
277+
}
278+
if err := browser.Close(); err != nil {
279+
closeErr = errors.Join(closeErr, err)
280+
}
281+
}
282+
return closeErr
283+
}
284+
196285
type pooledBrowserEngine struct {
197286
name string
198287
limiter *rate.Limiter
@@ -301,15 +390,15 @@ func browserEngineSpecs() []browserEngineSpec {
301390
}
302391
}
303392

304-
func buildBrowserEngines(baseOpts core.BrowserOpts, proxyCfg core.ProxyConfig) ([]core.SearchEngine, error) {
393+
func buildBrowserEngines(baseOpts core.BrowserOpts, proxyCfg core.ProxyConfig) ([]core.SearchEngine, func() error, error) {
305394
pool := newBrowserPool(baseOpts)
306395
specs := browserEngineSpecs()
307396

308397
engines := make([]core.SearchEngine, 0, len(specs))
309398
for _, spec := range specs {
310399
policy := resolveEngineProxyPolicy(proxyCfg, spec.name)
311400
if err := validateBrowserProxyPolicy(proxyCfg, policy); err != nil {
312-
return nil, fmt.Errorf("browser proxy validation failed for engine %s: %w", spec.name, err)
401+
return nil, nil, fmt.Errorf("browser proxy validation failed for engine %s: %w", spec.name, err)
313402
}
314403

315404
opts := spec.opts
@@ -324,7 +413,7 @@ func buildBrowserEngines(baseOpts core.BrowserOpts, proxyCfg core.ProxyConfig) (
324413
})
325414
}
326415

327-
return engines, nil
416+
return engines, pool.close, nil
328417
}
329418

330419
func validateBrowserProxyPolicy(proxyCfg core.ProxyConfig, policy core.ProxyPolicy) error {

core/browser.go

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,7 @@ func resolveBrowserBinaryPath(browserPath string, lookPath func() (string, bool)
174174

175175
// IsInitialized reports whether the browser launcher has been created.
176176
func (b *Browser) IsInitialized() bool {
177-
if b.browserAddr != "" {
178-
return true
179-
} else {
180-
return false
181-
}
177+
return b.browserAddr != ""
182178
}
183179

184180
// Navigate connects to Chromium, creates a page, applies stealth/emulation and
@@ -317,7 +313,40 @@ func (b *Browser) Navigate(ctx context.Context, URL string) (*rod.Page, error) {
317313

318314
// Close closes the active browser connection.
319315
func (b *Browser) Close() error {
320-
return b.browser.Close()
316+
if b == nil || b.browserAddr == "" {
317+
return nil
318+
}
319+
320+
browser := b.browser
321+
if browser == nil {
322+
browser = rod.New().ControlURL(b.browserAddr)
323+
if b.Timeout > 0 {
324+
browser = browser.Timeout(b.Timeout)
325+
}
326+
if err := browser.Connect(); err != nil {
327+
if isBrowserClosedError(err) {
328+
return nil
329+
}
330+
return err
331+
}
332+
}
333+
334+
b.browser = nil
335+
if err := browser.Close(); err != nil && !isBrowserClosedError(err) {
336+
return err
337+
}
338+
return nil
339+
}
340+
341+
// isBrowserClosedError reports whether err's text indicates that the browser
// or its connection is already gone, so a failed close can be treated as
// success. A nil error yields false.
//
// "connection refused", "closed network connection" and "target closed" are
// matched as case-insensitive substrings. "eof" is matched only as a whole
// word: a plain substring test would also accept unrelated messages such as
// ones containing "typeof" and silently swallow them.
func isBrowserClosedError(err error) bool {
	if err == nil {
		return false
	}

	msg := strings.ToLower(err.Error())
	for _, marker := range []string{
		"connection refused",
		"closed network connection",
		"target closed",
	} {
		if strings.Contains(msg, marker) {
			return true
		}
	}

	// Split on anything that is not a lower-case ASCII letter and look for a
	// standalone "eof" token ("EOF", "unexpected EOF", "read: EOF", ...).
	words := strings.FieldsFunc(msg, func(r rune) bool {
		return r < 'a' || r > 'z'
	})
	for _, word := range words {
		if word == "eof" {
			return true
		}
	}
	return false
}
322351

323352
// ClosePageWithTimeout bounds page close calls so shutdown paths don't hang.

core/server.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"runtime"
1010
"sort"
1111
"strings"
12+
"sync/atomic"
1213
"time"
1314

1415
"github.com/gofiber/fiber/v2"
@@ -43,6 +44,7 @@ type Server struct {
4344
resilient *ResilientSearcher
4445
startTime time.Time
4546
opts ServerOptions
47+
draining atomic.Bool
4648
}
4749

4850
// ServerOptions configures HTTP server middleware and resilience behavior.
@@ -98,6 +100,7 @@ func NewServerWithOptions(host string, port int, opts ServerOptions, searchEngin
98100
startTime: time.Now(),
99101
opts: opts,
100102
}
103+
serv.draining.Store(false)
101104
logrus.Info("Resilient search enabled: retry + circuit breaker")
102105
if opts.AllowEndpointFallback {
103106
logrus.Warn("Dedicated endpoint fallback is enabled")
@@ -120,6 +123,7 @@ func NewServerWithOptions(host string, port int, opts ServerOptions, searchEngin
120123
app.Get("/docs", serv.handleSwaggerUI)
121124
app.Get("/docs/", serv.handleSwaggerUI)
122125
app.Get("/health", serv.handleHealthCheck)
126+
app.Get("/ready", serv.handleReadinessCheck)
123127
app.Get("/stats", serv.handleStats)
124128
app.Get("/stats/cache", serv.handleCacheStats)
125129
app.Get("/stats/proxy", serv.handleProxyStats)
@@ -264,6 +268,12 @@ type HealthStatus struct {
264268
System map[string]interface{} `json:"system"`
265269
}
266270

271+
// ReadinessStatus is the JSON body served by /ready. It tells callers whether
// this instance can receive new traffic.
type ReadinessStatus struct {
	// Status is "ready" while serving and "draining" once shutdown has begun.
	Status string `json:"status"`
}
276+
267277
// EngineHealth describes availability of one configured engine.
268278
type EngineHealth struct {
269279
Name string `json:"name"`
@@ -338,6 +348,15 @@ func (s *Server) handleHealthCheck(c *fiber.Ctx) error {
338348
return c.JSON(health)
339349
}
340350

351+
func (s *Server) handleReadinessCheck(c *fiber.Ctx) error {
352+
status := ReadinessStatus{Status: "ready"}
353+
if s.draining.Load() {
354+
status.Status = "draining"
355+
return c.Status(fiber.StatusServiceUnavailable).JSON(status)
356+
}
357+
return c.JSON(status)
358+
}
359+
341360
func (s *Server) handleStats(c *fiber.Ctx) error {
342361
return c.JSON(map[string]interface{}{
343362
"cache": s.cacheStatsPayload(),
@@ -672,12 +691,30 @@ func (s *Server) handleSwaggerUI(c *fiber.Ctx) error {
672691
return c.SendString(page)
673692
}
674693

694+
// SetDraining controls readiness state exposed by /ready.
695+
func (s *Server) SetDraining(draining bool) {
696+
s.draining.Store(draining)
697+
}
698+
699+
// defaultShutdownTimeout is the drain window used by Shutdown, and by
// ShutdownWithTimeout when it is given a non-positive timeout.
const defaultShutdownTimeout = 30 * time.Second
700+
675701
// Listen starts the Fiber HTTP server on the configured address.
676702
func (s *Server) Listen() error {
703+
s.SetDraining(false)
677704
return s.app.Listen(s.addr)
678705
}
679706

680707
// Shutdown gracefully stops the Fiber HTTP server.
681708
func (s *Server) Shutdown() error {
682-
return s.app.Shutdown()
709+
return s.ShutdownWithTimeout(defaultShutdownTimeout)
710+
}
711+
712+
// ShutdownWithTimeout drains the server before force-closing active
713+
// connections when timeout is exceeded.
714+
func (s *Server) ShutdownWithTimeout(timeout time.Duration) error {
715+
if timeout <= 0 {
716+
timeout = defaultShutdownTimeout
717+
}
718+
s.SetDraining(true)
719+
return s.app.ShutdownWithTimeout(timeout)
683720
}

core/server_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,37 @@ func TestHealthEndpointStatusSemantics(t *testing.T) {
408408
}
409409
}
410410

411+
func TestReadinessEndpointStatusSemantics(t *testing.T) {
412+
engine := &engineMock{name: "google", initialized: true}
413+
srv := NewServerWithOptions("127.0.0.1", 7077, DefaultServerOptions(), engine)
414+
415+
resp := request(t, srv, "/ready")
416+
if resp.StatusCode != http.StatusOK {
417+
t.Fatalf("expected ready endpoint to return 200 while serving, got %d", resp.StatusCode)
418+
}
419+
420+
var ready ReadinessStatus
421+
if err := json.NewDecoder(resp.Body).Decode(&ready); err != nil {
422+
t.Fatalf("decode ready response: %v", err)
423+
}
424+
if ready.Status != "ready" {
425+
t.Fatalf("expected readiness status=ready, got %q", ready.Status)
426+
}
427+
428+
srv.SetDraining(true)
429+
resp = request(t, srv, "/ready")
430+
if resp.StatusCode != http.StatusServiceUnavailable {
431+
t.Fatalf("expected ready endpoint to return 503 while draining, got %d", resp.StatusCode)
432+
}
433+
434+
if err := json.NewDecoder(resp.Body).Decode(&ready); err != nil {
435+
t.Fatalf("decode draining response: %v", err)
436+
}
437+
if ready.Status != "draining" {
438+
t.Fatalf("expected readiness status=draining, got %q", ready.Status)
439+
}
440+
}
441+
411442
func TestDedicatedEndpointNoFallbackByDefault(t *testing.T) {
412443
primary := &engineMock{
413444
name: "google",

0 commit comments

Comments
 (0)