Add ducklake destination#627

Open
bnjjj wants to merge 29 commits into main from
bnjjj/ducklake

Conversation

@bnjjj
Contributor

@bnjjj bnjjj commented Mar 9, 2026

Add ducklake destination. duckdb doesn't implement an async-compliant API, so to provide good performance without breaking the entire async runtime I had to combine a semaphore with blocking tasks via spawn_blocking. The most important thing is to spend as little time as possible inside the spawn_blocking task; that's one of the main reasons I use a Semaphore. Without it we could end up waiting on pool.get() from r2d2; by giving the Semaphore the same size as the pool, we avoid waiting too long on pool.get(). Another point: I put the retry mechanism outside spawn_blocking so the sleeps happen asynchronously instead of blocking inside spawn_blocking for too long.

Beyond these considerations, this destination uses the duckdb-rs crate together with the ducklake extension.

TODO:

  • Clarify extension loading; maybe vendor the needed extensions to avoid relying on an external repository, and pin their versions

@bnjjj bnjjj requested a review from a team as a code owner March 9, 2026 15:48
@coderabbitai

coderabbitai bot commented Mar 9, 2026

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Added a feature-gated DuckLake destination for streaming Postgres CDC with optional S3-compatible storage and table lifecycle handling.
    • New example CLI to run replication to DuckLake.
  • Documentation

    • Expanded examples and README with DuckLake prerequisites, run commands, flags, and query guidance.
  • Tests

    • Added end-to-end integration tests covering writes, truncation, concurrency, and type mappings.
  • Chores

    • Minor .gitignore cleanup and readability fixes.

Walkthrough

This PR introduces a new DuckLake destination feature: workspace Cargo dependencies and feature gates; a ducklake module with config, core, and schema implementations; a DuckLakeDestination type with constructor and Destination impl; S3/catalog URL parsing and libpq conninfo generation; DDL/type-mapping utilities; connection pooling with per-connection setup SQL and retried batched writes; CDC event handling; integration tests; an example binary and README updates; plus minor project file changes (.gitignore, workspace Cargo.toml).

Sequence Diagram(s)

sequenceDiagram
    actor Client
    participant Pipeline
    participant DuckLakeDestination
    participant Pool as DuckDB Pool
    participant DuckLake as DuckLake Storage

    Client->>Pipeline: submit CDC events / row batch
    Pipeline->>DuckLakeDestination: write_table_rows(rows) / write_events(events)
    DuckLakeDestination->>Pool: acquire connection
    Pool->>DuckDB: open connection & run setup SQL (attach catalog, load extensions)
    DuckLakeDestination->>Pool: CREATE TABLE IF NOT EXISTS (if needed)
    DuckLakeDestination->>Pool: CREATE TEMP staging table
    DuckLakeDestination->>Pool: INSERT rows into staging
    DuckLakeDestination->>Pool: INSERT INTO ducklake.table SELECT ... FROM staging
    Pool->>DuckLake: persist Parquet snapshot / apply changes
    DuckLakeDestination->>Pipeline: respond success / error
sequenceDiagram
    participant App as Application
    participant Config as DuckLake Config
    participant Pool as Connection Pool
    participant DuckDB as DuckDB Engine
    participant Catalog as DuckLake Catalog

    App->>Config: provide catalog_url + data_path (+ optional S3)
    Config->>Config: catalog_conninfo_from_url / validate_data_path / build_setup_sql
    Config-->>App: setup SQL (attach catalog, create secret if S3)
    App->>Pool: create pool with setup SQL
    Pool->>DuckDB: get connection and execute setup SQL
    DuckDB->>Catalog: INSTALL/LOAD ducklake (and postgres/httpfs if needed)
    alt S3 used
        DuckDB->>Catalog: CREATE SECRET for httpfs (endpoint, creds)
    end
    Catalog-->>DuckDB: catalog attached and ready
    Pool-->>App: pool initialized and ready for use


Comment on lines +509 to +533
match run_duckdb_blocking(pool.clone(), Arc::clone(&blocking_slots), move |conn| {
    insert_rows(
        conn,
        &attempt_table_name,
        attempt_values.as_ref().as_slice(),
    )
})
.await
{
    Ok(()) => return Ok(()),
    Err(e) if attempt < MAX_COMMIT_RETRIES => {
        // Apply full-jitter: sleep between 50% and 150% of the
        // calculated delay so concurrent retriers spread out instead
        // of staying in lock-step and repeatedly colliding.
        let jitter_ratio = rand::rng().random_range(0.5..=1.5_f64);
        let jittered = delay.mul_f64(jitter_ratio);
        warn!(
            attempt = attempt + 1,
            max = MAX_COMMIT_RETRIES,
            table = %table_name,
            error = ?e,
            "DuckLake write attempt failed, retrying"
        );
        tokio::time::sleep(jittered).await;
        delay = std::cmp::min(delay * 2, Duration::from_millis(MAX_RETRY_DELAY_MS));
Contributor Author


This is where the retry mechanism happens, ensuring we don't block a thread for too long.
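The backoff schedule in the snippet above can be sketched with std-only Rust. This is a simplified model, not the PR's code: the cap value is assumed, and the jitter ratio is passed in explicitly (the PR draws it from `rand::rng().random_range(0.5..=1.5)`) so the schedule is deterministic here.

```rust
use std::time::Duration;

/// Cap for the base delay; stands in for the PR's MAX_RETRY_DELAY_MS
/// (the actual value in the PR is not shown, 10s is assumed).
const MAX_RETRY_DELAY_MS: u64 = 10_000;

/// Doubles the base delay after each failed attempt, clamped to the cap.
fn next_base_delay(delay: Duration) -> Duration {
    std::cmp::min(delay * 2, Duration::from_millis(MAX_RETRY_DELAY_MS))
}

/// Applies a full-jitter ratio in [0.5, 1.5] to the base delay, mirroring
/// `delay.mul_f64(jitter_ratio)` in the snippet.
fn jittered(delay: Duration, ratio: f64) -> Duration {
    delay.mul_f64(ratio.clamp(0.5, 1.5))
}

fn main() {
    let mut delay = Duration::from_millis(100);
    for attempt in 0..5 {
        // A fixed ratio keeps the demo deterministic; real code randomizes it.
        let sleep_for = jittered(delay, 1.25);
        println!("attempt {attempt}: would sleep {sleep_for:?}");
        delay = next_base_delay(delay);
    }
}
```

Because the sleep happens in async code (outside `spawn_blocking`), only the DuckDB work itself occupies a blocking thread.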


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl-destinations/src/ducklake/config.rs`:
- Around line 142-147: The code pushes tcp_user_timeout using seconds (calling
as_secs()), but libpq requires milliseconds; in the block that checks
config.get_tcp_user_timeout() (the parts.push call that formats
"tcp_user_timeout={}" in config.rs) change the conversion to use as_millis() so
the URL encodes milliseconds rather than seconds and ensure the formatted value
is an integer millisecond value compatible with libpq.
- Around line 314-316: validate_data_path currently accepts the "az" scheme but
Azure is not initialized; either remove "az" from the allowed schemes in
validate_data_path or fully implement Azure support in the initialization code:
when handling az:// URIs ensure the DuckDB "azure" extension is installed (not
just httpfs), and create a DuckDB secret with TYPE azure and the Azure
credentials (instead of creating an S3 secret). Also update the setup logic that
installs httpfs and creates S3 secrets to branch on the scheme and
install/create the azure extension/secret for "az" URIs.
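The units mix-up in the first item above is easy to see in isolation. A minimal std-only sketch (the getter name is hypothetical; what matters is that libpq interprets `tcp_user_timeout` in milliseconds):

```rust
use std::time::Duration;

/// Formats the libpq tcp_user_timeout parameter from a Duration.
/// libpq expects milliseconds, so using as_secs() here would silently
/// shrink a 5-second timeout to "tcp_user_timeout=5" (i.e. 5 ms).
fn tcp_user_timeout_param(timeout: Duration) -> String {
    format!("tcp_user_timeout={}", timeout.as_millis())
}

fn main() {
    // A 5-second timeout must serialize as 5000, not 5.
    println!("{}", tcp_user_timeout_param(Duration::from_secs(5)));
}
```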

In `@etl-destinations/src/ducklake/schema.rs`:
- Around line 73-85: Replace the debug formatting usage that produces invalid
SQL identifiers: call pg_escape::quote_identifier on the table name and on each
column name instead of using {:?} in the CREATE TABLE assembly (the code that
builds col_defs and the final format! call). Specifically, in the iterator
mapping that builds col_defs (referencing col.name and
postgres_column_type_to_ducklake_sql) wrap the column identifier with
quote_identifier and use that escaped identifier in the format string, and
likewise escape table_name with quote_identifier when constructing the final
CREATE TABLE string so both table and column identifiers are valid SQL
identifiers.

In `@etl-examples/src/bin/ducklake.rs`:
- Around line 150-165: Remove the unsafe std::env::set_var use in set_log_level
and instead set the default log level via the EnvFilter fallback in
init_tracing: delete the unsafe block in set_log_level (or make set_log_level a
no-op) and change init_tracing to call
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_|
tracing_subscriber::EnvFilter::new("ducklake=info")) so the default
"ducklake=info" is provided without mutating process environment; reference
init_tracing and set_log_level to locate the code.
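The identifier-escaping fix for schema.rs boils down to standard SQL quoting: wrap the identifier in double quotes and double any embedded quote. A hand-rolled std-only sketch of that rule (the actual fix would call `pg_escape::quote_identifier`; this is just the underlying idea):

```rust
/// Quotes a SQL identifier: wraps it in double quotes and doubles any
/// embedded double quote, per the SQL standard. Unlike `{:?}` debug
/// formatting, this never emits backslash escapes that SQL engines reject.
fn quote_identifier(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

fn main() {
    let table = quote_identifier("public_my__table");
    let col = quote_identifier("weird \"col\" name");
    println!("CREATE TABLE IF NOT EXISTS {table} ({col} TEXT)");
}
```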

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Run ID: 18b38e58-5cb2-46c6-9c8b-1a1851055534

📥 Commits

Reviewing files that changed from the base of the PR and between 01337b9 and 1afa7ef.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • replication_bis.db is excluded by !**/*.db
📒 Files selected for processing (12)
  • .gitignore
  • Cargo.toml
  • etl-destinations/Cargo.toml
  • etl-destinations/src/ducklake/config.rs
  • etl-destinations/src/ducklake/core.rs
  • etl-destinations/src/ducklake/mod.rs
  • etl-destinations/src/ducklake/schema.rs
  • etl-destinations/src/lib.rs
  • etl-destinations/tests/ducklake_destination.rs
  • etl-examples/Cargo.toml
  • etl-examples/README.md
  • etl-examples/src/bin/ducklake.rs

bnjjj added 2 commits March 9, 2026 17:16
Signed-off-by: Benjamin <[email protected]>
Signed-off-by: Benjamin <[email protected]>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl-destinations/src/ducklake/core.rs`:
- Around line 267-293: The current write_events_inner treats Event::Update and
Event::Delete the same as inserts by pushing rows into table_id_to_rows and
calling insert_rows_with_retry; change this to honor the Destination trait
contract by handling each event type appropriately: keep Event::Insert behavior
using insert_rows_with_retry, for Event::Delete construct and route delete
keys/rows to a delete_rows_with_retry (or issue DELETE statements) instead of
inserting old_row, and for Event::Update either emit a delete for the old_row
and an insert for the new table_row or call an upsert/merge API (depending on
DuckLake capabilities) so the old version is removed/merged; update
write_events_inner to track separate collections (e.g., insert_rows_by_table,
delete_rows_by_table, upsert_rows_by_table) and call the corresponding helper
functions (insert_rows_with_retry, delete_rows_with_retry, or
upsert_rows_with_retry), referencing Event::Update, Event::Delete,
write_events_inner, insert_rows_with_retry, table_id_to_rows, and cell_to_value
to locate and modify the code paths.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Run ID: d25f47ea-50c9-4add-94df-edb6868728ad

📥 Commits

Reviewing files that changed from the base of the PR and between 1afa7ef and e2d7bbe.

📒 Files selected for processing (4)
  • etl-destinations/Cargo.toml
  • etl-destinations/src/ducklake/core.rs
  • etl-destinations/tests/ducklake_destination.rs
  • etl-examples/src/bin/ducklake.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • etl-destinations/tests/ducklake_destination.rs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
etl-destinations/src/ducklake/core.rs (2)

445-452: ⚠️ Potential issue | 🔴 Critical

The table-name encoding is still lossy and not safe for downstream SQL use.

This mapping is not injective (a_.b and a._b both collapse to a___b), so two source tables can end up sharing one DuckLake table. It also preserves characters like " that later break the raw SQL interpolation paths used for truncate/insert statements.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@etl-destinations/src/ducklake/core.rs` around lines 445 - 452, The current
table_name_to_ducklake_table_name implementation is lossy and unsafe; replace it
with an injective, SQL-safe encoding: percent-encode (or otherwise escape) the
entire schema and name components (including the escape char, delimiter, and
quote characters like '"') and then join them with TABLE_NAME_DELIMITER so the
mapping is reversible and unique. Concretely, update
table_name_to_ducklake_table_name (and any helpers/constants if needed) to first
percent-encode each TableName.schema and TableName.name (ensuring '%' itself is
encoded), then return
format!("{encoded_schema}{TABLE_NAME_DELIMITER}{encoded_table}") so collisions
like `a_.b` vs `a._b` cannot occur and raw SQL interpolation no longer breaks;
keep references to TABLE_NAME_DELIMITER, TABLE_NAME_ESCAPE, TableName, and
DuckLakeTableName to locate the change.
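The injective encoding suggested above can be sketched in a few lines of std-only Rust. The delimiter and the exact character set to escape are illustrative assumptions; the point is that escaping `%` itself plus every delimiter-like character makes the mapping reversible:

```rust
/// Delimiter between the encoded schema and table parts; assumed to
/// correspond to the crate's TABLE_NAME_DELIMITER constant.
const TABLE_NAME_DELIMITER: char = '_';

/// Percent-encodes the characters that would make the mapping lossy or
/// unsafe in raw SQL: '%' itself (so decoding is unambiguous), the
/// delimiter, '.', and '"'.
fn encode_part(part: &str) -> String {
    let mut out = String::with_capacity(part.len());
    for c in part.chars() {
        match c {
            '%' | '_' | '.' | '"' => out.push_str(&format!("%{:02X}", c as u32)),
            _ => out.push(c),
        }
    }
    out
}

/// Joins schema and table injectively: distinct inputs can no longer
/// collide, because every delimiter inside a part is escaped first.
fn ducklake_table_name(schema: &str, table: &str) -> String {
    format!(
        "{}{}{}",
        encode_part(schema),
        TABLE_NAME_DELIMITER,
        encode_part(table)
    )
}

fn main() {
    // The two names that previously collapsed to "a___b" now differ.
    println!("{}", ducklake_table_name("a_", "b")); // a%5F_b
    println!("{}", ducklake_table_name("a", "_b")); // a_%5Fb
}
```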

256-293: ⚠️ Potential issue | 🔴 Critical

Update/Delete events are still being written as inserts.

Lines 268-293 push both Event::Update and Event::Delete rows into table_id_to_rows, and that batch is later sent through insert_rows_with_retry. That means deletes re-insert old rows and updates leave the old version behind, so the destination diverges from the replication stream.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@etl-destinations/src/ducklake/core.rs` around lines 256 - 293, The current
Event::Update and Event::Delete branches are accumulating rows into
table_id_to_rows and later calling insert_rows_with_retry, causing updates to
re-insert old data and deletes to resurrect deleted rows. Change the logic so
inserts and upserts go to the existing table_id_to_rows (used with
insert_rows_with_retry or an upsert variant), but collect deletes separately
(e.g., table_id_to_deletes) using the primary-key values from
delete.old_table_row and call the appropriate delete_rows_with_retry/delete_rows
function; for updates, ensure you send the new row as an upsert (or perform
delete by pk + insert of new row) rather than treating it as a blind insert.
Update the Event::Update and Event::Delete branches to populate the correct
collections and invoke the matching insert/upsert/delete helper functions
instead of always calling insert_rows_with_retry.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@etl-examples/README.md`:
- Around line 46-55: The README overclaims support for GCS/Azure and autoloading
of the "ducklake" and "httpfs" extensions while the CLI binary (ducklake.rs)
currently only implements S3-specific cloud flags; update the documentation to
accurately reflect the shipped surface by restricting storage examples to the
supported S3/local flows and removing references to GCS/Azure and autoloaded
extensions, or alternatively implement the missing cloud backends and
extension-loading behavior in the ducklake CLI (the code that parses cloud flags
and loads extensions in ducklake.rs) before merging so the README and runtime
behavior match.

---

Duplicate comments:
In `@etl-destinations/src/ducklake/core.rs`:
- Around line 445-452: The current table_name_to_ducklake_table_name
implementation is lossy and unsafe; replace it with an injective, SQL-safe
encoding: percent-encode (or otherwise escape) the entire schema and name
components (including the escape char, delimiter, and quote characters like '"')
and then join them with TABLE_NAME_DELIMITER so the mapping is reversible and
unique. Concretely, update table_name_to_ducklake_table_name (and any
helpers/constants if needed) to first percent-encode each TableName.schema and
TableName.name (ensuring '%' itself is encoded), then return
format!("{encoded_schema}{TABLE_NAME_DELIMITER}{encoded_table}") so collisions
like `a_.b` vs `a._b` cannot occur and raw SQL interpolation no longer breaks;
keep references to TABLE_NAME_DELIMITER, TABLE_NAME_ESCAPE, TableName, and
DuckLakeTableName to locate the change.
- Around line 256-293: The current Event::Update and Event::Delete branches are
accumulating rows into table_id_to_rows and later calling
insert_rows_with_retry, causing updates to re-insert old data and deletes to
resurrect deleted rows. Change the logic so inserts and upserts go to the
existing table_id_to_rows (used with insert_rows_with_retry or an upsert
variant), but collect deletes separately (e.g., table_id_to_deletes) using the
primary-key values from delete.old_table_row and call the appropriate
delete_rows_with_retry/delete_rows function; for updates, ensure you send the
new row as an upsert (or perform delete by pk + insert of new row) rather than
treating it as a blind insert. Update the Event::Update and Event::Delete
branches to populate the correct collections and invoke the matching
insert/upsert/delete helper functions instead of always calling
insert_rows_with_retry.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro

Run ID: 600105be-7205-4915-a9f7-87d2d80d2cbe

📥 Commits

Reviewing files that changed from the base of the PR and between e2d7bbe and 881ce2d.

📒 Files selected for processing (4)
  • etl-destinations/src/ducklake/config.rs
  • etl-destinations/src/ducklake/core.rs
  • etl-destinations/src/ducklake/schema.rs
  • etl-examples/README.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • etl-destinations/src/ducklake/schema.rs

Comment on lines +46 to +55
DuckLake separates storage into two components:

| Component | Role | Example |
|-----------|------|---------|
| **Catalog** | Metadata (tables, snapshots, stats) | PostgreSQL database |
| **Data** | Row data as Parquet files | Local directory or S3/GCS/Azure |

The `ducklake` DuckDB extension (a core extension, autoloaded) handles reads
and writes. Each batch of rows is committed as a single Parquet snapshot so
the lake stays consistent and queryable at all times.


⚠️ Potential issue | 🟠 Major

Scope the DuckLake docs to the storage modes and extension loading behavior that actually ship.

This section documents S3/GCS/Azure support and says ducklake/httpfs are autoloaded, but the provided CLI surface in etl-examples/src/bin/ducklake.rs:96-138 only exposes S3-specific cloud options, and the PR objective still calls out extension loading as unresolved. As written, users can follow the documented GCS/Azure or autoloaded-extension paths and hit runtime failures immediately. Please narrow the README to the currently supported path or land the missing support before merge.

As per coding guidelines, "Comment only when the issue must be resolved before merge" and "Highlight only issues that could cause runtime errors, data loss, or severe maintainability issues."

Also applies to: 78-79, 98-118

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@etl-examples/README.md` around lines 46 - 55, The README overclaims support
for GCS/Azure and autoloading of the "ducklake" and "httpfs" extensions while
the CLI binary (ducklake.rs) currently only implements S3-specific cloud flags;
update the documentation to accurately reflect the shipped surface by
restricting storage examples to the supported S3/local flows and removing
references to GCS/Azure and autoloaded extensions, or alternatively implement
the missing cloud backends and extension-loading behavior in the ducklake CLI
(the code that parses cloud flags and loads extensions in ducklake.rs) before
merging so the README and runtime behavior match.

Contributor

@iambriccardo iambriccardo left a comment


Great work, left a few minor comments!

iceberg = { workspace = true, optional = true }
iceberg-catalog-rest = { workspace = true, optional = true }
metrics = { workspace = true, optional = true }
parking_lot = { version = "0.12.5", optional = true }
Contributor


I would put this in the main workspace and inherit it here.

name = "etl-examples"
path = "src/main.rs"

[[bin]]
Contributor


Do we maybe want, in the future, to also split the examples into individual binaries per destination?

);
}

/// `truncate_table` deletes all rows while leaving the table schema intact.
Contributor


nit: I would remove these comments in favor of just a good test name, to be in line with the other tests, but I don't have strong opinions.

//! "
//! ```

use chrono::NaiveDate;
Contributor


I would also add a file ducklake_pipeline to properly test the pipeline with the destination end to end.

Contributor


For example in BigQuery we are missing direct integration tests on the destination, which we could add. It's just that I deemed them unnecessary since we want to test the whole system end to end.

/// # Example
/// - `public.my_table` → `public_my__table`
/// - `my_schema.orders` → `my__schema_orders`
pub fn table_name_to_ducklake_table_name(table_name: &TableName) -> DuckLakeTableName {
Contributor


I will make a PR to share this naming logic between all destinations.

Comment on lines +535 to +536
let jitter_ratio = rand::rng().random_range(0.5..=1.5_f64);
let jittered = delay.mul_f64(jitter_ratio);
Contributor


Maybe we should standardize a backoff system within ETL to make everything more predictable?

format!("{escaped_schema}{TABLE_NAME_DELIMITER}{escaped_table}")
}

// ── helpers ───────────────────────────────────────────────────────────────────
Contributor


Suggested change
// ── helpers ───────────────────────────────────────────────────────────────────

R: Send + 'static,
F: FnOnce(&duckdb::Connection) -> EtlResult<R> + Send + 'static,
{
let permit = blocking_slots.acquire_owned().await.map_err(|_| {
Contributor


Why do we need the semaphore? Can't we rely on the fact that a saturated pool will already control the maximum amount of work to be done?

let blocking_slots = Arc::clone(&self.blocking_slots);

join_set.spawn(async move {
insert_rows_with_retry(pool, blocking_slots, table_name, rows).await
Contributor


nit: It's not only inserting rows, so it might be a bit confusing for the reader:

Suggested change
insert_rows_with_retry(pool, blocking_slots, table_name, rows).await
apply_events_with_retry(pool, blocking_slots, table_name, rows).await

Signed-off-by: Benjamin <[email protected]>
Copilot AI review requested due to automatic review settings March 26, 2026 17:03

Copilot AI left a comment


Pull request overview

This PR introduces a new DuckLake destination (DuckDB + ducklake extension) to the ETL system, wiring it through the destinations crate, replicator, and API/Kubernetes deployment paths, while adding operational support for vendoring/loading DuckDB extensions and improving replication status reporting.

Changes:

  • Add etl-destinations::ducklake destination implementation (schema/type mapping, encoding, blocking execution, metrics, and maintenance).
  • Integrate DuckLake into etl-replicator and etl-api (config/types, validation, k8s secrets/env wiring, Docker builds with extension fetching).
  • Enhance apply-loop observability and correctness (batch-flush status updates to Postgres; improved slot resume logging; retry policy for DuckLake atomic batch failures).

Reviewed changes

Copilot reviewed 46 out of 50 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
scripts/vendor_duckdb_extensions.sh Adds a script to fetch/pin DuckDB extensions for a specific DuckDB version and platform.
scripts/deploy-local-replicator-orbstack.sh Ensures scripts/ is included in the Docker build context copy.
etl/src/workers/policy.rs Adds retry policy handling for DuckLakeAtomicBatchRetryable and a unit test.
etl/src/workers/apply.rs Adds structured logs for chosen start LSN when resuming/creating replication slots.
etl/src/replication/stream.rs Adds StatusUpdateType::BatchFlush for clearer status update attribution.
etl/src/replication/apply.rs Sends an explicit status update to Postgres after each durable batch flush.
etl/src/error.rs Introduces new ErrorKind::DuckLakeAtomicBatchRetryable.
etl-replicator/src/core.rs Wires DuckLake destination creation from config (incl. S3 + URL normalization helper).
etl-replicator/configuration/base.yaml Adds default config file (currently ClickHouse-oriented).
etl-replicator/configuration/dev.yaml Adds dev config file (currently ClickHouse-oriented).
etl-replicator/configuration/prod.yaml Adds prod config file (currently ClickHouse-oriented).
etl-replicator/Dockerfile Fetches DuckDB extensions during image build and copies them into runtime image.
etl-replicator/Cargo.toml Enables ducklake destination feature and adds url dependency.
etl-examples/src/bin/ducklake.rs Adds a dedicated DuckLake example binary with CLI parsing and URL normalization.
etl-examples/README.md Expands examples documentation and adds DuckLake usage/docs.
etl-examples/Cargo.toml Adds ducklake binary + enables ducklake destination + adds etl-telemetry and url.
etl-destinations/src/lib.rs Exposes ducklake module behind a feature flag.
etl-destinations/src/ducklake/mod.rs Declares DuckLake module layout and re-exports destination/config types.
etl-destinations/src/ducklake/client.rs Implements r2d2 manager + blocking execution with timeouts and interruption watchdog.
etl-destinations/src/ducklake/config.rs Implements extension loading strategy + ATTACH/setup SQL + URL/conninfo serialization.
etl-destinations/src/ducklake/core.rs Implements the DuckLake destination write path, batching/retry, and coordination.
etl-destinations/src/ducklake/encoding.rs Adds row/cell encoding to DuckDB values or SQL literals (arrays fallback).
etl-destinations/src/ducklake/maintenance.rs Adds DuckLake maintenance worker logic (flush/merge/rewrite/checkpoint etc.).
etl-destinations/src/ducklake/metrics.rs Adds DuckLake metrics registration and background sampler.
etl-destinations/src/ducklake/schema.rs Adds Postgres→DuckLake type mapping and CREATE TABLE DDL builder.
etl-destinations/src/ducklake/METRICS.md Documents DuckLake-specific metrics and tuning guidance.
etl-destinations/Cargo.toml Adds ducklake feature and dependencies (duckdb, r2d2, tokio-postgres, etc.).
etl-config/src/shared/destination.rs Adds DestinationConfig::Ducklake plus supporting structs/defaults.
etl-api/src/utils.rs Adds trim_option_secret_string helper for DuckLake secret fields in API payloads.
etl-api/src/validation/validators.rs Adds DuckLake destination validator (connectivity + URL normalization).
etl-api/src/k8s/base.rs Adds DuckLake destination type and K8s secret lifecycle trait methods.
etl-api/src/k8s/http.rs Implements create/update/delete DuckLake S3 credential secret + env var injection + tests.
etl-api/src/k8s/core.rs Threads DuckLake secret creation/deletion into reconciler flow + adds unit tests.
etl-api/src/k8s/cache.rs Extends K8s client mocks for DuckLake secret methods.
etl-api/tests/support/k8s_client.rs Extends test mock K8s client for DuckLake secret methods.
etl-api/src/configs/destination.rs Adds API/stored/encrypted config support for DuckLake (incl. DuckDB log config).
etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_ducklake_container_environment.snap Adds snapshot for DuckLake env var rendering (dev).
etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_ducklake_container_environment-2.snap Adds snapshot for DuckLake env var rendering (staging).
etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_ducklake_container_environment-3.snap Adds snapshot for DuckLake env var rendering (prod).
etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_ducklake_replicator_stateful_set_json.snap Adds snapshot for DuckLake StatefulSet JSON (dev).
etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_ducklake_replicator_stateful_set_json-2.snap Adds snapshot for DuckLake StatefulSet JSON (staging).
etl-api/src/k8s/snapshots/etl_api__k8s__http__tests__create_ducklake_replicator_stateful_set_json-3.snap Adds snapshot for DuckLake StatefulSet JSON (prod).
etl-api/src/configs/snapshots/etl_api__configs__destination__tests__full_api_destination_config_serialization_ducklake.snap Adds snapshot for DuckLake destination config JSON serialization.
etl-api/Dockerfile Fetches DuckDB extensions during image build and copies them into runtime image.
etl-api/Cargo.toml Enables ducklake destination feature and adds url dependency.
Cargo.toml Adds workspace deps (duckdb, r2d2, url).
Cargo.lock Locks new dependencies and resolves multiple Arrow versions due to DuckDB.
.gitignore Anchors venv ignores and ignores optional local DuckDB extension vendoring directory.
.dockerignore Excludes optional local DuckDB extension vendoring directory from Docker context.


Comment on lines +189 to +191
/// Used when `data_path` points to an S3, GCS, or Azure URI. The `endpoint`
/// field supports an optional path prefix (e.g. `localhost:5000/s3` for
/// Supabase Storage or other S3-compatible services mounted at a sub-path).

Copilot AI Mar 26, 2026


The docstring says DuckLake data_path can point to an Azure URI, but validate_data_path() only accepts file, s3, and gs schemes (and tests explicitly reject az://...). Either add Azure scheme support (and ensure the required DuckDB extension/config is loaded) or update the documentation to match the supported schemes.

Suggested change
/// Used when `data_path` points to an S3, GCS, or Azure URI. The `endpoint`
/// field supports an optional path prefix (e.g. `localhost:5000/s3` for
/// Supabase Storage or other S3-compatible services mounted at a sub-path).
/// Used when `data_path` points to an S3 or GCS URI. The `endpoint` field
/// supports an optional path prefix (e.g. `localhost:5000/s3` for Supabase
/// Storage or other S3-compatible services mounted at a sub-path).

Comment on lines +314 to +317
info!(
wait_ms = slot_wait_started.elapsed().as_millis() as u64,
"wait for ducklake blocking slot"
);
Copilot AI Mar 26, 2026

run_duckdb_blocking_with_timeout logs at info level for every blocking-slot wait. This function is on the hot path for every DuckDB operation, so these info! logs can flood production logs. Consider downgrading to debug, or only logging when wait exceeds a threshold, relying on the histogram metrics for normal visibility.

Comment on lines +351 to +354
info!(
wait_ms = checkout_started.elapsed().as_millis() as u64,
"wait for ducklake pool checkout"
);
Copilot AI Mar 26, 2026

run_duckdb_blocking_with_timeout logs at info level for every pool checkout wait. Since this runs per DuckDB operation, it can be extremely noisy in steady state. Prefer debug level and/or conditional logging for slow checkouts; keep the histogram metric as the primary signal.

Comment on lines +372 to +375
info!(
duration_ms = operation_started.elapsed().as_millis() as u64,
"ducklake blocking operation finished"
);
Copilot AI Mar 26, 2026

run_duckdb_blocking_with_timeout emits an info! log for every blocking operation completion. On a busy pipeline this will generate a very high log volume; consider moving this to debug and/or only logging when duration exceeds a threshold (the histogram already captures this systematically).

Comment on lines +4 to +19
destination:
click_house:
url: http://clickhouse.etl-data-plane.svc.cluster.local:8123
user: default
password: password
database: mydb
pipeline:
id: 42
publication_name: my_pub
pg_connection:
host: postgres.etl-data-plane.svc.cluster.local
port: 5432
name: mydb
username: postgres
password: password
tls:
Copilot AI Mar 26, 2026

This committed prod.yaml includes plaintext passwords (destination.click_house.password and pipeline.pg_connection.password). Even as placeholders, shipping a prod config with credentials encourages accidental insecure deployments. Prefer removing these, renaming to *.example.yaml, or using environment-variable placeholders so real credentials are never committed.

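One way to address this (and the identical findings on dev.yaml and base.yaml below) is the environment-variable-placeholder shape sketched here. The `${VAR}` substitution is an assumed loader feature, not something the repo necessarily supports today; field names follow the quoted snippet.

```yaml
# Example shape only: secrets are referenced, never committed.
destination:
  click_house:
    url: http://clickhouse.etl-data-plane.svc.cluster.local:8123
    user: default
    password: ${CLICKHOUSE_PASSWORD}
    database: mydb
pipeline:
  id: 42
  publication_name: my_pub
  pg_connection:
    host: postgres.etl-data-plane.svc.cluster.local
    port: 5432
    name: mydb
    username: postgres
    password: ${PG_PASSWORD}
```

Renaming the file to `prod.example.yaml` gives the same protection without any loader changes.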
Comment on lines +48 to +52
| Component | Role | Example |
|-----------|------|---------|
| **Catalog** | Metadata (tables, snapshots, stats) | PostgreSQL database |
| **Data** | Row data as Parquet files | Local directory or S3/GCS/Azure |

Copilot AI Mar 26, 2026

The README states DuckLake data can be stored in S3/GCS/Azure, but the current DuckLake destination only accepts file, s3, and gs schemes for data_path. Update the README to match the actual supported schemes, or add Azure support so this statement is accurate.

Comment on lines +303 to +325
fn parse_ducklake_url(value: &str) -> Result<Url, std::io::Error> {
if value.contains("://") {
return Url::parse(value).map_err(std::io::Error::other);
}

if let Ok(url) = Url::parse(value) {
return Ok(url);
}

let path = PathBuf::from(value);
let path = if path.is_absolute() {
path
} else {
std::env::current_dir()?.join(path)
};

Url::from_file_path(&path).map_err(|_| {
std::io::Error::other(format!(
"failed to convert path `{}` to a file url",
path.display()
))
})
}
Copilot AI Mar 26, 2026

parse_ducklake_url is now implemented here, and very similar logic exists in other crates (e.g. the API validator and examples). Duplicated URL normalization tends to drift (different error types/messages, scheme handling, Windows paths, etc.). Consider extracting a shared helper (e.g. in etl-config or etl-destinations) and reusing it across the replicator/API/examples.

Comment on lines +4 to +19
destination:
click_house:
url: http://clickhouse.etl-data-plane.svc.cluster.local:8123
user: default
password: password
database: mydb
pipeline:
id: 42
publication_name: my_pub
pg_connection:
host: postgres.etl-data-plane.svc.cluster.local
port: 5432
name: mydb
username: postgres
password: password
tls:
Copilot AI Mar 26, 2026

This committed dev.yaml includes plaintext passwords. If these files are intended as templates, consider renaming to *.example.yaml (or similar) and/or using environment-variable placeholders to avoid ever committing credentials-like values, even defaults.

Comment on lines +4 to +19
destination:
click_house:
url: http://clickhouse.etl-data-plane.svc.cluster.local:8123
user: default
password: password
database: mydb
pipeline:
id: 42
publication_name: my_pub
pg_connection:
host: postgres.etl-data-plane.svc.cluster.local
port: 5432
name: mydb
username: postgres
password: password
tls:
Copilot AI Mar 26, 2026

This committed base.yaml includes plaintext passwords. To reduce the chance of accidentally running with insecure defaults, prefer keeping sensitive fields empty, sourcing them from env/secret management, or moving this to an example/template config rather than a default-loaded base config.

"processing syncing tables after batch flush"
);

events_stream
Contributor Author

I added this while debugging: after a successful batch flush, it seemed better to send a status update immediately. As far as I can tell, we currently wait for the next keepalive to report progress, and notifying as soon as possible could also slightly reduce WAL retention lag — though maybe I'm misunderstanding something. It's not mandatory, but I'd like your opinion on it.
