Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions src/lib/BetaMessageStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
messages: BetaMessageParam[] = [];
receivedMessages: ParsedBetaMessage<ParsedT>[] = [];
#currentMessageSnapshot: BetaMessage | undefined;
#cachedFinalMessage: ParsedBetaMessage<ParsedT> | undefined;
#params: MessageCreateParams | null = null;

controller: AbortController = new AbortController();
Expand Down Expand Up @@ -320,6 +321,9 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
}

#getFinalMessage(): ParsedBetaMessage<ParsedT> {
if (this.#cachedFinalMessage) {
return this.#cachedFinalMessage;
}
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
Expand All @@ -337,12 +341,9 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
}

#getFinalText(): string {
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
const textBlocks = this.receivedMessages
.at(-1)!
.content.filter((block): block is BetaTextBlock => block.type === 'text')
const finalMessage = this.#getFinalMessage();
const textBlocks = finalMessage.content
.filter((block): block is BetaTextBlock => block.type === 'text')
.map((block) => block.text);
if (textBlocks.length === 0) {
throw new AnthropicError('stream ended without producing a content block with type=text');
Expand Down Expand Up @@ -399,6 +400,18 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
listeners.forEach(({ listener }: any) => listener(...args));
}

if (event === 'end') {
// Release large internal state to prevent memory leaks when stream references
// are retained (e.g. in long-running tool loops). The final message is preserved
// in #cachedFinalMessage so finalMessage() / finalText() continue to work.
this.messages.length = 0;
this.receivedMessages.length = 0;
this.#currentMessageSnapshot = undefined;
this.#params = null;
this.#listeners = {};
return;
}

if (event === 'abort') {
const error = args[0] as APIUserAbortError;
if (!this.#catchingPromiseCreated && !listeners?.length) {
Expand Down Expand Up @@ -432,6 +445,7 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess
protected _emitFinal() {
const finalMessage = this.receivedMessages.at(-1);
if (finalMessage) {
this.#cachedFinalMessage = finalMessage;
this._emit('finalMessage', this.#getFinalMessage());
}
}
Expand Down
26 changes: 20 additions & 6 deletions src/lib/MessageStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
messages: MessageParam[] = [];
receivedMessages: ParsedMessage<ParsedT>[] = [];
#currentMessageSnapshot: Message | undefined;
#cachedFinalMessage: ParsedMessage<ParsedT> | undefined;
#params: MessageCreateParams | null = null;

controller: AbortController = new AbortController();
Expand Down Expand Up @@ -328,6 +329,9 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
}

#getFinalMessage(): ParsedMessage<ParsedT> {
if (this.#cachedFinalMessage) {
return this.#cachedFinalMessage;
}
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
Expand All @@ -345,12 +349,9 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
}

#getFinalText(): string {
if (this.receivedMessages.length === 0) {
throw new AnthropicError('stream ended without producing a Message with role=assistant');
}
const textBlocks = this.receivedMessages
.at(-1)!
.content.filter((block): block is TextBlock => block.type === 'text')
const finalMessage = this.#getFinalMessage();
const textBlocks = finalMessage.content
.filter((block): block is TextBlock => block.type === 'text')
.map((block) => block.text);
if (textBlocks.length === 0) {
throw new AnthropicError('stream ended without producing a content block with type=text');
Expand Down Expand Up @@ -407,6 +408,18 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
listeners.forEach(({ listener }: any) => listener(...args));
}

if (event === 'end') {
// Release large internal state to prevent memory leaks when stream references
// are retained (e.g. in long-running tool loops). The final message is preserved
// in #cachedFinalMessage so finalMessage() / finalText() continue to work.
this.messages.length = 0;
this.receivedMessages.length = 0;
this.#currentMessageSnapshot = undefined;
this.#params = null;
this.#listeners = {};
return;
}

if (event === 'abort') {
const error = args[0] as APIUserAbortError;
if (!this.#catchingPromiseCreated && !listeners?.length) {
Expand Down Expand Up @@ -440,6 +453,7 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
protected _emitFinal() {
const finalMessage = this.receivedMessages.at(-1);
if (finalMessage) {
this.#cachedFinalMessage = finalMessage;
this._emit('finalMessage', this.#getFinalMessage());
}
}
Expand Down