Conversation
📝 Walkthrough
Summary by CodeRabbit
Walkthrough: Adds a Supabase Realtime destination and wiring across crates.
Sequence Diagram(s)
sequenceDiagram
participant Replicator
participant RealtimeDestination
participant ConnectionManager
participant WebSocket
participant SupabaseRT
Replicator->>RealtimeDestination: write_events(events)
RealtimeDestination->>RealtimeDestination: update schema cache / build topic & payload
RealtimeDestination->>RealtimeDestination: ensure_joined(topic)
RealtimeDestination->>ConnectionManager: try_send(join_message)
alt connected
ConnectionManager->>WebSocket: send(join_message)
else not connected
ConnectionManager->>ConnectionManager: ensure_connected()
ConnectionManager->>WebSocket: connect + auth
WebSocket->>SupabaseRT: WebSocket handshake
SupabaseRT->>WebSocket: connected
ConnectionManager->>RealtimeDestination: notify generation change
ConnectionManager->>WebSocket: send(join_message)
end
WebSocket->>SupabaseRT: phx_join
SupabaseRT->>WebSocket: join ack
RealtimeDestination->>ConnectionManager: try_send(broadcast_message)
ConnectionManager->>WebSocket: send(broadcast_message)
WebSocket->>SupabaseRT: broadcast CDC event
par heartbeat
RealtimeDestination->>ConnectionManager: send(heartbeat)
ConnectionManager->>WebSocket: send(heartbeat)
end
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@etl-destinations/src/realtime/connection.rs`:
- Around line 99-110: The retry loop currently calls
self.connection.as_mut().expect("just ensured connected") which can panic if a
prior reconnect() failed; change the loop to avoid expect by checking
self.connection before attempting send: replace the expect usage with an if let
Some(conn) = self.connection.as_mut() { attempt send(conn.send(message).await)
... } else { try to reconnect (await self.reconnect()), log any reconnect error
into last_err, sleep backoff and continue }, ensuring reconnect() success sets
self.connection before next send attempt and that failures simply continue the
retry loop without panicking; reference functions/fields: self.connection,
reconnect(), send(), max_retries, last_err, backoff_duration, attempt.
In `@etl-destinations/src/realtime/core.rs`:
- Line 14: The function write_table_rows currently returns success without
emitting rows, causing silent data loss (and the unused-import warning for
PROCESSING_TYPE_TABLE_COPY); update write_table_rows to fail fast until
table-copy emission is implemented—return an Err (or propagate a clear error)
indicating table-copy is unsupported rather than returning Ok, and ensure
PROCESSING_TYPE_TABLE_COPY is referenced or the import removed; locate
write_table_rows and adjust its return path to return a descriptive error (and
keep related symbols like PROCESSING_TYPE_TABLE_COPY and log_processed_bytes in
sync).
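The retry shape suggested for connection.rs in the first prompt can be sketched with stand-in types (Conn and Manager are illustrative, not the PR's actual ConnectionManager; the async send and backoff sleep are elided):

```rust
struct Conn {
    healthy: bool,
}

struct Manager {
    connection: Option<Conn>,
    max_retries: u32,
}

impl Manager {
    fn reconnect(&mut self) -> Result<(), String> {
        // In the real code this re-dials the WebSocket and re-authenticates.
        self.connection = Some(Conn { healthy: true });
        Ok(())
    }

    fn send_with_retry(&mut self, message: &str) -> Result<(), String> {
        let mut last_err = None;
        for _attempt in 0..=self.max_retries {
            // Check the connection instead of unwrapping with expect(),
            // so a failed prior reconnect cannot panic here.
            if let Some(conn) = self.connection.as_mut() {
                if conn.healthy {
                    let _ = message; // stands in for conn.send(message).await
                    return Ok(());
                }
                last_err = Some("send failed".to_string());
                self.connection = None;
            } else if let Err(e) = self.reconnect() {
                last_err = Some(e);
            }
            // backoff sleep between attempts elided in this sketch
        }
        Err(last_err.unwrap_or_else(|| "retries exhausted".to_string()))
    }
}
```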
ℹ️ Review info
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Disabled knowledge base sources:
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
- etl-config/src/shared/destination.rs
- etl-destinations/Cargo.toml
- etl-destinations/src/lib.rs
- etl-destinations/src/realtime/connection.rs
- etl-destinations/src/realtime/core.rs
- etl-destinations/src/realtime/encoding.rs
- etl-destinations/src/realtime/mod.rs
- etl-replicator/Cargo.toml
- etl-replicator/src/core.rs
use tracing::{debug, warn};

#[cfg(feature = "egress")]
use crate::egress::{PROCESSING_TYPE_STREAMING, PROCESSING_TYPE_TABLE_COPY, log_processed_bytes};
Do not silently drop table-copy rows in write_table_rows
write_table_rows currently returns success without emitting anything. If snapshot/table-copy calls this path, rows are lost silently. This also explains the current CI failure at Line 14 (PROCESSING_TYPE_TABLE_COPY imported but never used).
Proposed fix (fail fast until table-copy emission is implemented)
@@
#[cfg(feature = "egress")]
-use crate::egress::{PROCESSING_TYPE_STREAMING, PROCESSING_TYPE_TABLE_COPY, log_processed_bytes};
+use crate::egress::{PROCESSING_TYPE_STREAMING, log_processed_bytes};
@@
async fn write_table_rows(
&self,
_table_id: TableId,
- _table_rows: Vec<TableRow>,
+ table_rows: Vec<TableRow>,
) -> EtlResult<()> {
- Ok(())
+ if table_rows.is_empty() {
+ return Ok(());
+ }
+ Err(etl_error!(
+ ErrorKind::DestinationIoError,
+ "realtime destination table copy is not implemented",
+ "write_table_rows was called with non-empty snapshot rows"
+ ))
}

As per coding guidelines, "Highlight only issues that could cause runtime errors, data loss, or severe maintainability issues."
Also applies to: 177-183
🧰 Tools
🪛 GitHub Check: Lint (clippy)
[failure] 14-14:
unused import: PROCESSING_TYPE_TABLE_COPY
🪛 GitHub Check: Tests (Full) (15)
[failure] 14-14:
unused import: PROCESSING_TYPE_TABLE_COPY
🪛 GitHub Check: Tests (Full) (16)
[failure] 14-14:
unused import: PROCESSING_TYPE_TABLE_COPY
🪛 GitHub Check: Tests (Full) (17)
[failure] 14-14:
unused import: PROCESSING_TYPE_TABLE_COPY
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@etl-destinations/src/realtime/core.rs` at line 14, The function
write_table_rows currently returns success without emitting rows, causing silent
data loss (and the unused-import warning for PROCESSING_TYPE_TABLE_COPY); update
write_table_rows to fail fast until table-copy emission is implemented—return an
Err (or propagate a clear error) indicating table-copy is unsupported rather
than returning Ok, and ensure PROCESSING_TYPE_TABLE_COPY is referenced or the
import removed; locate write_table_rows and adjust its return path to return a
descriptive error (and keep related symbols like PROCESSING_TYPE_TABLE_COPY and
log_processed_bytes in sync).
let heartbeat_task = tokio::spawn(async move {
    let mut ticker = interval(HEARTBEAT_INTERVAL);
    ticker.tick().await; // skip the immediate first tick
    loop {
        ticker.tick().await;
        let mut guard = inner_clone.lock().await;
        let msg = build_heartbeat_message();
        if let Err(e) = guard.connection.send_with_retry(&msg).await {
            warn!(error = %e, "heartbeat send failed");
        }
    }
});
It might be better to use https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.IntervalStream.html instead, and to set https://docs.rs/tokio/1.49.0/tokio/time/struct.Interval.html#method.set_missed_tick_behavior properly, e.g. to skip missed ticks when a send takes longer than the interval.
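With tokio's MissedTickBehavior::Skip, a tick that overruns realigns to the next period boundary instead of firing a burst of missed ticks. A minimal arithmetic sketch of that realignment (times in milliseconds; this models the behavior, it is not tokio code):

```rust
// Models MissedTickBehavior::Skip: after an overrun, the next tick lands
// on the first period boundary strictly after `now_ms`, and the missed
// boundaries are dropped rather than fired back-to-back.
fn next_tick_skipping_missed(start_ms: u64, period_ms: u64, now_ms: u64) -> u64 {
    if now_ms < start_ms {
        return start_ms;
    }
    let periods_passed = (now_ms - start_ms) / period_ms;
    start_ms + (periods_passed + 1) * period_ms
}
```

In the heartbeat task above this would mean a slow send_with_retry delays at most one heartbeat instead of queueing several.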
delete_event: String,
max_retries: u32,
) -> Self {
    let inner = Arc::new(Mutex::new(RealtimeInner::new(url, api_key, max_retries)));
I'm concerned about having a Mutex here, especially because it can stay locked for a long time since we have retries in place.
I think joined_generation can be an Atomic, joined_topics can be wrapped in an RwLock, and we could just wrap schema_cache in an Arc. That way we would only have to lock joined_topics when needed, and it would only be locked on writes, not reads.
For connection_manager, I think the only reason it's mutable across calls is to increment or update the generation, which can also be an Atomic.
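The split suggested in these two comments can be sketched as follows (field names and the cache type are illustrative stand-ins for the PR's actual state, not its real definitions):

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

// Sketch: generation as an atomic, joined topics behind an RwLock,
// schema cache shared via Arc instead of one big Mutex.
struct SharedState {
    joined_generation: AtomicU64,
    joined_topics: RwLock<HashSet<String>>,
    schema_cache: Arc<HashSet<String>>, // placeholder for the real cache type
}

impl SharedState {
    fn bump_generation(&self) -> u64 {
        // A reconnect invalidates previous joins; callers compare generations.
        self.joined_generation.fetch_add(1, Ordering::SeqCst) + 1
    }

    fn is_joined(&self, topic: &str) -> bool {
        // Read lock only; concurrent readers never block each other.
        self.joined_topics.read().unwrap().contains(topic)
    }

    fn mark_joined(&self, topic: &str) {
        // Write lock held only for the insert itself.
        self.joined_topics.write().unwrap().insert(topic.to_string());
    }
}
```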
_table_id: TableId,
_table_rows: Vec<TableRow>,
) -> EtlResult<()> {
    Ok(())
Is it intentional not to implement this? Don't we need a mode where this is enabled or not?
I'm wondering if all these encoding functions could just return JSON serialized as bytes. Do we need to manipulate the JSON afterwards? If not, we don't need a serde_json::Value.
Also, all these functions can just be exposed as pub(super).
etl-destinations/src/realtime/mod.rs
Outdated
pub use core::RealtimeDestination;

pub const DEFAULT_CHANNEL_PREFIX: &str = "etl";
pub const DEFAULT_MAX_RETRIES: u32 = 5;
etl-config/src/shared/destination.rs
Outdated
/// Broadcast event name for INSERT operations. Defaults to `"insert"`.
#[serde(default = "default_insert_event")]
insert_event: String,
/// Broadcast event name for UPDATE operations. Defaults to `"update"`.
#[serde(default = "default_update_event")]
update_event: String,
/// Broadcast event name for DELETE operations. Defaults to `"delete"`.
#[serde(default = "default_delete_event")]
delete_event: String,
I'm not sure I understand why this should be configurable. What would be the use case?
// Detach — the AbortHandle keeps control; the task runs independently.
drop(heartbeat_task);
This is somewhat redundant, since it will be dropped once the function returns anyway.
iambriccardo left a comment:
Left some comments, thanks for contributing!
.map_err(|e| {
    etl_error!(
        ErrorKind::DestinationIoError,
        "failed to send message to Realtime",
nit: errors start with uppercase, logs are all lowercase.
etl-destinations/Cargo.toml
Outdated
serde = { workspace = true, optional = true, features = ["derive"] }
serde_json = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["sync"] }
tokio-tungstenite = { version = "0.26", optional = true, features = ["rustls-tls-native-roots"] }
Let's add this at the workspace level and inherit it.
etl-destinations/Cargo.toml
Outdated
-base64 = { workspace = true, optional = true }
+base64 = { workspace = true, optional = true, features = ["std"] }
 chrono = { workspace = true }
 futures = { version = "0.3", optional = true }
etl-config/src/shared/destination.rs
Outdated
#[serde(flatten)]
config: IcebergConfigWithoutSecrets,
},
Realtime {
I would maybe call this SupabaseRealtime to avoid clashes with future names.
a26d7d1 to 0f6de2a (Compare)
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@etl-destinations/src/realtime/core.rs`:
- Around line 157-457: The impl Destination for RealtimeDestination block must
be placed before the #[cfg(test)] mod tests to avoid "items after a test module"
clippy failure; locate the impl Destination for RealtimeDestination declaration
and cut/paste that entire impl block so it appears above the tests module (i.e.,
before the line starting with #[cfg(test)] mod tests), ensuring no other item
order is changed and keeping the tests module intact.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Run ID: 83d4ce2f-864f-438c-93b6-ae87a8ffbfb7
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
- Cargo.toml
- etl-config/src/shared/destination.rs
- etl-destinations/Cargo.toml
- etl-destinations/src/lib.rs
- etl-destinations/src/realtime/connection.rs
- etl-destinations/src/realtime/core.rs
- etl-destinations/src/realtime/encoding.rs
- etl-destinations/src/realtime/mod.rs
- etl-replicator/Cargo.toml
- etl-replicator/src/core.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- etl-destinations/src/realtime/mod.rs
- etl-replicator/Cargo.toml
- etl-replicator/src/core.rs
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@etl-destinations/src/realtime/core.rs`:
- Around line 61-80: send_with_retry currently reconnects and resends only the
broadcast frame, which can end up using a fresh socket that hasn't run phx_join;
update send_with_retry to, after a successful reconnect (the reconnect() call on
ConnectionManager) and before the next try_send, call the join path
(ensure_joined for the current generation/topic) so the socket re-enters the
topic for that generation before resending the broadcast; apply the same fix to
the similar retry block around lines 122-155 by invoking ensure_joined (or the
ConnectionManager method that performs phx_join for the stored generation) on
reconnect success and only then retrying try_send.
In `@etl-destinations/src/realtime/encoding.rs`:
- Around line 195-202: build_join_message currently always sends a static join
config and omits config.private, so subscriptions to private topics built by
build_topic (which uses the private_channels flag to prefix topics with
"realtime:private:") are rejected; fix by making build_join_message include
config: { private: true } when subscribing to private channels—either detect the
"realtime:private:" prefix on the topic string inside build_join_message or
change build_join_message's signature to accept the private_channels boolean and
propagate it from callers, then update all call sites to pass the flag so
private subscriptions include the required "private": true in the join payload.
- Around line 29-35: The code uses i64::abs() (which can panic for i64::MIN) and
the wrong JS safe-integer boundary (uses 2^53 instead of 2^53-1); update both
occurrences (the Cell::I64 handling blocks around the shown diff and the similar
block at lines ~118-124) to use a correct MAX_SAFE constant (const MAX_SAFE: i64
= (1i64 << 53) - 1) and perform a range check without calling abs(), e.g. if v <
-MAX_SAFE || v > MAX_SAFE { Value::String(v.to_string()) } else {
Value::Number((*v).into()) }, so negative min values are handled safely and only
truly safe integers become numbers.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro
Run ID: 16bbb0b9-e5d7-4d49-92a6-56f37f10c0a2
📒 Files selected for processing (5)
- Cargo.toml
- etl-destinations/src/realtime/connection.rs
- etl-destinations/src/realtime/core.rs
- etl-destinations/src/realtime/encoding.rs
- etl-replicator/src/core.rs
async fn send_with_retry(
    connection: &Mutex<ConnectionManager>,
    msg: &str,
    max_retries: u32,
) -> EtlResult<()> {
    let mut last_err = None;
    for attempt in 0..=max_retries {
        match connection.lock().await.try_send(msg).await {
            Ok(()) => return Ok(()),
            Err(e) => {
                warn!(attempt, error = %e, "send failed, reconnecting");
                last_err = Some(e);
                sleep(backoff_duration(attempt)).await;
                if let Err(e) = connection.lock().await.reconnect().await {
                    warn!(error = %e, "reconnect failed");
                }
            }
        }
    }
    Err(last_err.expect("loop ran at least once"))
}
Reconnects can bypass phx_join.
ensure_joined() records the topic before the join is durable, and send_with_retry() retries only the broadcast frame. If the socket reconnects after the join attempt or during the broadcast retry loop, the retry is sent on a fresh connection that never rejoined that topic, so the event can be dropped silently. The retry path needs to rerun phx_join for the current generation before resending the broadcast.
Also applies to: 122-155
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@etl-destinations/src/realtime/core.rs` around lines 61 - 80, send_with_retry
currently reconnects and resends only the broadcast frame, which can end up
using a fresh socket that hasn't run phx_join; update send_with_retry to, after
a successful reconnect (the reconnect() call on ConnectionManager) and before
the next try_send, call the join path (ensure_joined for the current
generation/topic) so the socket re-enters the topic for that generation before
resending the broadcast; apply the same fix to the similar retry block around
lines 122-155 by invoking ensure_joined (or the ConnectionManager method that
performs phx_join for the stored generation) on reconnect success and only then
retrying try_send.
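A synchronous toy model of the suggested fix (Socket and its methods are stand-ins for the PR's ConnectionManager; the point is only the join-after-reconnect ordering):

```rust
use std::collections::HashSet;

struct Socket {
    joined: HashSet<String>,
    fails_left: u32,
}

impl Socket {
    fn reconnect(&mut self) {
        // A fresh socket has no joined topics.
        self.joined.clear();
    }

    fn join(&mut self, topic: &str) {
        self.joined.insert(topic.to_string());
    }

    fn try_broadcast(&mut self, topic: &str) -> Result<(), &'static str> {
        if self.fails_left > 0 {
            self.fails_left -= 1;
            return Err("connection dropped");
        }
        if self.joined.contains(topic) {
            Ok(())
        } else {
            // Broadcasts to a topic this socket never joined are dropped,
            // which is the silent-loss case described in the review.
            Err("not joined")
        }
    }
}

// The fix: rejoin the topic after every reconnect, before retrying the send.
fn send_with_rejoin(sock: &mut Socket, topic: &str, max_retries: u32) -> Result<(), &'static str> {
    sock.join(topic);
    let mut last_err = "no attempts";
    for _ in 0..=max_retries {
        match sock.try_broadcast(topic) {
            Ok(()) => return Ok(()),
            Err(e) => {
                last_err = e;
                sock.reconnect();
                // Re-enter the topic on the fresh socket before resending.
                sock.join(topic);
            }
        }
    }
    Err(last_err)
}
```

Without the `sock.join(topic)` inside the retry arm, the retried broadcast would go out on a socket that never rejoined the topic.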
Cell::I64(v) => {
    const MAX_SAFE: i64 = 1 << 53;
    if v.abs() > MAX_SAFE {
        Value::String(v.to_string())
    } else {
        Value::Number((*v).into())
    }
🧩 Analysis chain
🌐 Web query:
In Rust, what happens when calling i64::abs() on i64::MIN, and what is JavaScript's Number.MAX_SAFE_INTEGER?
💡 Result:
- Rust (i64::abs() on i64::MIN): it overflows because |i64::MIN| can't fit in an i64. With overflow checks (typically debug builds) it panics; with overflow checks disabled (typically release/optimized builds) it wraps and returns i64::MIN. [1]
- JavaScript (Number.MAX_SAFE_INTEGER): 9007199254740991 (which is 2^53 - 1). [2]
Sources: [1] Rust std docs for i64::abs overflow behavior, [2] MDN Number.MAX_SAFE_INTEGER.
Fix i64::abs() overflow and incorrect JavaScript safe-integer boundary.
i64::abs() panics on i64::MIN in debug builds and wraps in release builds. Additionally, JavaScript's Number.MAX_SAFE_INTEGER is 2^53 - 1 (9007199254740991), but the code uses 2^53 (9007199254740992), which allows unsafe values to be serialized as numbers. This affects both lines 29-35 and 118-124.
Use unsigned_abs() or a range check instead of abs(). The suggested fix:
Proposed fix
- const MAX_SAFE: i64 = 1 << 53;
- if v.abs() > MAX_SAFE {
+ const MAX_SAFE: i64 = (1 << 53) - 1;
+ if !(-MAX_SAFE..=MAX_SAFE).contains(v) {
Value::String(v.to_string())
} else {
Value::Number((*v).into())
}

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Cell::I64(v) => {
    const MAX_SAFE: i64 = (1 << 53) - 1;
    if !(-MAX_SAFE..=MAX_SAFE).contains(v) {
        Value::String(v.to_string())
    } else {
        Value::Number((*v).into())
    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@etl-destinations/src/realtime/encoding.rs` around lines 29 - 35, The code
uses i64::abs() (which can panic for i64::MIN) and the wrong JS safe-integer
boundary (uses 2^53 instead of 2^53-1); update both occurrences (the Cell::I64
handling blocks around the shown diff and the similar block at lines ~118-124)
to use a correct MAX_SAFE constant (const MAX_SAFE: i64 = (1i64 << 53) - 1) and
perform a range check without calling abs(), e.g. if v < -MAX_SAFE || v >
MAX_SAFE { Value::String(v.to_string()) } else { Value::Number((*v).into()) },
so negative min values are handled safely and only truly safe integers become
numbers.
pub(super) fn build_topic(table_name: &TableName, private_channels: bool) -> String {
    let channel = format!("{CHANNEL_PREFIX}:{}.{}", table_name.schema, table_name.name);
    if private_channels {
        format!("realtime:private:{channel}")
    } else {
        format!("realtime:{channel}")
    }
}
🧩 Analysis chain
🌐 Web query:
According to the current Supabase Realtime authorization docs, how should a client join a private broadcast channel? Is config.private: true part of the join config?
💡 Result:
To join a private (authorized) Broadcast channel, the client should create the channel with the channel options including config: { private: true }, then call .subscribe():
const channel = supabase.channel('room-1', {
config: { private: true },
})
channel.subscribe()

Yes, per the current Realtime Authorization docs, config.private: true is part of the channel "join" configuration (the options you pass when instantiating the channel), not something separate you pass to .subscribe(). [1]
🏁 Script executed:
cd etl-destinations && head -n 230 src/realtime/encoding.rs | tail -n 40
Repository: supabase/etl
Length of output: 1423
🏁 Script executed:
cd etl-destinations && rg -n "build_join_message" src/realtime/
Repository: supabase/etl
Length of output: 414
🏁 Script executed:
cd etl-destinations && sed -n '130,145p' src/realtime/core.rs
Repository: supabase/etl
Length of output: 524
🏁 Script executed:
cd etl-destinations && sed -n '100,145p' src/realtime/core.rs
Repository: supabase/etl
Length of output: 1752
🏁 Script executed:
cd etl-destinations && rg -n "ensure_joined" src/realtime/ -A 2 -B 2
Repository: supabase/etl
Length of output: 827
🏁 Script executed:
cd etl-destinations && sed -n '145,170p' src/realtime/core.rs
Repository: supabase/etl
Length of output: 774
🏁 Script executed:
cd etl-destinations && rg -n "build_topic\|broadcast\(" src/realtime/ -B 3 | head -80
Repository: supabase/etl
Length of output: 38
🏁 Script executed:
cd etl-destinations && rg -n "\.broadcast\|build_topic" src/realtime/core.rs -B 2 -A 2
Repository: supabase/etl
Length of output: 38
🏁 Script executed:
cd etl-destinations && rg -n "broadcast\(" src/realtime/core.rs
Repository: supabase/etl
Length of output: 617
🏁 Script executed:
cd etl-destinations && sed -n '180,190p' src/realtime/core.rs
Repository: supabase/etl
Length of output: 449
Private-channel authorization incomplete: config.private missing from join payload.
The private_channels flag correctly changes the topic string to realtime:private:..., but build_join_message() always sends the same static join config without "private": true. According to Supabase Realtime authorization docs, private broadcast channels require config: { private: true } in the channel join configuration. Without this field, the server will reject private-channel subscriptions as unauthorized, making the feature non-functional.
Either detect the "private:" prefix in the topic string and conditionally add "private": true to the config, or pass the private_channels flag to build_join_message() and update the function signature accordingly.
Applies to: build_join_message() (lines 205–219) and all call sites.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@etl-destinations/src/realtime/encoding.rs` around lines 195 - 202,
build_join_message currently always sends a static join config and omits
config.private, so subscriptions to private topics built by build_topic (which
uses the private_channels flag to prefix topics with "realtime:private:") are
rejected; fix by making build_join_message include config: { private: true }
when subscribing to private channels—either detect the "realtime:private:"
prefix on the topic string inside build_join_message or change
build_join_message's signature to accept the private_channels boolean and
propagate it from callers, then update all call sites to pass the flag so
private subscriptions include the required "private": true in the join payload.
What kind of change does this PR introduce?
Add realtime destination