Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/sweet-trees-visit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"react-router": patch
---

Remove recursion from turbo-stream v2 allowing for encoding / decoding of massive payloads.
102 changes: 62 additions & 40 deletions packages/react-router/__tests__/vendor/turbo-stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@ import {
type EncodePlugin,
} from "../../vendor/turbo-stream-v2/utils";

async function quickDecode(stream: ReadableStream<Uint8Array>) {
async function quickDecode<T = unknown>(stream: ReadableStream<Uint8Array>) {
const decoded = await decode(stream);
await decoded.done;
return decoded.value;
return decoded.value as T;
}

async function readStreamToString(stream: ReadableStream<Uint8Array>) {
const reader = stream.getReader();
const decoder = new TextDecoder();
let text = "";

let chunk = await reader.read();
while (!chunk.done) {
text += decoder.decode(chunk.value, { stream: true });
chunk = await reader.read();
}

text += decoder.decode();
reader.releaseLock();
return text;
}

test("should encode and decode undefined", async () => {
Expand Down Expand Up @@ -52,7 +68,7 @@ test("should encode and decode Date", async () => {

test("should encode and decode invalid Date", async () => {
const input = new Date("invalid");
const output = await quickDecode(encode(input));
const output = await quickDecode<Date>(encode(input));
expect(isNaN(input.getTime())).toBe(true);
expect(isNaN(output.getTime())).toBe(true);
});
Expand Down Expand Up @@ -237,16 +253,7 @@ test("should encode and decode object and dedupe object key, value, and promise
expect(partialResult).toEqual(partialInput);
expect(await bazResult).toEqual(await bazInput);

let encoded = "";
const stream = encode(input);
await stream.pipeThrough(new TextDecoderStream()).pipeTo(
new WritableStream({
write(chunk) {
encoded += chunk;
},
}),
);

const encoded = await readStreamToString(encode(input));
expect(Array.from(encoded.matchAll(/"foo"/g))).toHaveLength(1);
expect(Array.from(encoded.matchAll(/"bar"/g))).toHaveLength(1);
expect(Array.from(encoded.matchAll(/"baz"/g))).toHaveLength(1);
Expand Down Expand Up @@ -504,15 +511,7 @@ test("should encode and decode objects with multiple promises resolving to the s
await decoded.done;

// Ensure we aren't duplicating values in the stream
let encoded = "";
const stream = encode(input);
await stream.pipeThrough(new TextDecoderStream()).pipeTo(
new WritableStream({
write(chunk) {
encoded += chunk;
},
}),
);
const encoded = await readStreamToString(encode(input));
expect(Array.from(encoded.matchAll(/"baz"/g))).toHaveLength(1);
});

Expand All @@ -535,15 +534,7 @@ test("should encode and decode objects with reused values", async () => {
expect(await value.data).toEqual(await input.data);

// Ensure we aren't duplicating values in the stream
let encoded = "";
const stream = encode(input);
await stream.pipeThrough(new TextDecoderStream()).pipeTo(
new WritableStream({
write(chunk) {
encoded += chunk;
},
}),
);
const encoded = await readStreamToString(encode(input));
expect(Array.from(encoded.matchAll(/"baz"/g))).toHaveLength(1);
await decoded.done;
});
Expand All @@ -566,15 +557,7 @@ test("should encode and decode objects with multiple promises rejecting to the s
await decoded.done;

// Ensure we aren't duplicating values in the stream
let encoded = "";
const stream = encode(input);
await stream.pipeThrough(new TextDecoderStream()).pipeTo(
new WritableStream({
write(chunk) {
encoded += chunk;
},
}),
);
const encoded = await readStreamToString(encode(input));
expect(Array.from(encoded.matchAll(/"baz"/g))).toHaveLength(1);
});

Expand All @@ -597,3 +580,42 @@ test("should allow many nested promises without a memory leak", async () => {
expect(currentDecoded.i).toBe(depth - 1);
await decoded.done;
});

test("should encode large payload", async () => {
const input = createDeeplyNestedObject();
await readStreamToString(encode(input));
});

test("should encode and decode large payload and yield the event loop", async () => {
const input = createDeeplyNestedObject();
let i = 0;
let interval = setInterval(() => i++, 0);
const decoded = await decode(encode(input));

clearInterval(interval);
expect(i > 0).toBe(true);

let currentInput: Nested | null = input;
let currentDecoded = decoded.value as Nested | null;

while (currentInput && currentDecoded) {
expect(currentDecoded.value).toBe(currentInput.value);
currentInput = currentInput.next;
currentDecoded = currentDecoded.next;
}

expect(currentInput).toBeNull();
expect(currentDecoded).toBeNull();

await decoded.done;
});

type Nested = { value: number; next: Nested | null };
function createDeeplyNestedObject(): Nested {
const depth = 100000;
let current = { value: 0, next: null as any };
for (let i = 1; i < depth; i++) {
current = { value: i, next: current };
}
return current;
}
72 changes: 50 additions & 22 deletions packages/react-router/vendor/turbo-stream-v2/flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@ import {
type ThisEncode,
} from "./utils";

export function flatten(this: ThisEncode, input: unknown): number | [number] {
const TIME_LIMIT_MS = 1;
const getNow = () => Date.now();
const yieldToMain = (): Promise<void> =>
new Promise((resolve) => setTimeout(resolve, 0));

export async function flatten(
this: ThisEncode,
input: unknown,
): Promise<number | [number]> {
const { indices } = this;
const existing = indices.get(input);
if (existing) return [existing];
Expand All @@ -33,21 +41,51 @@ export function flatten(this: ThisEncode, input: unknown): number | [number] {

const index = this.index++;
indices.set(input, index);
stringify.call(this, input, index);

const stack: [unknown, number][] = [[input, index]];
await stringify.call(this, stack);

return index;
}

function stringify(this: ThisEncode, input: unknown, index: number) {
const { deferred, plugins, postPlugins } = this;
async function stringify(this: ThisEncode, stack: [unknown, number][]) {
const { deferred, indices, plugins, postPlugins } = this;
const str = this.stringified;

const stack: [unknown, number][] = [[input, index]];
let lastYieldTime = getNow();

// Helper to assign index and schedule for processing if needed
const flattenValue = (value: unknown): number | [number] => {
const existing = indices.get(value);
if (existing) return [existing];

if (value === undefined) return UNDEFINED;
if (value === null) return NULL;
if (Number.isNaN(value)) return NAN;
if (value === Number.POSITIVE_INFINITY) return POSITIVE_INFINITY;
if (value === Number.NEGATIVE_INFINITY) return NEGATIVE_INFINITY;
if (value === 0 && 1 / value < 0) return NEGATIVE_ZERO;

const index = this.index++;
indices.set(value, index);
stack.push([value, index]);
return index;
};

let i = 0;
while (stack.length > 0) {
// Yield to main thread if time limit exceeded
const now = getNow();
if (++i % 6000 === 0 && now - lastYieldTime >= TIME_LIMIT_MS) {
await yieldToMain();
lastYieldTime = getNow();
}

const [input, index] = stack.pop()!;

const partsForObj = (obj: any) =>
Object.keys(obj)
.map((k) => `"_${flatten.call(this, k)}":${flatten.call(this, obj[k])}`)
.map((k) => `"_${flattenValue(k)}":${flattenValue(obj[k])}`)
.join(",");
let error: Error | null = null;

Expand Down Expand Up @@ -87,9 +125,7 @@ function stringify(this: ThisEncode, input: unknown, index: number) {
const [pluginIdentifier, ...rest] = pluginResult;
str[index] = `[${JSON.stringify(pluginIdentifier)}`;
if (rest.length > 0) {
str[index] += `,${rest
.map((v) => flatten.call(this, v))
.join(",")}`;
str[index] += `,${rest.map((v) => flattenValue(v)).join(",")}`;
}
str[index] += "]";
break;
Expand All @@ -102,8 +138,7 @@ function stringify(this: ThisEncode, input: unknown, index: number) {
if (isArray) {
for (let i = 0; i < input.length; i++)
result +=
(i ? "," : "") +
(i in input ? flatten.call(this, input[i]) : HOLE);
(i ? "," : "") + (i in input ? flattenValue(input[i]) : HOLE);
str[index] = `${result}]`;
} else if (input instanceof Date) {
const dateTime = input.getTime();
Expand All @@ -119,18 +154,15 @@ function stringify(this: ThisEncode, input: unknown, index: number) {
} else if (input instanceof Set) {
if (input.size > 0) {
str[index] = `["${TYPE_SET}",${[...input]
.map((val) => flatten.call(this, val))
.map((val) => flattenValue(val))
.join(",")}]`;
} else {
str[index] = `["${TYPE_SET}"]`;
}
} else if (input instanceof Map) {
if (input.size > 0) {
str[index] = `["${TYPE_MAP}",${[...input]
.flatMap(([k, v]) => [
flatten.call(this, k),
flatten.call(this, v),
])
.flatMap(([k, v]) => [flattenValue(k), flattenValue(v)])
.join(",")}]`;
} else {
str[index] = `["${TYPE_MAP}"]`;
Expand Down Expand Up @@ -165,9 +197,7 @@ function stringify(this: ThisEncode, input: unknown, index: number) {
const [pluginIdentifier, ...rest] = pluginResult;
str[index] = `[${JSON.stringify(pluginIdentifier)}`;
if (rest.length > 0) {
str[index] += `,${rest
.map((v) => flatten.call(this, v))
.join(",")}`;
str[index] += `,${rest.map((v) => flattenValue(v)).join(",")}`;
}
str[index] += "]";
break;
Expand All @@ -192,9 +222,7 @@ function stringify(this: ThisEncode, input: unknown, index: number) {
const [pluginIdentifier, ...rest] = pluginResult;
str[index] = `[${JSON.stringify(pluginIdentifier)}`;
if (rest.length > 0) {
str[index] += `,${rest
.map((v) => flatten.call(this, v))
.join(",")}`;
str[index] += `,${rest.map((v) => flattenValue(v)).join(",")}`;
}
str[index] += "]";
break;
Expand Down
Loading