Skip to content

Commit 36083f7

Browse files
committed
fix: add mutex for thread-safe access to HeartbeatScanner error handling and update tests for parallel execution
1 parent 58298a9 commit 36083f7

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed

common/render/heartbeat.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package render
22

33
import (
44
"bufio"
5+
"sync"
56
"time"
67

78
"github.com/gin-gonic/gin"
@@ -52,6 +53,7 @@ type HeartbeatScanner struct {
5253
done chan struct{}
5354
interval time.Duration
5455
text string
56+
mu sync.Mutex
5557
err error
5658
closed bool
5759
}
@@ -96,7 +98,9 @@ func (h *HeartbeatScanner) readLoop(scanner *bufio.Scanner) {
9698
}
9799
// Set err before closing the channel so the caller sees it after
98100
// the channel-close receive.
101+
h.mu.Lock()
99102
h.err = scanner.Err()
103+
h.mu.Unlock()
100104
}
101105

102106
// Scan advances to the next line from the upstream scanner. While waiting
@@ -128,6 +132,8 @@ func (h *HeartbeatScanner) Text() string {
128132

129133
// Err returns the first non-EOF error encountered by the underlying scanner.
130134
func (h *HeartbeatScanner) Err() error {
135+
h.mu.Lock()
136+
defer h.mu.Unlock()
131137
return h.err
132138
}
133139

relay/adaptor/openai/direct_stream_handler_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func fakeUpstream(sse string) *http.Response {
3636
// TestResponseAPIDirectStreamHandler_DataForwarded verifies that data lines
3737
// from the upstream are forwarded to the client (event: lines are skipped).
3838
func TestResponseAPIDirectStreamHandler_DataForwarded(t *testing.T) {
39+
t.Parallel()
3940
c, w := newDirectStreamContext()
4041

4142
sse := "event: response.created\n" +
@@ -59,6 +60,7 @@ func TestResponseAPIDirectStreamHandler_DataForwarded(t *testing.T) {
5960
// TestResponseAPIDirectStreamHandler_NormalCompletion tests a full stream with
6061
// response.created -> deltas -> response.completed -> [DONE].
6162
func TestResponseAPIDirectStreamHandler_NormalCompletion(t *testing.T) {
63+
t.Parallel()
6264
c, w := newDirectStreamContext()
6365

6466
sse := "event: response.created\n" +
@@ -95,6 +97,7 @@ func TestResponseAPIDirectStreamHandler_NormalCompletion(t *testing.T) {
9597
// upstream stream ends without sending [DONE], the handler still renders [DONE]
9698
// (via the !doneRendered fallback at the end of the function).
9799
func TestResponseAPIDirectStreamHandler_UpstreamDropsNoDone(t *testing.T) {
100+
t.Parallel()
98101
c, w := newDirectStreamContext()
99102

100103
// Stream ends abruptly without [DONE]
@@ -117,6 +120,7 @@ func TestResponseAPIDirectStreamHandler_UpstreamDropsNoDone(t *testing.T) {
117120
// malformed JSON in data lines is skipped (not forwarded) but doesn't break
118121
// the stream.
119122
func TestResponseAPIDirectStreamHandler_UnparseableEventsSkipped(t *testing.T) {
123+
t.Parallel()
120124
c, w := newDirectStreamContext()
121125

122126
sse := "data: {this is not valid json}\n" +
@@ -140,6 +144,7 @@ func TestResponseAPIDirectStreamHandler_UnparseableEventsSkipped(t *testing.T) {
140144
// TestResponseAPIDirectStreamHandler_UsageAccumulation tests that usage from
141145
// a response.completed event is correctly extracted and returned.
142146
func TestResponseAPIDirectStreamHandler_UsageAccumulation(t *testing.T) {
147+
t.Parallel()
143148
c, _ := newDirectStreamContext()
144149

145150
sse := "event: response.completed\n" +
@@ -158,6 +163,7 @@ func TestResponseAPIDirectStreamHandler_UsageAccumulation(t *testing.T) {
158163
// TestResponseAPIDirectStreamHandler_TextAccumulation verifies that delta events
159164
// accumulate responseText correctly.
160165
func TestResponseAPIDirectStreamHandler_TextAccumulation(t *testing.T) {
166+
t.Parallel()
161167
c, _ := newDirectStreamContext()
162168

163169
sse := "event: response.output_text.delta\n" +
@@ -179,6 +185,7 @@ func TestResponseAPIDirectStreamHandler_TextAccumulation(t *testing.T) {
179185
// TestResponseAPIDirectStreamHandler_EmptyEventType verifies that data lines
180186
// without a preceding event: line are still forwarded (they just have data: prefix).
181187
func TestResponseAPIDirectStreamHandler_EmptyEventType(t *testing.T) {
188+
t.Parallel()
182189
c, w := newDirectStreamContext()
183190

184191
// Data line with no preceding event: line - still a valid response object
@@ -197,6 +204,7 @@ func TestResponseAPIDirectStreamHandler_EmptyEventType(t *testing.T) {
197204
// TestResponseAPIDirectStreamHandler_WebSearchTracking tests that web_search_call
198205
// output items are counted and stored in context.
199206
func TestResponseAPIDirectStreamHandler_WebSearchTracking(t *testing.T) {
207+
t.Parallel()
200208
c, _ := newDirectStreamContext()
201209

202210
// response.completed with a web_search_call in output
@@ -218,6 +226,7 @@ func TestResponseAPIDirectStreamHandler_WebSearchTracking(t *testing.T) {
218226
// as a ResponseAPIResponse. Events where the "id" field is at the top level
219227
// (not nested inside "response") are parsed as fullResponse objects.
220228
func TestResponseAPIDirectStreamHandler_ConvertedResponseSet(t *testing.T) {
229+
t.Parallel()
221230
c, _ := newDirectStreamContext()
222231

223232
// Use a response.created event which has "id" at the top level of the
@@ -245,6 +254,7 @@ func TestResponseAPIDirectStreamHandler_ConvertedResponseSet(t *testing.T) {
245254
// that a response.completed event (where id is nested) still sets
246255
// ConvertedResponse via the fallback map path.
247256
func TestResponseAPIDirectStreamHandler_ConvertedResponseFromCompleted(t *testing.T) {
257+
t.Parallel()
248258
c, _ := newDirectStreamContext()
249259

250260
sse := "event: response.output_text.delta\n" +
@@ -273,6 +283,7 @@ func TestResponseAPIDirectStreamHandler_ConvertedResponseFromCompleted(t *testin
273283
// when there is no full response event but there is text/usage, a fallback
274284
// map is stored in ConvertedResponse.
275285
func TestResponseAPIDirectStreamHandler_ConvertedResponseFallback(t *testing.T) {
286+
t.Parallel()
276287
c, _ := newDirectStreamContext()
277288

278289
// Only delta events, no response.completed
@@ -298,6 +309,7 @@ func TestResponseAPIDirectStreamHandler_ConvertedResponseFallback(t *testing.T)
298309
// events (which don't have a response ID or parseable structure) are skipped
299310
// by the parser but don't cause errors.
300311
func TestResponseAPIDirectStreamHandler_KeepaliveSkipped(t *testing.T) {
312+
t.Parallel()
301313
c, w := newDirectStreamContext()
302314

303315
sse := "event: keepalive\n" +
@@ -321,6 +333,7 @@ func TestResponseAPIDirectStreamHandler_KeepaliveSkipped(t *testing.T) {
321333
// TestResponseAPIDirectStreamHandler_MultipleUsageUpdates verifies that the
322334
// handler takes the last usage value when multiple events contain usage.
323335
func TestResponseAPIDirectStreamHandler_MultipleUsageUpdates(t *testing.T) {
336+
t.Parallel()
324337
c, _ := newDirectStreamContext()
325338

326339
sse := "event: response.created\n" +
@@ -343,6 +356,7 @@ func TestResponseAPIDirectStreamHandler_MultipleUsageUpdates(t *testing.T) {
343356
// TestResponseAPIDirectStreamHandler_EmptyStream verifies behavior with an
344357
// empty stream (no data lines at all).
345358
func TestResponseAPIDirectStreamHandler_EmptyStream(t *testing.T) {
359+
t.Parallel()
346360
c, w := newDirectStreamContext()
347361

348362
sse := ""
@@ -360,6 +374,7 @@ func TestResponseAPIDirectStreamHandler_EmptyStream(t *testing.T) {
360374
// TestResponseAPIDirectStreamHandler_OnlyDone verifies behavior when the
361375
// upstream sends only [DONE] with no data events.
362376
func TestResponseAPIDirectStreamHandler_OnlyDone(t *testing.T) {
377+
t.Parallel()
363378
c, w := newDirectStreamContext()
364379

365380
sse := "data: [DONE]\n"
@@ -376,6 +391,7 @@ func TestResponseAPIDirectStreamHandler_OnlyDone(t *testing.T) {
376391
// TestResponseAPIDirectStreamHandler_FlushOccurs verifies that the recorder
377392
// is flushed (indicating streaming output was sent).
378393
func TestResponseAPIDirectStreamHandler_FlushOccurs(t *testing.T) {
394+
t.Parallel()
379395
c, w := newDirectStreamContext()
380396

381397
sse := "event: response.created\n" +
@@ -396,6 +412,7 @@ func TestResponseAPIDirectStreamHandler_FlushOccurs(t *testing.T) {
396412
// "event:" lines from upstream are forwarded to the client, matching the
397413
// official Response API wire format: "event: <type>\ndata: <json>\n\n".
398414
func TestResponseAPIDirectStreamHandler_EventLinesForwarded(t *testing.T) {
415+
t.Parallel()
399416
c, w := newDirectStreamContext()
400417

401418
sse := "event: response.created\n" +
@@ -420,6 +437,7 @@ func TestResponseAPIDirectStreamHandler_EventLinesForwarded(t *testing.T) {
420437
// TestResponseAPIDirectStreamHandler_ReasoningPassthrough verifies that
421438
// reasoning summary events are forwarded faithfully including event: lines.
422439
func TestResponseAPIDirectStreamHandler_ReasoningPassthrough(t *testing.T) {
440+
t.Parallel()
423441
c, w := newDirectStreamContext()
424442

425443
sse := "event: response.output_item.added\n" +
@@ -452,6 +470,7 @@ func TestResponseAPIDirectStreamHandler_ReasoningPassthrough(t *testing.T) {
452470
// TestResponseAPIDirectStreamHandler_ToolCallPassthrough verifies that
453471
// function_call events are forwarded faithfully.
454472
func TestResponseAPIDirectStreamHandler_ToolCallPassthrough(t *testing.T) {
473+
t.Parallel()
455474
c, w := newDirectStreamContext()
456475

457476
sse := "event: response.output_item.added\n" +

0 commit comments

Comments
 (0)