Skip to content

fix(streaming): add aclose() to AsyncStream for standard async cleanup protocol#2944

Open
s-zx wants to merge 2 commits intoopenai:mainfrom
s-zx:fix/2853-async-stream-aclose
Open

fix(streaming): add aclose() to AsyncStream for standard async cleanup protocol#2944
s-zx wants to merge 2 commits intoopenai:mainfrom
s-zx:fix/2853-async-stream-aclose

Conversation

@s-zx
Copy link

@s-zx s-zx commented Mar 8, 2026

Problem

AsyncStream exposes close() but not aclose(), causing AttributeError when instrumentation libraries (Langfuse, OpenTelemetry wrappers, etc.) call the standard Python async cleanup convention:

await stream.aclose()
# AttributeError: 'AsyncStream' object has no attribute 'aclose'. Did you mean: 'close'?

The error surfaces in production via the call chain in AsyncChatCompletionStream:

AsyncChatCompletionStreamManager.__aexit__
  -> AsyncChatCompletionStream.close()        # line 215
  -> self._response.aclose()                  # AttributeError when _response is AsyncStream

When instrumentation wraps the raw AsyncStream, self._response resolves to the AsyncStream itself rather than the underlying httpx.Response. Since AsyncStream has close() but not aclose(), cleanup fails.

Closes #2853

Fix

Add aclose() as a thin alias that delegates to close():

async def aclose(self) -> None:
    """Async-convention alias for close()."""
    await self.close()

aclose() is the standard async cleanup method used by asyncio.StreamWriter, httpx.AsyncByteStream, and async generators (PEP 525). Adding it lets callers and wrappers use either name without special-casing AsyncStream.

The sync Stream class is not affected since its callers use close().

s-zx added 2 commits March 8, 2026 23:29
…ad. block

Two related bugs in Stream.__stream__ and AsyncStream.__stream__:

1. JSONDecodeError on meta-only SSE events (fixes openai#2722)
   The SSE specification allows events with no data field (e.g. standalone
   'retry:' or 'id:' directives).  The SDK's SSE parser correctly sets
   data='' for these but __stream__ called sse.json() unconditionally,
   raising JSONDecodeError: Expecting value.
   Fix: skip events whose data is empty or whitespace before any JSON
   parsing.

2. Unreachable sse.event == 'error' check (fixes openai#2796)
   The error-event guard was nested inside the startswith('thread.') branch,
   making it logically impossible to trigger because 'error' != 'thread.*'.
   This was a regression from commit abc2596 ('fix(streaming): correct
   indentation') where the check was accidentally moved inside the block.
   Fix: move the error-event handling to the else branch (non-thread events),
   which is the correct location for standalone 'error' SSE events.
…p protocol

AsyncStream exposes close() but not aclose(), causing AttributeError
when instrumentation libraries (e.g. Langfuse, OpenTelemetry wrappers)
call the standard Python async cleanup convention:

  await stream.aclose()
  # AttributeError: 'AsyncStream' object has no attribute 'aclose'

The error surfaces in production via the call chain:
  AsyncChatCompletionStreamManager.__aexit__
    -> AsyncChatCompletionStream.close()
    -> self._response.aclose()   <- AttributeError when _response is
                                    AsyncStream (happens when the raw
                                    stream is wrapped by instrumentation)

aclose() is the standard async cleanup method used by asyncio.StreamWriter,
httpx.AsyncByteStream, and async generators (PEP 525).  Add it as a
thin alias that delegates to close() so callers can use either name
without special-casing AsyncStream.

Fixes openai#2853
@s-zx s-zx requested a review from a team as a code owner March 8, 2026 22:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

AsyncStream missing aclose() causes AttributeError in streaming structured output

1 participant