
feat: add realtime destination #615

Open
filipecabaco wants to merge 4 commits into main from feat/realtime-destination

Conversation

@filipecabaco
Member

What kind of change does this PR introduce?

Add realtime destination

@filipecabaco filipecabaco requested a review from a team as a code owner February 26, 2026 16:08
@coderabbitai

coderabbitai bot commented Feb 26, 2026

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Added an opt-in Realtime destination to broadcast DB changes to Supabase Realtime channels with automatic connection management, reconnects, joins, and heartbeat.
    • Configurable URL, secret API key, private/public channels, max retry behavior, and heartbeat interval.
    • Produces Phoenix v2-compatible topics, join/heartbeat/broadcast messages and robust JSON payloads for row/cell types.
  • Tests

    • Added unit and integration tests for connection lifecycle, backoff/retry, message ordering, joins, and serialization.
  • Chores

    • Introduced an optional "realtime" feature flag and required dependency updates.

Walkthrough

Adds a Supabase Realtime destination and wiring across crates: a new SupabaseRealtime variant on destination config (url, api_key secret, private_channels, max_retries), an optional realtime feature and module in etl-destinations, a ConnectionManager with lazy connect/reconnect and send/close APIs, encoding helpers for Phoenix v2 messages, and RealtimeDestination implementing Destination with schema cache, phx_join tracking, broadcast/heartbeat logic, retry/backoff, and shutdown. The replicator builds and runs the RealtimeDestination when configured.
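The Phoenix v2 serializer frames each channel message as a five-element JSON array, `[join_ref, ref, topic, event, payload]`. A minimal sketch of how such frames could be assembled, dependency-free; `build_frame` and `build_heartbeat` are illustrative helpers, not the PR's actual API:

```rust
// Sketch of a Phoenix v2 message frame: [join_ref, ref, topic, event, payload].
// Refs are serialized as strings; a frame with no join_ref uses JSON null.
fn build_frame(join_ref: Option<u64>, msg_ref: u64, topic: &str, event: &str, payload: &str) -> String {
    let join_ref = join_ref.map_or("null".to_string(), |r| format!("\"{r}\""));
    format!("[{join_ref},\"{msg_ref}\",\"{topic}\",\"{event}\",{payload}]")
}

fn build_heartbeat(msg_ref: u64) -> String {
    // Heartbeats target the reserved "phoenix" topic with an empty payload.
    build_frame(None, msg_ref, "phoenix", "heartbeat", "{}")
}
```

The same helper covers `phx_join` and broadcast frames by varying the event name and payload.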

Sequence Diagram(s)

sequenceDiagram
    participant Replicator
    participant RealtimeDestination
    participant ConnectionManager
    participant WebSocket
    participant SupabaseRT

    Replicator->>RealtimeDestination: write_events(events)
    RealtimeDestination->>RealtimeDestination: update schema cache / build topic & payload
    RealtimeDestination->>RealtimeDestination: ensure_joined(topic)
    RealtimeDestination->>ConnectionManager: try_send(join_message)

    alt connected
        ConnectionManager->>WebSocket: send(join_message)
    else not connected
        ConnectionManager->>ConnectionManager: ensure_connected()
        ConnectionManager->>WebSocket: connect + auth
        WebSocket->>SupabaseRT: WebSocket handshake
        SupabaseRT->>WebSocket: connected
        ConnectionManager->>RealtimeDestination: notify generation change
        ConnectionManager->>WebSocket: send(join_message)
    end

    WebSocket->>SupabaseRT: phx_join
    SupabaseRT->>WebSocket: join ack

    RealtimeDestination->>ConnectionManager: try_send(broadcast_message)
    ConnectionManager->>WebSocket: send(broadcast_message)
    WebSocket->>SupabaseRT: broadcast CDC event

    par heartbeat
        RealtimeDestination->>ConnectionManager: send(heartbeat)
        ConnectionManager->>WebSocket: send(heartbeat)
    end

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl-destinations/src/realtime/connection.rs`:
- Around line 99-110: The retry loop currently calls
self.connection.as_mut().expect("just ensured connected") which can panic if a
prior reconnect() failed; change the loop to avoid expect by checking
self.connection before attempting send: replace the expect usage with an if let
Some(conn) = self.connection.as_mut() { attempt send(conn.send(message).await)
... } else { try to reconnect (await self.reconnect()), log any reconnect error
into last_err, sleep backoff and continue }, ensuring reconnect() success sets
self.connection before next send attempt and that failures simply continue the
retry loop without panicking; reference functions/fields: self.connection,
reconnect(), send(), max_retries, last_err, backoff_duration, attempt.

In `@etl-destinations/src/realtime/core.rs`:
- Line 14: The function write_table_rows currently returns success without
emitting rows, causing silent data loss (and the unused-import warning for
PROCESSING_TYPE_TABLE_COPY); update write_table_rows to fail fast until
table-copy emission is implemented—return an Err (or propagate a clear error)
indicating table-copy is unsupported rather than returning Ok, and ensure
PROCESSING_TYPE_TABLE_COPY is referenced or the import removed; locate
write_table_rows and adjust its return path to return a descriptive error (and
keep related symbols like PROCESSING_TYPE_TABLE_COPY and log_processed_bytes in
sync).
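The non-panicking retry loop suggested for connection.rs above can be sketched synchronously as follows; `Conn`, `Manager`, and their methods are hypothetical stand-ins for the PR's async ConnectionManager:

```rust
// If the connection is absent (e.g. a prior reconnect failed), attempt a
// reconnect and continue the loop instead of calling .expect(), which panics.
struct Conn;
impl Conn {
    fn send(&mut self, _msg: &str) -> Result<(), String> { Ok(()) }
}

struct Manager {
    connection: Option<Conn>,
    max_retries: u32,
    reconnect_attempts: u32,
}

impl Manager {
    fn reconnect(&mut self) -> Result<(), String> {
        self.reconnect_attempts += 1;
        self.connection = Some(Conn);
        Ok(())
    }

    fn send_with_retry(&mut self, msg: &str) -> Result<(), String> {
        let mut last_err = None;
        for _attempt in 0..=self.max_retries {
            // Check the connection instead of expect()-ing it.
            if let Some(conn) = self.connection.as_mut() {
                match conn.send(msg) {
                    Ok(()) => return Ok(()),
                    Err(e) => {
                        last_err = Some(e);
                        self.connection = None; // force a reconnect next round
                    }
                }
            } else if let Err(e) = self.reconnect() {
                last_err = Some(e); // reconnect failure just continues the loop
            }
        }
        Err(last_err.unwrap_or_else(|| "no attempts made".to_string()))
    }
}
```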

ℹ️ Review info

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Cache: Disabled due to Reviews > Disable Cache setting

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e880d4d and 74dd951.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
  • etl-config/src/shared/destination.rs
  • etl-destinations/Cargo.toml
  • etl-destinations/src/lib.rs
  • etl-destinations/src/realtime/connection.rs
  • etl-destinations/src/realtime/core.rs
  • etl-destinations/src/realtime/encoding.rs
  • etl-destinations/src/realtime/mod.rs
  • etl-replicator/Cargo.toml
  • etl-replicator/src/core.rs

use tracing::{debug, warn};

#[cfg(feature = "egress")]
use crate::egress::{PROCESSING_TYPE_STREAMING, PROCESSING_TYPE_TABLE_COPY, log_processed_bytes};

⚠️ Potential issue | 🔴 Critical

Do not silently drop table-copy rows in write_table_rows

write_table_rows currently returns success without emitting anything. If snapshot/table-copy calls this path, rows are lost silently. This also explains the current CI failure at Line 14 (PROCESSING_TYPE_TABLE_COPY imported but never used).

Proposed fix (fail fast until table-copy emission is implemented)
@@
 #[cfg(feature = "egress")]
-use crate::egress::{PROCESSING_TYPE_STREAMING, PROCESSING_TYPE_TABLE_COPY, log_processed_bytes};
+use crate::egress::{PROCESSING_TYPE_STREAMING, log_processed_bytes};
@@
     async fn write_table_rows(
         &self,
         _table_id: TableId,
-        _table_rows: Vec<TableRow>,
+        table_rows: Vec<TableRow>,
     ) -> EtlResult<()> {
-        Ok(())
+        if table_rows.is_empty() {
+            return Ok(());
+        }
+        Err(etl_error!(
+            ErrorKind::DestinationIoError,
+            "realtime destination table copy is not implemented",
+            "write_table_rows was called with non-empty snapshot rows"
+        ))
     }

As per coding guidelines, “Highlight only issues that could cause runtime errors, data loss, or severe maintainability issues.”

Also applies to: 177-183

🧰 Tools
🪛 GitHub Check: Lint (clippy)

[failure] 14-14:
unused import: PROCESSING_TYPE_TABLE_COPY

🪛 GitHub Check: Tests (Full) (15)

[failure] 14-14:
unused import: PROCESSING_TYPE_TABLE_COPY

🪛 GitHub Check: Tests (Full) (16)

[failure] 14-14:
unused import: PROCESSING_TYPE_TABLE_COPY

🪛 GitHub Check: Tests (Full) (17)

[failure] 14-14:
unused import: PROCESSING_TYPE_TABLE_COPY


Comment on lines +127 to +138
let heartbeat_task = tokio::spawn(async move {
    let mut ticker = interval(HEARTBEAT_INTERVAL);
    ticker.tick().await; // skip the immediate first tick
    loop {
        ticker.tick().await;
        let mut guard = inner_clone.lock().await;
        let msg = build_heartbeat_message();
        if let Err(e) = guard.connection.send_with_retry(&msg).await {
            warn!(error = %e, "heartbeat send failed");
        }
    }
});

    delete_event: String,
    max_retries: u32,
) -> Self {
    let inner = Arc::new(Mutex::new(RealtimeInner::new(url, api_key, max_retries)));
Contributor


I'm concerned about having a Mutex here, especially because it can stay locked for a long time since we have retry in place.

Contributor


I think joined_generation can be an Atomic, joined_topics can be wrapped in an RwLock, and we could just wrap schema_cache in an Arc. That way we would only have to lock joined_topics when needed, and it would only be locked exclusively on writes, not reads.
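A minimal sketch of this lock split, with stand-in types for the real schema cache and inner state (only the field names mirror the review comment; everything else is assumed):

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

// Atomic generation counter, RwLock over joined topics, shared cache in Arc.
struct RealtimeShared {
    joined_generation: AtomicU64,
    joined_topics: RwLock<HashSet<String>>,
    schema_cache: Arc<Vec<String>>, // stand-in for the real cache type
}

impl RealtimeShared {
    fn needs_join(&self, topic: &str) -> bool {
        // Read path takes only a shared lock; no Mutex held across sends.
        !self.joined_topics.read().unwrap().contains(topic)
    }

    fn mark_joined(&self, topic: &str) {
        self.joined_topics.write().unwrap().insert(topic.to_string());
    }

    fn bump_generation(&self) -> u64 {
        // On reconnect, bump the generation and clear joined topics so
        // every topic rejoins on the fresh socket.
        self.joined_topics.write().unwrap().clear();
        self.joined_generation.fetch_add(1, Ordering::SeqCst) + 1
    }
}
```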

Contributor


For connection_manager, I think the only reason it's mutable across calls is to increment or update the generation, which can also be an Atomic.

    _table_id: TableId,
    _table_rows: Vec<TableRow>,
) -> EtlResult<()> {
    Ok(())
Contributor


Is it intentional not to implement this? Don't we need a mode where this is enabled or not?

Contributor


I'm wondering if all these encoding functions could just return JSON serialized to bytes. Do we need to manipulate the JSON afterwards? If not, we don't need a serde_json::Value.

Contributor


Also, all these functions can just be pub(super).

pub use core::RealtimeDestination;

pub const DEFAULT_CHANNEL_PREFIX: &str = "etl";
pub const DEFAULT_MAX_RETRIES: u32 = 5;
Contributor


Should it be configurable?

Comment on lines +62 to +70
/// Broadcast event name for INSERT operations. Defaults to `"insert"`.
#[serde(default = "default_insert_event")]
insert_event: String,
/// Broadcast event name for UPDATE operations. Defaults to `"update"`.
#[serde(default = "default_update_event")]
update_event: String,
/// Broadcast event name for DELETE operations. Defaults to `"delete"`.
#[serde(default = "default_delete_event")]
delete_event: String,
Contributor


I'm not sure I understand why this should be configurable. What would be the use case?

Comment on lines +140 to +141
// Detach — the AbortHandle keeps control; the task runs independently.
drop(heartbeat_task);
Contributor


Kind of useless, as it will be dropped once this function returns anyway.

Contributor

@iambriccardo iambriccardo left a comment


Left some comments, thanks for contributing!

.map_err(|e| {
    etl_error!(
        ErrorKind::DestinationIoError,
        "failed to send message to Realtime",
Contributor


nit: errors start with uppercase, logs are all lowercase.

serde = { workspace = true, optional = true, features = ["derive"] }
serde_json = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["sync"] }
tokio-tungstenite = { version = "0.26", optional = true, features = ["rustls-tls-native-roots"] }
Contributor


Let's add this at the workspace level and inherit it.

base64 = { workspace = true, optional = true }
base64 = { workspace = true, optional = true, features = ["std"] }
chrono = { workspace = true }
futures = { version = "0.3", optional = true }
Contributor


Same here.

    #[serde(flatten)]
    config: IcebergConfigWithoutSecrets,
},
Realtime {
Contributor


I would maybe call this SupabaseRealtime to avoid clashes with future names.

@filipecabaco filipecabaco force-pushed the feat/realtime-destination branch from a26d7d1 to 0f6de2a on March 6, 2026 18:08

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl-destinations/src/realtime/core.rs`:
- Around line 157-457: The impl Destination for RealtimeDestination block must
be placed before the #[cfg(test)] mod tests to avoid "items after a test module"
clippy failure; locate the impl Destination for RealtimeDestination declaration
and cut/paste that entire impl block so it appears above the tests module (i.e.,
before the line starting with #[cfg(test)] mod tests), ensuring no other item
order is changed and keeping the tests module intact.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Run ID: 83d4ce2f-864f-438c-93b6-ae87a8ffbfb7

📥 Commits

Reviewing files that changed from the base of the PR and between a26d7d1 and 0f6de2a.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (10)
  • Cargo.toml
  • etl-config/src/shared/destination.rs
  • etl-destinations/Cargo.toml
  • etl-destinations/src/lib.rs
  • etl-destinations/src/realtime/connection.rs
  • etl-destinations/src/realtime/core.rs
  • etl-destinations/src/realtime/encoding.rs
  • etl-destinations/src/realtime/mod.rs
  • etl-replicator/Cargo.toml
  • etl-replicator/src/core.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • etl-destinations/src/realtime/mod.rs
  • etl-replicator/Cargo.toml
  • etl-replicator/src/core.rs

@coveralls

coveralls commented Mar 6, 2026

Coverage Status

coverage: 78.048% (+0.2%) from 77.882%
when pulling 503bdf7 on feat/realtime-destination
into 6e46731 on main.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl-destinations/src/realtime/core.rs`:
- Around line 61-80: send_with_retry currently reconnects and resends only the
broadcast frame, which can end up using a fresh socket that hasn't run phx_join;
update send_with_retry to, after a successful reconnect (the reconnect() call on
ConnectionManager) and before the next try_send, call the join path
(ensure_joined for the current generation/topic) so the socket re-enters the
topic for that generation before resending the broadcast; apply the same fix to
the similar retry block around lines 122-155 by invoking ensure_joined (or the
ConnectionManager method that performs phx_join for the stored generation) on
reconnect success and only then retrying try_send.

In `@etl-destinations/src/realtime/encoding.rs`:
- Around line 195-202: build_join_message currently always sends a static join
config and omits config.private, so subscriptions to private topics built by
build_topic (which uses the private_channels flag to prefix topics with
"realtime:private:") are rejected; fix by making build_join_message include
config: { private: true } when subscribing to private channels—either detect the
"realtime:private:" prefix on the topic string inside build_join_message or
change build_join_message's signature to accept the private_channels boolean and
propagate it from callers, then update all call sites to pass the flag so
private subscriptions include the required "private": true in the join payload.
- Around line 29-35: The code uses i64::abs() (which can panic for i64::MIN) and
the wrong JS safe-integer boundary (uses 2^53 instead of 2^53-1); update both
occurrences (the Cell::I64 handling blocks around the shown diff and the similar
block at lines ~118-124) to use a correct MAX_SAFE constant (const MAX_SAFE: i64
= (1i64 << 53) - 1) and perform a range check without calling abs(), e.g. if v <
-MAX_SAFE || v > MAX_SAFE { Value::String(v.to_string()) } else {
Value::Number((*v).into()) }, so negative min values are handled safely and only
truly safe integers become numbers.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Run ID: 16bbb0b9-e5d7-4d49-92a6-56f37f10c0a2

📥 Commits

Reviewing files that changed from the base of the PR and between 0f6de2a and 503bdf7.

📒 Files selected for processing (5)
  • Cargo.toml
  • etl-destinations/src/realtime/connection.rs
  • etl-destinations/src/realtime/core.rs
  • etl-destinations/src/realtime/encoding.rs
  • etl-replicator/src/core.rs

Comment on lines +61 to +80
async fn send_with_retry(
    connection: &Mutex<ConnectionManager>,
    msg: &str,
    max_retries: u32,
) -> EtlResult<()> {
    let mut last_err = None;
    for attempt in 0..=max_retries {
        match connection.lock().await.try_send(msg).await {
            Ok(()) => return Ok(()),
            Err(e) => {
                warn!(attempt, error = %e, "send failed, reconnecting");
                last_err = Some(e);
                sleep(backoff_duration(attempt)).await;
                if let Err(e) = connection.lock().await.reconnect().await {
                    warn!(error = %e, "reconnect failed");
                }
            }
        }
    }
    Err(last_err.expect("loop ran at least once"))

⚠️ Potential issue | 🔴 Critical

Reconnects can bypass phx_join.

ensure_joined() records the topic before the join is durable, and send_with_retry() retries only the broadcast frame. If the socket reconnects after the join attempt or during the broadcast retry loop, the retry is sent on a fresh connection that never rejoined that topic, so the event can be dropped silently. The retry path needs to rerun phx_join for the current generation before resending the broadcast.

Also applies to: 122-155
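A synchronous sketch of the suggested fix, rejoining the topic after every reconnect before retrying the broadcast; `MockConn` and `send_with_rejoin` are hypothetical stand-ins for the PR's async types:

```rust
use std::collections::HashSet;

// Mock connection: broadcasts on an unjoined topic fail, mimicking the
// server silently dropping events on a fresh socket that never rejoined.
struct MockConn {
    joined: HashSet<String>,
    sends: Vec<String>,
}

impl MockConn {
    fn reconnect(&mut self) {
        // A fresh socket has no joined topics.
        self.joined.clear();
    }

    fn join(&mut self, topic: &str) {
        self.joined.insert(topic.to_string());
        self.sends.push(format!("phx_join:{topic}"));
    }

    fn broadcast(&mut self, topic: &str, msg: &str) -> Result<(), ()> {
        if self.joined.contains(topic) {
            self.sends.push(format!("broadcast:{msg}"));
            Ok(())
        } else {
            Err(())
        }
    }
}

fn send_with_rejoin(conn: &mut MockConn, topic: &str, msg: &str, max_retries: u32) -> Result<(), ()> {
    for _ in 0..=max_retries {
        if conn.broadcast(topic, msg).is_ok() {
            return Ok(());
        }
        conn.reconnect();
        conn.join(topic); // the crucial step: rejoin before resending
    }
    Err(())
}
```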


Comment on lines +29 to +35
Cell::I64(v) => {
    const MAX_SAFE: i64 = 1 << 53;
    if v.abs() > MAX_SAFE {
        Value::String(v.to_string())
    } else {
        Value::Number((*v).into())
    }

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

In Rust, what happens when calling i64::abs() on i64::MIN, and what is JavaScript Number.MAX_SAFE_INTEGER?

💡 Result:

  • Rust (i64::abs() on i64::MIN): it overflows because |i64::MIN| can’t fit in an i64. With overflow checks (typically debug builds) it panics; with overflow checks disabled (typically release/optimized builds) it wraps and returns i64::MIN. [1]

  • JavaScript (Number.MAX_SAFE_INTEGER): 9007199254740991 (which is 2^53 − 1). [2]

Sources: [1] Rust std docs for i64::abs overflow behavior, [2] MDN Number.MAX_SAFE_INTEGER.


Fix i64::abs() overflow and incorrect JavaScript safe-integer boundary.

i64::abs() panics on i64::MIN in debug builds and wraps in release builds. Additionally, JavaScript's Number.MAX_SAFE_INTEGER is 2^53 - 1 (9007199254740991), but the code uses 2^53 (9007199254740992), which allows unsafe values to be serialized as numbers. This affects both lines 29-35 and 118-124.

Use unsigned_abs() or a range check instead of abs(). The suggested fix:

Proposed fix
-            const MAX_SAFE: i64 = 1 << 53;
-            if v.abs() > MAX_SAFE {
+            const MAX_SAFE: i64 = (1 << 53) - 1;
+            if !(-MAX_SAFE..=MAX_SAFE).contains(v) {
                 Value::String(v.to_string())
             } else {
                 Value::Number((*v).into())
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Cell::I64(v) => {
    const MAX_SAFE: i64 = 1 << 53;
    if v.abs() > MAX_SAFE {
        Value::String(v.to_string())
    } else {
        Value::Number((*v).into())
    }
Cell::I64(v) => {
    const MAX_SAFE: i64 = (1 << 53) - 1;
    if !(-MAX_SAFE..=MAX_SAFE).contains(v) {
        Value::String(v.to_string())
    } else {
        Value::Number((*v).into())
    }
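The corrected boundary logic can be checked in isolation; `encode_i64` is an illustrative helper, not the PR's function:

```rust
// 2^53 - 1 is Number.MAX_SAFE_INTEGER; the range test avoids i64::abs(),
// which overflows (panics in debug builds) on i64::MIN.
const MAX_SAFE: i64 = (1i64 << 53) - 1;

fn encode_i64(v: i64) -> String {
    if v < -MAX_SAFE || v > MAX_SAFE {
        format!("\"{v}\"") // outside the JS safe range: serialize as a string
    } else {
        v.to_string() // safe: serialize as a JSON number
    }
}
```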

Comment on lines +195 to +202
pub(super) fn build_topic(table_name: &TableName, private_channels: bool) -> String {
    let channel = format!("{CHANNEL_PREFIX}:{}.{}", table_name.schema, table_name.name);
    if private_channels {
        format!("realtime:private:{channel}")
    } else {
        format!("realtime:{channel}")
    }
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

According to the current Supabase Realtime authorization docs, how should a client join a private broadcast channel? Is config.private: true part of the join config?

💡 Result:

To join a private (authorized) Broadcast channel, the client should create the channel with the channel options including config: { private: true }, then call .subscribe():

const channel = supabase.channel('room-1', {
  config: { private: true },
})

channel.subscribe()

Yes—per the current Realtime Authorization docs, config.private: true is part of the channel “join” configuration (the options you pass when instantiating the channel), not something separate you pass to .subscribe(). [1]


🏁 Script executed:

cd etl-destinations && head -n 230 src/realtime/encoding.rs | tail -n 40

Repository: supabase/etl

Length of output: 1423


🏁 Script executed:

cd etl-destinations && rg -n "build_join_message" src/realtime/

Repository: supabase/etl

Length of output: 414


🏁 Script executed:

cd etl-destinations && sed -n '130,145p' src/realtime/core.rs

Repository: supabase/etl

Length of output: 524


🏁 Script executed:

cd etl-destinations && sed -n '100,145p' src/realtime/core.rs

Repository: supabase/etl

Length of output: 1752


🏁 Script executed:

cd etl-destinations && rg -n "ensure_joined" src/realtime/ -A 2 -B 2

Repository: supabase/etl

Length of output: 827


🏁 Script executed:

cd etl-destinations && sed -n '145,170p' src/realtime/core.rs

Repository: supabase/etl

Length of output: 774


🏁 Script executed:

cd etl-destinations && rg -n "build_topic\|broadcast\(" src/realtime/ -B 3 | head -80

Repository: supabase/etl

Length of output: 38


🏁 Script executed:

cd etl-destinations && rg -n "\.broadcast\|build_topic" src/realtime/core.rs -B 2 -A 2

Repository: supabase/etl

Length of output: 38


🏁 Script executed:

cd etl-destinations && rg -n "broadcast\(" src/realtime/core.rs

Repository: supabase/etl

Length of output: 617


🏁 Script executed:

cd etl-destinations && sed -n '180,190p' src/realtime/core.rs

Repository: supabase/etl

Length of output: 449


Private-channel authorization incomplete: config.private missing from join payload.

The private_channels flag correctly changes the topic string to realtime:private:..., but build_join_message() always sends the same static join config without "private": true. According to Supabase Realtime authorization docs, private broadcast channels require config: { private: true } in the channel join configuration. Without this field, the server will reject private-channel subscriptions as unauthorized, making the feature non-functional.

Either detect the "private:" prefix in the topic string and conditionally add "private": true to the config, or pass the private_channels flag to build_join_message() and update the function signature accordingly.

Applies to: build_join_message() (lines 205–219) and all call sites.
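A sketch of the prefix-detection option, building the join config by hand to stay dependency-free (the real code would presumably use serde_json; `build_join_payload` is a hypothetical helper):

```rust
// Derive the private flag from the prefix that build_topic adds;
// alternatively, the private_channels boolean could be threaded through
// from callers instead of inspecting the topic string.
fn build_join_payload(topic: &str) -> String {
    let private = topic.starts_with("realtime:private:");
    format!("{{\"config\":{{\"private\":{private}}}}}")
}
```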



4 participants