Skip to content

feat: Python client implementation for River protocol v2.0#356

Draft
Monkatraz wants to merge 29 commits intomainfrom
python-client
Draft

feat: Python client implementation for River protocol v2.0#356
Monkatraz wants to merge 29 commits intomainfrom
python-client

Conversation

@Monkatraz
Copy link
Contributor

@Monkatraz Monkatraz commented Feb 19, 2026

Why

Add a Python client for the River protocol, enabling Python applications to call River services. This is a clean room implementation referencing the TypeScript client and PROTOCOL.md.

Slack thread: https://slack.com/archives/C0AEE1W8E3X/p1771490314906739

What changed

Added python-client/ directory containing a complete River v2.0 client implementation in Python.

Core modules:

  • river/types.py — Transport message types, control flags, handshake payloads, result types, ID generation
  • river/codec.py — NaiveJsonCodec (JSON) and BinaryCodec (msgpack) with CodecMessageAdapter
  • river/streams.py — Async Readable/Writable stream abstractions for procedure I/O
  • river/session.py — Session state machine with seq/ack bookkeeping, send buffer, heartbeat echo, grace periods
  • river/transport.py — WebSocketClientTransport with connection lifecycle, handshake, reconnection with exponential backoff, message dispatch
  • river/client.py — RiverClient providing rpc(), stream(), upload(), subscribe() APIs
  • river/codegen/ — Code generator producing typed Python client stubs from River service schemas

Protocol features implemented:

  • All 4 procedure types (rpc, stream, upload, subscription)
  • Handshake with protocol version negotiation and custom metadata
  • Seq/ack bookkeeping with exactly-once delivery semantics
  • Send buffer for transparent reconnection (message retransmission)
  • Heartbeat echo (client echoes server heartbeats)
  • Session grace period with disconnect detection
  • Exponential backoff with jitter for reconnection (LeakyBucketRateLimit)
  • Stream lifecycle (open/close/cancel control flags)
  • Client-initiated cancellation via abort signals
  • Service-level errors vs protocol errors (UNEXPECTED_DISCONNECT, CANCEL, UNCAUGHT_ERROR)

Test suite (168 tests, all passing):

  • test_e2e.py (57 tests) — E2E tests against a TS server: rpc, stream, upload, subscription, fallible procedures, concurrent operations, disconnect handling, cancellation, idempotent close, transparent reconnect, Readable/Writable edge cases, codec unit tests, listener cleanup
  • test_equivalence.py (64 tests) — Cross-codec parametrized equivalence tests running every scenario against both NaiveJsonCodec and BinaryCodec (each codec paired with a matching TS server instance). Covers RPC, stream, upload, subscription, cancellation, disconnect, and ordering
  • test_handshake.py (4 tests) — Custom handshake metadata: valid token, invalid token (protocolError event), missing metadata, metadata resent across reconnect. Uses a dedicated TS server with handshake validation
  • test_session.py (6 tests) — Deterministic session lifecycle with short timeouts: heartbeat miss → NO_CONNECTION, active RPCs keep connection alive, grace period expiry destroys session, reconnect within grace preserves session, retry budget backoff increases, budget restores after success
  • test_codegen.py (23 tests) — Schema conversion, generated imports, typed client live tests

Test infrastructure:

  • test_server.ts — 6 services (test, ordering, fallible, subscribable, uploadable, cancellation) with RIVER_CODEC env var for binary codec support
  • test_server_handshake.ts — Server with {token: string} handshake validation
  • extract_test_schema.ts — Schema extraction for codegen tests
  • conftest.py — Session-scoped server fixtures (JSON, binary, handshake), codec_and_url parametrized fixture, esbuild bundling

Running the tests:

cd python-client
pip install -e ".[dev]"
pytest tests/ -v

Versioning

  • Breaking protocol change
  • Breaking ts/js API change

Clean room implementation of a River client in Python, referencing
the TypeScript client and PROTOCOL.md. Includes full support for
all four procedure types (rpc, stream, upload, subscription),
transparent reconnection, heartbeat echo, and seq/ack bookkeeping.

40 tests passing against the TypeScript server.
@Monkatraz Monkatraz added the zergling-authored PRs authored by Zerg label Feb 19, 2026
Monkatraz and others added 28 commits February 19, 2026 09:20
Replace the hand-written .mjs test server with the original .ts source.
conftest.py now runs esbuild at test session start to bundle
test_server.ts -> test_server.mjs (gitignored build artifact).

This avoids tsx/ts-node runtime module resolution issues with the
river repo's bundler-style tsconfig while keeping the test server
as authored TypeScript.
…se, and Readable edge cases

- Add CancellationService to test server with blocking + immediate handlers
- Add client-initiated cancellation tests for all 4 proc types (rpc, stream, upload, subscription)
- Add idempotent close tests (stream, subscription) and cancellation-after-transport-close
- Add eagerly-connect E2E test
- Add 17 new Readable unit tests: locking semantics, eager iteration, collect-waits-for-close, break variants
- Fix Readable to enforce locking (TypeError on double __aiter__/collect)
- Replace async generator iterator with _ReadableIterator class for synchronous break cleanup
- Fix break() to be no-op when stream already closed with empty queue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

zergling-authored PRs authored by Zerg

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants