Skip to content

Commit 577c1d7

Browse files
committed
Encode messages before storing in send buffer
1 parent 9b4912d commit 577c1d7

File tree

7 files changed

+63
-42
lines changed

7 files changed

+63
-42
lines changed

testUtil/fixtures/cleanup.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { expect, vi } from 'vitest';
1+
import { assert, expect, vi } from 'vitest';
22
import {
33
ClientTransport,
44
Connection,
@@ -68,9 +68,16 @@ export async function ensureTransportBuffersAreEventuallyEmpty(
6868
[...t.sessions]
6969
.map(([client, sess]) => {
7070
// get all messages that are not heartbeats
71-
const buff = sess.sendBuffer.filter((msg) => {
72-
return !Value.Check(ControlMessageAckSchema, msg.payload);
73-
});
71+
const buff = sess.sendBuffer
72+
.map((encodedMsg) => {
73+
const msg = sess.parseMsg(encodedMsg.data);
74+
assert(msg);
75+
76+
return msg;
77+
})
78+
.filter(
79+
(msg) => !Value.Check(ControlMessageAckSchema, msg.payload),
80+
);
7481

7582
return [client, buff] as [
7683
string,

transport/message.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,17 @@ export function cancelMessage(
280280
export type OpaqueTransportMessage = TransportMessage;
281281
export type TransportClientId = string;
282282

283+
/**
284+
* An encoded message that is ready to be send over the transport.
285+
* The seq number is kept to keep track of which messages have been
286+
* acked by the peer.
287+
*/
288+
export interface EncodedTransportMessage {
289+
id: string;
290+
seq: number;
291+
data: Uint8Array;
292+
}
293+
283294
/**
284295
* Checks if the given control flag (usually found in msg.controlFlag) is an ack message.
285296
* @param controlFlag - The control flag to check.

transport/sessionStateMachine/SessionConnected.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ export class SessionConnected<
4949
}
5050

5151
send(msg: PartialTransportMessage): string {
52-
const constructedMsg = this.constructMsg(msg);
53-
this.sendBuffer.push(constructedMsg);
54-
this.conn.send(this.options.codec.toBuffer(constructedMsg));
52+
const encodedMsg = this.encodeMsg(msg);
53+
this.sendBuffer.push(encodedMsg);
54+
this.conn.send(encodedMsg.data);
5555

56-
return constructedMsg.id;
56+
return encodedMsg.id;
5757
}
5858

5959
constructor(props: SessionConnectedProps<ConnType>) {
@@ -75,7 +75,7 @@ export class SessionConnected<
7575
);
7676

7777
for (const msg of this.sendBuffer) {
78-
this.conn.send(this.options.codec.toBuffer(msg));
78+
this.conn.send(msg.data);
7979
}
8080
}
8181

transport/sessionStateMachine/common.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { Logger, MessageMetadata } from '../../logging';
22
import { TelemetryInfo } from '../../tracing';
33
import {
4+
EncodedTransportMessage,
45
OpaqueTransportMessage,
56
OpaqueTransportMessageSchema,
67
PartialTransportMessage,
78
ProtocolVersion,
89
TransportClientId,
9-
TransportMessage,
1010
} from '../message';
1111
import { Value } from '@sinclair/typebox/value';
1212
import { Codec } from '../../codec';
@@ -204,7 +204,7 @@ export interface IdentifiedSessionProps extends CommonSessionProps {
204204
to: TransportClientId;
205205
seq: number;
206206
ack: number;
207-
sendBuffer: Array<OpaqueTransportMessage>;
207+
sendBuffer: Array<EncodedTransportMessage>;
208208
telemetry: TelemetryInfo;
209209
protocolVersion: ProtocolVersion;
210210
}
@@ -224,7 +224,7 @@ export abstract class IdentifiedSession extends CommonSession {
224224
* Number of unique messages we've received this session (excluding handshake)
225225
*/
226226
ack: number;
227-
sendBuffer: Array<OpaqueTransportMessage>;
227+
sendBuffer: Array<EncodedTransportMessage>;
228228

229229
constructor(props: IdentifiedSessionProps) {
230230
const { id, to, seq, ack, sendBuffer, telemetry, log, protocolVersion } =
@@ -258,9 +258,9 @@ export abstract class IdentifiedSession extends CommonSession {
258258
return metadata;
259259
}
260260

261-
constructMsg<Payload>(
261+
encodeMsg<Payload>(
262262
partialMsg: PartialTransportMessage<Payload>,
263-
): TransportMessage<Payload> {
263+
): EncodedTransportMessage {
264264
const msg = {
265265
...partialMsg,
266266
id: generateId(),
@@ -270,17 +270,23 @@ export abstract class IdentifiedSession extends CommonSession {
270270
ack: this.ack,
271271
};
272272

273+
const encodedMsg = {
274+
id: msg.id,
275+
seq: msg.seq,
276+
data: this.options.codec.toBuffer(msg),
277+
};
278+
273279
this.seq++;
274280

275-
return msg;
281+
return encodedMsg;
276282
}
277283

278284
nextSeq(): number {
279285
return this.sendBuffer.length > 0 ? this.sendBuffer[0].seq : this.seq;
280286
}
281287

282288
send(msg: PartialTransportMessage): string {
283-
const constructedMsg = this.constructMsg(msg);
289+
const constructedMsg = this.encodeMsg(msg);
284290
this.sendBuffer.push(constructedMsg);
285291

286292
return constructedMsg.id;

transport/sessionStateMachine/stateMachine.test.ts

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1891,8 +1891,8 @@ describe('session state machine', () => {
18911891
expect(onConnectionClosed).not.toHaveBeenCalled();
18921892
expect(onConnectionErrored).not.toHaveBeenCalled();
18931893

1894-
const msg = session.constructMsg(payloadToTransportMessage('hello'));
1895-
session.conn.emitData(session.options.codec.toBuffer(msg));
1894+
const msg = session.encodeMsg(payloadToTransportMessage('hello'));
1895+
session.conn.emitData(msg.data);
18961896

18971897
await waitFor(async () => {
18981898
expect(onMessage).toHaveBeenCalledTimes(1);
@@ -1940,15 +1940,13 @@ describe('session state machine', () => {
19401940

19411941
// send a heartbeat
19421942
conn.emitData(
1943-
session.options.codec.toBuffer(
1944-
session.constructMsg({
1945-
streamId: 'heartbeat',
1946-
controlFlags: ControlFlags.AckBit,
1947-
payload: {
1948-
type: 'ACK',
1949-
} satisfies Static<typeof ControlMessageAckSchema>,
1950-
}),
1951-
),
1943+
session.encodeMsg({
1944+
streamId: 'heartbeat',
1945+
controlFlags: ControlFlags.AckBit,
1946+
payload: {
1947+
type: 'ACK',
1948+
} satisfies Static<typeof ControlMessageAckSchema>,
1949+
}).data,
19521950
);
19531951

19541952
// make sure the session acks the heartbeat
@@ -1962,15 +1960,13 @@ describe('session state machine', () => {
19621960

19631961
// send a heartbeat
19641962
conn.emitData(
1965-
session.options.codec.toBuffer(
1966-
session.constructMsg({
1967-
streamId: 'heartbeat',
1968-
controlFlags: ControlFlags.AckBit,
1969-
payload: {
1970-
type: 'ACK',
1971-
} satisfies Static<typeof ControlMessageAckSchema>,
1972-
}),
1973-
),
1963+
session.encodeMsg({
1964+
streamId: 'heartbeat',
1965+
controlFlags: ControlFlags.AckBit,
1966+
payload: {
1967+
type: 'ACK',
1968+
} satisfies Static<typeof ControlMessageAckSchema>,
1969+
}).data,
19741970
);
19751971

19761972
expect(sessionHandle.onMessage).not.toHaveBeenCalled();

transport/sessionStateMachine/transitions.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { OpaqueTransportMessage, TransportClientId } from '..';
21
import {
32
SessionConnecting,
43
SessionConnectingListeners,
@@ -38,7 +37,11 @@ import {
3837
SessionBackingOff,
3938
SessionBackingOffListeners,
4039
} from './SessionBackingOff';
41-
import { ProtocolVersion } from '../message';
40+
import {
41+
EncodedTransportMessage,
42+
ProtocolVersion,
43+
TransportClientId,
44+
} from '../message';
4245

4346
function inheritSharedSession(
4447
session: IdentifiedSession,
@@ -78,7 +81,7 @@ export const SessionStateGraph = {
7881
) => {
7982
const id = `session-${generateId()}`;
8083
const telemetry = createSessionTelemetryInfo(id, to, from);
81-
const sendBuffer: Array<OpaqueTransportMessage> = [];
84+
const sendBuffer: Array<EncodedTransportMessage> = [];
8285

8386
const session = new SessionNoConnection({
8487
listeners,

transport/transport.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ export interface DeleteSessionOptions {
3838
unhealthy: boolean;
3939
}
4040

41-
export type SessionBoundSendFn = (
42-
msg: PartialTransportMessage,
43-
) => string | undefined;
41+
export type SessionBoundSendFn = (msg: PartialTransportMessage) => string;
4442

4543
/**
4644
* Transports manage the lifecycle (creation/deletion) of sessions

0 commit comments

Comments
 (0)