Skip to content

Commit a26d7d1

Browse files
committed
fix CI issues
1 parent 74dd951 commit a26d7d1

File tree

3 files changed

+45
-36
lines changed

3 files changed

+45
-36
lines changed

etl-destinations/src/realtime/core.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,27 @@ use tokio::time::{Duration, interval};
1111
use tracing::{debug, warn};
1212

1313
#[cfg(feature = "egress")]
14-
use crate::egress::{PROCESSING_TYPE_STREAMING, PROCESSING_TYPE_TABLE_COPY, log_processed_bytes};
14+
use crate::egress::{PROCESSING_TYPE_STREAMING, log_processed_bytes};
1515

1616
use super::connection::ConnectionManager;
1717
use super::encoding::{
1818
build_broadcast_message, build_heartbeat_message, build_join_message, build_topic,
19-
delete_payload, insert_payload, table_row_to_json, truncate_payload,
20-
update_payload,
19+
delete_payload, insert_payload, table_row_to_json, truncate_payload, update_payload,
2120
};
2221

2322
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(20);
2423

24+
pub struct RealtimeConfig {
25+
pub url: String,
26+
pub api_key: String,
27+
pub channel_prefix: String,
28+
pub private_channels: bool,
29+
pub insert_event: String,
30+
pub update_event: String,
31+
pub delete_event: String,
32+
pub max_retries: u32,
33+
}
34+
2535
/// Internal mutable state, protected by a single `Mutex`.
2636
struct RealtimeInner {
2737
connection: ConnectionManager,
@@ -108,17 +118,12 @@ pub struct RealtimeDestination {
108118
}
109119

110120
impl RealtimeDestination {
111-
pub fn new(
112-
url: String,
113-
api_key: String,
114-
channel_prefix: String,
115-
private_channels: bool,
116-
insert_event: String,
117-
update_event: String,
118-
delete_event: String,
119-
max_retries: u32,
120-
) -> Self {
121-
let inner = Arc::new(Mutex::new(RealtimeInner::new(url, api_key, max_retries)));
121+
pub fn new(config: RealtimeConfig) -> Self {
122+
let inner = Arc::new(Mutex::new(RealtimeInner::new(
123+
config.url,
124+
config.api_key,
125+
config.max_retries,
126+
)));
122127

123128
// Spawn a background task that sends a heartbeat every 20 seconds.
124129
// The Realtime server closes the connection after ~25 seconds of silence,
@@ -141,11 +146,11 @@ impl RealtimeDestination {
141146
drop(heartbeat_task);
142147

143148
Self {
144-
channel_prefix,
145-
private_channels,
146-
insert_event,
147-
update_event,
148-
delete_event,
149+
channel_prefix: config.channel_prefix,
150+
private_channels: config.private_channels,
151+
insert_event: config.insert_event,
152+
update_event: config.update_event,
153+
delete_event: config.delete_event,
149154
inner,
150155
heartbeat_abort: Arc::new(abort_handle),
151156
}
@@ -200,7 +205,8 @@ impl Destination for RealtimeDestination {
200205
build_topic(&schema.name, &self.channel_prefix, self.private_channels);
201206
let record = table_row_to_json(&ins.table_row, &schema);
202207
let payload = insert_payload(&schema.name, record, ins.commit_lsn);
203-
bytes_sent += inner.broadcast(&topic, &self.insert_event, payload).await? as u64;
208+
bytes_sent +=
209+
inner.broadcast(&topic, &self.insert_event, payload).await? as u64;
204210
}
205211
Event::Update(upd) => {
206212
let schema = inner.get_schema(upd.table_id)?;
@@ -212,7 +218,8 @@ impl Destination for RealtimeDestination {
212218
.as_ref()
213219
.map(|(_, row)| table_row_to_json(row, &schema));
214220
let payload = update_payload(&schema.name, record, old_record, upd.commit_lsn);
215-
bytes_sent += inner.broadcast(&topic, &self.update_event, payload).await? as u64;
221+
bytes_sent +=
222+
inner.broadcast(&topic, &self.update_event, payload).await? as u64;
216223
}
217224
Event::Delete(del) => {
218225
let schema = inner.get_schema(del.table_id)?;
@@ -223,7 +230,8 @@ impl Destination for RealtimeDestination {
223230
.as_ref()
224231
.map(|(_, row)| table_row_to_json(row, &schema));
225232
let payload = delete_payload(&schema.name, old_record, del.commit_lsn);
226-
bytes_sent += inner.broadcast(&topic, &self.delete_event, payload).await? as u64;
233+
bytes_sent +=
234+
inner.broadcast(&topic, &self.delete_event, payload).await? as u64;
227235
}
228236
Event::Truncate(trunc) => {
229237
for rel_id in &trunc.rel_ids {
@@ -236,8 +244,9 @@ impl Destination for RealtimeDestination {
236244
self.private_channels,
237245
);
238246
let payload = truncate_payload(&schema.name, trunc.commit_lsn);
239-
bytes_sent +=
240-
inner.broadcast(&topic, &self.delete_event, payload).await? as u64;
247+
bytes_sent += inner
248+
.broadcast(&topic, &self.delete_event, payload)
249+
.await? as u64;
241250
}
242251
Err(e) => {
243252
warn!(table_id = %table_id, error = %e, "skipping truncate for unknown table");

etl-destinations/src/realtime/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ mod connection;
22
mod core;
33
pub mod encoding;
44

5-
pub use core::RealtimeDestination;
5+
pub use core::{RealtimeConfig, RealtimeDestination};
66

77
pub const DEFAULT_CHANNEL_PREFIX: &str = "etl";
88
pub const DEFAULT_MAX_RETRIES: u32 = 5;

etl-replicator/src/core.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use etl_destinations::iceberg::{
1919
use etl_destinations::{
2020
bigquery::BigQueryDestination,
2121
iceberg::{IcebergClient, IcebergDestination},
22-
realtime::{DEFAULT_MAX_RETRIES, RealtimeDestination},
22+
realtime::{DEFAULT_MAX_RETRIES, RealtimeConfig, RealtimeDestination},
2323
};
2424
use secrecy::ExposeSecret;
2525
use tokio::signal::unix::{SignalKind, signal};
@@ -140,16 +140,16 @@ pub async fn start_replicator_with_config(
140140
update_event,
141141
delete_event,
142142
} => {
143-
let destination = RealtimeDestination::new(
144-
url.clone(),
145-
api_key.expose_secret().to_string(),
146-
channel_prefix.clone(),
147-
*private_channels,
148-
insert_event.clone(),
149-
update_event.clone(),
150-
delete_event.clone(),
151-
DEFAULT_MAX_RETRIES,
152-
);
143+
let destination = RealtimeDestination::new(RealtimeConfig {
144+
url: url.clone(),
145+
api_key: api_key.expose_secret().to_string(),
146+
channel_prefix: channel_prefix.clone(),
147+
private_channels: *private_channels,
148+
insert_event: insert_event.clone(),
149+
update_event: update_event.clone(),
150+
delete_event: delete_event.clone(),
151+
max_retries: DEFAULT_MAX_RETRIES,
152+
});
153153

154154
let pipeline = Pipeline::new(replicator_config.pipeline, state_store, destination);
155155
start_pipeline(pipeline).await?;

0 commit comments

Comments
 (0)