Skip to content

Commit 8499125

Browse files
authored
fix: fix address already use (#634)
1 parent 8788e44 commit 8499125

File tree

2 files changed

+277
-15
lines changed

2 files changed

+277
-15
lines changed

internal/core/debugging_runtime/server.go

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os/signal"
88
"sync"
99
"syscall"
10+
"time"
1011

1112
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_transport"
1213
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
@@ -24,42 +25,85 @@ type RemotePluginServerInterface interface {
2425
Launch() error
2526
}
2627

27-
// Stop stops the server
28+
// Stop stops the server gracefully
2829
func (r *RemotePluginServer) Stop() error {
29-
err := r.server.engine.Stop(context.Background())
30+
if r.server == nil {
31+
return nil
32+
}
33+
34+
// Create a context with timeout for graceful shutdown
35+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
36+
defer cancel()
37+
38+
err := r.server.engine.Stop(ctx)
3039
if err == gnet_errors.ErrEmptyEngine || err == gnet_errors.ErrEngineInShutdown {
3140
return nil
3241
}
3342

34-
return err
43+
if err != nil {
44+
return fmt.Errorf("failed to stop server gracefully: %w", err)
45+
}
46+
47+
return nil
3548
}
3649

3750
// Launch starts the server
3851
func (r *RemotePluginServer) Launch() error {
39-
err := gnet.Run(
40-
r.server, r.server.addr, gnet.WithMulticore(r.server.multicore),
41-
gnet.WithNumEventLoop(r.server.numLoops),
42-
gnet.WithLogger(GnetLogger{}),
43-
)
52+
// Try to start the server with retry mechanism
53+
// This handles the case where the port is in TIME_WAIT state after a crash
54+
maxRetries := 3
55+
var err error
56+
57+
for i := 0; i < maxRetries; i++ {
58+
err = gnet.Run(
59+
r.server, r.server.addr,
60+
gnet.WithMulticore(r.server.multicore),
61+
gnet.WithNumEventLoop(r.server.numLoops),
62+
gnet.WithLogger(GnetLogger{}),
63+
gnet.WithReuseAddr(true),
64+
gnet.WithReusePort(true),
65+
)
66+
67+
if err == nil {
68+
break
69+
}
70+
71+
// If this is the last retry, don't wait
72+
if i < maxRetries-1 {
73+
waitTime := (i + 1) * 2
74+
GnetLogger{}.Warnf("Failed to bind to %s (attempt %d/%d): %v, retrying in %d seconds...\n",
75+
r.server.addr, i+1, maxRetries, err, waitTime)
76+
time.Sleep(time.Duration(waitTime) * time.Second)
77+
}
78+
}
4479

4580
if err != nil {
46-
r.Stop()
81+
err := r.Stop()
82+
if err != nil {
83+
return err
84+
}
85+
return fmt.Errorf("failed to start server after %d attempts: %w", maxRetries, err)
4786
}
4887

4988
// collect shutdown signal
5089
go r.collectShutdownSignal()
5190

52-
return err
91+
return nil
5392
}
5493

55-
func (s *RemotePluginServer) collectShutdownSignal() {
94+
func (r *RemotePluginServer) collectShutdownSignal() {
5695
c := make(chan os.Signal, 1)
57-
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
96+
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
5897

59-
<-c
98+
sig := <-c
99+
fmt.Printf("\nReceived signal %v, shutting down server gracefully...\n", sig)
60100

61-
// shutdown server
62-
s.Stop()
101+
// shutdown server with timeout
102+
if err := r.Stop(); err != nil {
103+
fmt.Printf("Error shutting down server: %v\n", err)
104+
} else {
105+
fmt.Println("Server shut down successfully")
106+
}
63107
}
64108

65109
// NewDebuggingPluginServer creates a new RemotePluginServer

internal/core/debugging_runtime/server_test.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,3 +406,221 @@ func TestIncorrectHandshake(t *testing.T) {
406406
return
407407
}
408408
}
409+
410+
411+
// TestServerStopWithNilServer tests stopping a server with nil server field
412+
func TestServerStopWithNilServer(t *testing.T) {
413+
server := &RemotePluginServer{}
414+
err := server.Stop()
415+
if err != nil {
416+
t.Errorf("expected no error when stopping nil server, got: %v", err)
417+
}
418+
}
419+
420+
// TestServerStartAndStop tests basic server start and stop
421+
func TestServerStartAndStop(t *testing.T) {
422+
port, err := network.GetRandomPort()
423+
if err != nil {
424+
t.Fatalf("failed to get random port: %v", err)
425+
}
426+
427+
oss, err := factory.Load("local", cloudoss.OSSArgs{
428+
Local: &cloudoss.Local{
429+
Path: "./storage",
430+
},
431+
})
432+
if err != nil {
433+
t.Fatalf("failed to load local storage: %v", err)
434+
}
435+
436+
config := &app.Config{
437+
PluginRemoteInstallingHost: "127.0.0.1",
438+
PluginRemoteInstallingPort: port,
439+
PluginRemoteInstallingMaxConn: 1,
440+
PluginRemoteInstallServerEventLoopNums: 1,
441+
}
442+
443+
server := NewDebuggingPluginServer(config, media_transport.NewAssetsBucket(oss, "assets", 10))
444+
445+
// Start server in background
446+
done := make(chan error, 1)
447+
go func() {
448+
done <- server.Launch()
449+
}()
450+
451+
// Wait for server to start
452+
time.Sleep(1 * time.Second)
453+
454+
// Verify server is listening
455+
conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 500*time.Millisecond)
456+
if err != nil {
457+
t.Fatalf("server not listening after 1s: %v", err)
458+
}
459+
conn.Close()
460+
461+
// Stop server
462+
if err := server.Stop(); err != nil {
463+
t.Errorf("failed to stop server: %v", err)
464+
}
465+
466+
// Wait for launch to return
467+
select {
468+
case err := <-done:
469+
if err != nil {
470+
t.Logf("launch returned error (expected): %v", err)
471+
}
472+
case <-time.After(3 * time.Second):
473+
t.Error("launch did not return after stop")
474+
}
475+
476+
// Verify server is not listening
477+
time.Sleep(100 * time.Millisecond)
478+
conn, err = net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 500*time.Millisecond)
479+
if err == nil {
480+
conn.Close()
481+
t.Error("server still listening after stop")
482+
}
483+
}
484+
485+
// TestServerStopIdempotent tests that stopping multiple times is safe
486+
func TestServerStopIdempotent(t *testing.T) {
487+
port, err := network.GetRandomPort()
488+
if err != nil {
489+
t.Fatalf("failed to get random port: %v", err)
490+
}
491+
492+
oss, err := factory.Load("local", cloudoss.OSSArgs{
493+
Local: &cloudoss.Local{
494+
Path: "./storage",
495+
},
496+
})
497+
if err != nil {
498+
t.Fatalf("failed to load local storage: %v", err)
499+
}
500+
501+
config := &app.Config{
502+
PluginRemoteInstallingHost: "127.0.0.1",
503+
PluginRemoteInstallingPort: port,
504+
PluginRemoteInstallingMaxConn: 1,
505+
PluginRemoteInstallServerEventLoopNums: 1,
506+
}
507+
508+
server := NewDebuggingPluginServer(config, media_transport.NewAssetsBucket(oss, "assets", 10))
509+
510+
// Start server
511+
go func() {
512+
server.Launch()
513+
}()
514+
515+
time.Sleep(1 * time.Second)
516+
517+
// Stop multiple times
518+
for i := 0; i < 3; i++ {
519+
if err := server.Stop(); err != nil {
520+
t.Errorf("stop %d failed: %v", i+1, err)
521+
}
522+
}
523+
}
524+
525+
// TestServerQuickRestart tests immediate restart after stop
526+
func TestServerQuickRestart(t *testing.T) {
527+
port, err := network.GetRandomPort()
528+
if err != nil {
529+
t.Fatalf("failed to get random port: %v", err)
530+
}
531+
532+
oss, err := factory.Load("local", cloudoss.OSSArgs{
533+
Local: &cloudoss.Local{
534+
Path: "./storage",
535+
},
536+
})
537+
if err != nil {
538+
t.Fatalf("failed to load local storage: %v", err)
539+
}
540+
541+
config := &app.Config{
542+
PluginRemoteInstallingHost: "127.0.0.1",
543+
PluginRemoteInstallingPort: port,
544+
PluginRemoteInstallingMaxConn: 1,
545+
PluginRemoteInstallServerEventLoopNums: 1,
546+
}
547+
548+
// First server
549+
server1 := NewDebuggingPluginServer(config, media_transport.NewAssetsBucket(oss, "assets", 10))
550+
go server1.Launch()
551+
time.Sleep(1 * time.Second)
552+
553+
// Verify first server is listening
554+
conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 500*time.Millisecond)
555+
if err != nil {
556+
t.Fatalf("first server not listening: %v", err)
557+
}
558+
conn.Close()
559+
560+
// Stop first server
561+
server1.Stop()
562+
time.Sleep(200 * time.Millisecond)
563+
564+
// Start second server on same port
565+
server2 := NewDebuggingPluginServer(config, media_transport.NewAssetsBucket(oss, "assets", 10))
566+
go server2.Launch()
567+
time.Sleep(1 * time.Second)
568+
569+
// Verify second server is listening
570+
conn, err = net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 500*time.Millisecond)
571+
if err != nil {
572+
t.Errorf("second server not listening: %v", err)
573+
} else {
574+
conn.Close()
575+
}
576+
577+
// Cleanup
578+
server2.Stop()
579+
}
580+
581+
// TestServerStopConcurrent tests concurrent stop calls
582+
func TestServerStopConcurrent(t *testing.T) {
583+
port, err := network.GetRandomPort()
584+
if err != nil {
585+
t.Fatalf("failed to get random port: %v", err)
586+
}
587+
588+
oss, err := factory.Load("local", cloudoss.OSSArgs{
589+
Local: &cloudoss.Local{
590+
Path: "./storage",
591+
},
592+
})
593+
if err != nil {
594+
t.Fatalf("failed to load local storage: %v", err)
595+
}
596+
597+
config := &app.Config{
598+
PluginRemoteInstallingHost: "127.0.0.1",
599+
PluginRemoteInstallingPort: port,
600+
PluginRemoteInstallingMaxConn: 1,
601+
PluginRemoteInstallServerEventLoopNums: 1,
602+
}
603+
604+
server := NewDebuggingPluginServer(config, media_transport.NewAssetsBucket(oss, "assets", 10))
605+
go server.Launch()
606+
time.Sleep(1 * time.Second)
607+
608+
// Concurrent stops
609+
done := make(chan struct{})
610+
for i := 0; i < 3; i++ {
611+
go func() {
612+
server.Stop()
613+
done <- struct{}{}
614+
}()
615+
}
616+
617+
// Wait for all stops to complete
618+
for i := 0; i < 3; i++ {
619+
<-done
620+
}
621+
}
622+
623+
// TestServerLaunchWithRetry is skipped due to long execution time
624+
func TestServerLaunchWithRetry(t *testing.T) {
625+
t.Skip("Skipping - requires 12+ seconds for retry mechanism")
626+
}

0 commit comments

Comments
 (0)