Skip to content

Commit dca0681

Browse files
committed
feat: implement conn deadline
1 parent 590bf8e commit dca0681

5 files changed

Lines changed: 209 additions & 49 deletions

File tree

.github/workflows/pr-check.yml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,17 @@ jobs:
66
compatibility-test:
77
strategy:
88
matrix:
9-
go: [ 1.18, 1.23 ]
10-
os: [ X64, ARM64 ]
9+
go: [ 1.18, 1.24 ]
10+
os: [ ubuntu-latest, ubuntu-24.04-arm, macos-latest ]
1111
runs-on: ${{ matrix.os }}
1212
steps:
1313
- uses: actions/checkout@v4
1414
- name: Set up Go
1515
uses: actions/setup-go@v5
1616
with:
1717
go-version: ${{ matrix.go }}
18-
cache: false
1918
- name: Unit Test
20-
run: go test -timeout=2m -race ./...
19+
run: go test -v -timeout=2m -race ./...
2120
- name: Benchmark
2221
run: go test -bench=. -benchmem -run=none ./... -benchtime=100ms
2322

@@ -46,7 +45,7 @@ jobs:
4645
uses: crate-ci/typos@v1.13.14
4746

4847
golangci-lint:
49-
runs-on: [ Linux, X64 ]
48+
runs-on: ubuntu-latest
5049
steps:
5150
- uses: actions/checkout@v4
5251
- name: Set up Go

connection_impl.go

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@ type connection struct {
3939
locker
4040
operator *FDOperator
4141
readTimeout time.Duration
42+
readDeadline int64 // UnixNano(). it overwrites readTimeout. 0 if not set.
4243
readTimer *time.Timer
4344
readTrigger chan error
4445
waitReadSize int64
4546
writeTimeout time.Duration
47+
writeDeadline int64 // UnixNano(). it overwrites writeTimeout. 0 if not set.
4648
writeTimer *time.Timer
4749
writeTrigger chan error
4850
inputBuffer *LinkBuffer
@@ -87,6 +89,7 @@ func (c *connection) SetReadTimeout(timeout time.Duration) error {
8789
if timeout >= 0 {
8890
c.readTimeout = timeout
8991
}
92+
atomic.StoreInt64(&c.readDeadline, 0)
9093
return nil
9194
}
9295

@@ -95,6 +98,38 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
9598
if timeout >= 0 {
9699
c.writeTimeout = timeout
97100
}
101+
atomic.StoreInt64(&c.writeDeadline, 0)
102+
return nil
103+
}
104+
105+
// SetDeadline implements net.Conn.SetDeadline
106+
func (c *connection) SetDeadline(t time.Time) error {
107+
v := int64(0)
108+
if !t.IsZero() {
109+
v = t.UnixNano()
110+
}
111+
atomic.StoreInt64(&c.readDeadline, v)
112+
atomic.StoreInt64(&c.writeDeadline, v)
113+
return nil
114+
}
115+
116+
// SetReadDeadline implements net.Conn.SetReadDeadline
117+
func (c *connection) SetReadDeadline(t time.Time) error {
118+
if t.IsZero() {
119+
atomic.StoreInt64(&c.readDeadline, 0)
120+
} else {
121+
atomic.StoreInt64(&c.readDeadline, t.UnixNano())
122+
}
123+
return nil
124+
}
125+
126+
// SetWriteDeadline implements net.Conn.SetWriteDeadline
127+
func (c *connection) SetWriteDeadline(t time.Time) error {
128+
if t.IsZero() {
129+
atomic.StoreInt64(&c.writeDeadline, 0)
130+
} else {
131+
atomic.StoreInt64(&c.writeDeadline, t.UnixNano())
132+
}
98133
return nil
99134
}
100135

