Skip to content

Commit 1c2ae68

Browse files
authored
Improve error handling and resource management in streamPrediction (#79)
* Move up deferred closing of response body * Return context error instead of nil * Send to line channel in select with context check * Refactor final goroutine in streamPrediction
1 parent 63039fd commit 1c2ae68

1 file changed

Lines changed: 22 additions & 21 deletions

File tree

stream.go

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -169,20 +169,24 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
169169

170170
g.Go(func() error {
171171
defer close(lineChan)
172+
defer resp.Body.Close()
172173

173174
for {
174175
select {
175176
case <-ctx.Done():
176-
return nil
177+
return ctx.Err()
177178
case <-done:
178179
return nil
179180
default:
180181
line, err := reader.ReadBytes('\n')
181182
if err != nil {
182-
defer resp.Body.Close()
183183
return err
184184
}
185-
lineChan <- line
185+
select {
186+
case lineChan <- line:
187+
case <-ctx.Done():
188+
return ctx.Err()
189+
}
186190
}
187191
}
188192
})
@@ -223,30 +227,27 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
223227
}()
224228

225229
go func() {
230+
err := g.Wait()
231+
226232
defer close(sseChan)
227233
defer close(errChan)
228234

229-
for {
230-
select {
231-
case <-ctx.Done():
232-
return
233-
case <-done:
235+
if err != nil {
236+
if errors.Is(err, io.EOF) {
237+
// Attempt to reconnect if the connection was closed before the stream was done
238+
r.streamPrediction(ctx, prediction, lastEvent, sseChan, errChan)
234239
return
235-
default:
236-
err := g.Wait()
237-
if err != nil {
238-
if err == io.EOF {
239-
// Attempt to reconnect if the connection was closed before the stream was done
240-
r.streamPrediction(ctx, prediction, lastEvent, sseChan, errChan)
241-
continue
242-
}
240+
}
243241

244-
if errors.Is(err, context.Canceled) {
245-
return
246-
}
242+
if errors.Is(err, context.Canceled) {
243+
// Context was canceled, simply return
244+
return
245+
}
247246

248-
errChan <- err
249-
}
247+
select {
248+
case errChan <- err:
249+
default:
250+
// errChan is full or closed
250251
}
251252
}
252253
}()

0 commit comments

Comments
 (0)