@@ -408,8 +443,14 @@ func (c *connection) waitRead(n int) (err error) {
408443
}
409444
atomic.StoreInt64(&c.waitReadSize, int64(n))
410445
defer atomic.StoreInt64(&c.waitReadSize, 0)
411-
if c.readTimeout > 0 {
412-
return c.waitReadWithTimeout(n)
446+
if dl := atomic.LoadInt64(&c.readDeadline); dl > 0 {
447+
timeout := time.Duration(dl - time.Now().UnixNano())
448+
if timeout <= 0 {
449+
return Exception(ErrReadTimeout, c.remoteAddr.String())
450+
}
451+
return c.waitReadWithTimeout(n, timeout)
452+
} else if c.readTimeout > 0 {
453+
return c.waitReadWithTimeout(n, c.readTimeout)
413454
}
414455
// wait full n
415456
for c.inputBuffer.Len() < n {
@@ -429,12 +470,11 @@ func (c *connection) waitRead(n int) (err error) {
429470
}
430471

431472
// waitReadWithTimeout will wait full n bytes or until timeout.
432-
func (c *connection) waitReadWithTimeout(n int) (err error) {
433-
// set read timeout
473+
func (c *connection) waitReadWithTimeout(n int, timeout time.Duration) (err error) {
434474
if c.readTimer == nil {
435-
c.readTimer = time.NewTimer(c.readTimeout)
475+
c.readTimer = time.NewTimer(timeout)
436476
} else {
437-
c.readTimer.Reset(c.readTimeout)
477+
c.readTimer.Reset(timeout)
438478
}
439479

440480
for c.inputBuffer.Len() < n {
@@ -501,15 +541,22 @@ func (c *connection) flush() error {
501541
}
502542

503543
func (c *connection) waitFlush() (err error) {
504-
if c.writeTimeout == 0 {
544+
timeout := c.writeTimeout
545+
if dl := atomic.LoadInt64(&c.writeDeadline); dl > 0 {
546+
timeout = time.Duration(dl - time.Now().UnixNano())
547+
if timeout <= 0 {
548+
return Exception(ErrWriteTimeout, c.remoteAddr.String())
549+
}
550+
}
551+
if timeout == 0 {
505552
return <-c.writeTrigger
506553
}
507554

508555
// set write timeout
509556
if c.writeTimer == nil {
510-
c.writeTimer = time.NewTimer(c.writeTimeout)
557+
c.writeTimer = time.NewTimer(timeout)
511558
} else {
512-
c.writeTimer.Reset(c.writeTimeout)
559+
c.writeTimer.Reset(timeout)
513560
}
514561

515562
select {

connection_test.go

Lines changed: 148 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -350,29 +350,67 @@ func TestLargeBufferWrite(t *testing.T) {
350350
trigger <- 1
351351
}
352352

353-
func TestWriteTimeout(t *testing.T) {
354-
address := getTestAddress()
355-
ln, err := createTestListener("tcp", address)
353+
func TestConnectionTimeout(t *testing.T) {
354+
ln, err := net.Listen("tcp", "127.0.0.1:0")
356355
MustNil(t, err)
356+
defer ln.Close()
357+
358+
const (
359+
bufsz = 1 << 20
360+
interval = 10 * time.Millisecond
361+
)
362+
363+
calcRate := func(n int32) int32 {
364+
v := n / int32(time.Second/interval)
365+
if v > bufsz {
366+
panic(v)
367+
}
368+
if v < 1 {
369+
return 1
370+
}
371+
return v
372+
}
373+
374+
wn := int32(1) // for each Read, must <= bufsz
375+
setServerWriteRate := func(n int32) {
376+
atomic.StoreInt32(&wn, calcRate(n))
377+
}
378+
379+
rn := int32(1) // for each Write, must <= bufsz
380+
setServerReadRate := func(n int32) {
381+
atomic.StoreInt32(&rn, calcRate(n))
382+
}
357383

358-
interval := time.Millisecond * 100
359384
go func() {
360385
for {
361386
conn, err := ln.Accept()
362-
if conn == nil && err == nil {
363-
continue
364-
}
365387
if err != nil {
366388
return
367389
}
390+
// set small SO_SNDBUF/SO_RCVBUF buffer for better control timeout test
391+
tcpconn := conn.(*net.TCPConn)
392+
tcpconn.SetReadBuffer(512)
393+
tcpconn.SetWriteBuffer(512)
368394
go func() {
369-
buf := make([]byte, 1024)
370-
// slow read
395+
buf := make([]byte, bufsz)
371396
for {
372-
_, err := conn.Read(buf)
397+
n := atomic.LoadInt32(&rn)
398+
_, err := conn.Read(buf[:int(n)])
373399
if err != nil {
374-
err = conn.Close()
375-
MustNil(t, err)
400+
conn.Close()
401+
return
402+
}
403+
time.Sleep(interval)
404+
}
405+
}()
406+
407+
go func() {
408+
buf := make([]byte, bufsz)
409+
for {
410+
n := atomic.LoadInt32(&wn)
411+
_, err := conn.Write(buf[:int(n)])
412+
if err != nil {
413+
conn.Close()
376414
return
377415
}
378416
time.Sleep(interval)
@@ -381,26 +419,103 @@ func TestWriteTimeout(t *testing.T) {
381419
}
382420
}()
383421

384-
conn, err := DialConnection("tcp", address, time.Second)
385-
MustNil(t, err)
422+
newConn := func() Connection {
423+
conn, err := DialConnection("tcp", ln.Addr().String(), time.Second)
424+
MustNil(t, err)
425+
fd := conn.(Conn).Fd()
426+
// set small SO_SNDBUF/SO_RCVBUF buffer for better control timeout test
427+
err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, 512)
428+
MustNil(t, err)
429+
err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_RCVBUF, 512)
430+
MustNil(t, err)
431+
return conn
432+
}
386433

387-
_, err = conn.Writer().Malloc(1024)
388-
MustNil(t, err)
389-
err = conn.Writer().Flush()
390-
MustNil(t, err)
434+
mallocAndFlush := func(conn Connection, sz int) error {
435+
_, err = conn.Writer().Malloc(sz)
436+
MustNil(t, err)
437+
return conn.Writer().Flush()
438+
}
391439

392-
_ = conn.SetWriteTimeout(time.Millisecond * 10)
393-
_, err = conn.Writer().Malloc(1024 * 1024 * 512)
394-
MustNil(t, err)
395-
err = conn.Writer().Flush()
396-
MustTrue(t, errors.Is(err, ErrWriteTimeout))
440+
t.Run("TestWriteTimeout", func(t *testing.T) {
441+
setServerReadRate(10 << 10) // 10KB/s
397442

398-
// close success
399-
err = conn.Close()
400-
MustNil(t, err)
443+
conn := newConn()
444+
defer conn.Close()
401445

402-
err = ln.Close()
403-
MustNil(t, err)
446+
// write 1KB without timeout
447+
err = mallocAndFlush(conn, 1<<10) // ~100ms
448+
MustNil(t, err)
449+
450+
// write 50ms timeout
451+
_ = conn.SetWriteTimeout(50 * time.Millisecond)
452+
err = mallocAndFlush(conn, 1<<20)
453+
MustTrue(t, errors.Is(err, ErrWriteTimeout))
454+
})
455+
456+
t.Run("TestReadTimeout", func(t *testing.T) {
457+
setServerWriteRate(10 << 10) // 10KB/s
458+
459+
conn := newConn()
460+
defer conn.Close()
461+
462+
// read 1KB without timeout
463+
_, err = conn.Reader().Next(1 << 10) // ~100ms
464+
MustNil(t, err)
465+
466+
// read 20KB ~ 2s, 50ms timeout
467+
_ = conn.SetReadTimeout(50 * time.Millisecond)
468+
_, err = conn.Reader().Next(20 << 10)
469+
MustTrue(t, errors.Is(err, ErrReadTimeout))
470+
})
471+
472+
t.Run("TestWriteDeadline", func(t *testing.T) {
473+
setServerReadRate(10 << 10) // 10KB/s
474+
475+
conn := newConn()
476+
defer conn.Close()
477+
478+
// write 1KB without deadline
479+
err = conn.SetWriteDeadline(time.Now())
480+
err = conn.SetDeadline(time.Time{})
481+
MustNil(t, err)
482+
err = mallocAndFlush(conn, 1<<10) // ~100ms
483+
MustNil(t, err)
484+
485+
// write with deadline
486+
err = conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond))
487+
MustNil(t, err)
488+
err = mallocAndFlush(conn, 1<<20)
489+
MustTrue(t, errors.Is(err, ErrWriteTimeout))
490+
491+
// write deadline exceeded
492+
err = mallocAndFlush(conn, 10<<10)
493+
MustTrue(t, errors.Is(err, ErrWriteTimeout))
494+
})
495+
496+
t.Run("TestReadDeadline", func(t *testing.T) {
497+
setServerWriteRate(20 << 10) // 20KB/s
498+
499+
conn := newConn()
500+
defer conn.Close()
501+
502+
// read 1KB without deadline
503+
err = conn.SetReadDeadline(time.Now())
504+
err = conn.SetDeadline(time.Time{})
505+
MustNil(t, err)
506+
_, err = conn.Reader().Next(1 << 10)
507+
MustNil(t, err)
508+
509+
// read 100KB with deadline
510+
err = conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
511+
MustNil(t, err)
512+
_, err = conn.Reader().Next(100 << 10)
513+
MustTrue(t, errors.Is(err, ErrReadTimeout))
514+
515+
// read 10KB, deadline exceeded
516+
_, err = conn.Reader().Next(10 << 10)
517+
MustTrue(t, errors.Is(err, ErrReadTimeout))
518+
})
404519
}
405520

406521
// TestConnectionLargeMemory is used to verify the memory usage in the large package scenario.
@@ -656,7 +771,7 @@ func TestConnectionServerClose(t *testing.T) {
656771
var wg sync.WaitGroup
657772
el, err := NewEventLoop(
658773
func(ctx context.Context, connection Connection) error {
659-
// t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr())
774+
t.Logf("server.OnRequest: addr=%s", connection.RemoteAddr())
660775
defer wg.Done()
661776
buf, err := connection.Reader().Next(len(PONG)) // pong
662777
Equal(t, string(buf), PONG)
@@ -679,14 +794,14 @@ func TestConnectionServerClose(t *testing.T) {
679794
err = connection.Writer().Flush()
680795
MustNil(t, err)
681796
connection.AddCloseCallback(func(connection Connection) error {
682-
// t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr())
797+
t.Logf("server.CloseCallback: addr=%s", connection.RemoteAddr())
683798
wg.Done()
684799
return nil
685800
})
686801
return ctx
687802
}),
688803
WithOnPrepare(func(connection Connection) context.Context {
689-
// t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr())
804+
t.Logf("server.OnPrepare: addr=%s", connection.RemoteAddr())
690805
defer wg.Done()
691806
//nolint:staticcheck // SA1029 no built-in type string as key
692807
return context.WithValue(context.Background(), "prepare", "true")
@@ -719,7 +834,7 @@ func TestConnectionServerClose(t *testing.T) {
719834

720835
return connection.Close()
721836
}
722-
conns := 100
837+
conns := 10
723838
// server: OnPrepare, OnConnect, OnRequest, CloseCallback
724839
// client: OnRequest, CloseCallback
725840
wg.Add(conns * 6)
@@ -730,7 +845,7 @@ func TestConnectionServerClose(t *testing.T) {
730845
err = conn.SetOnRequest(clientOnRequest)
731846
MustNil(t, err)
732847
conn.AddCloseCallback(func(connection Connection) error {
733-
// t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr())
848+
t.Logf("client.CloseCallback: addr=%s", connection.LocalAddr())
734849
defer wg.Done()
735850
return nil
736851
})

0 commit comments

Comments
 (0)