Describe the bug
The logical optimizer (i.e., `into_optimized_plan()`) produces logical plans that cannot be unparsed into correct SQL. This doesn't happen for every query, but it does affect many of the queries I'm trying.
Succeeds:
- parse SQL - ✅
- build unoptimized plan - ✅
- unparse SQL - ✅
- execute against DuckDB - ✅
Fails:
- parse SQL - ✅
- build optimized plan - ✅
- unparse SQL - ❌
- execute against DuckDB - ❌
To Reproduce
Here is a simple example that reproduces the behavior I'm describing:
datafusion = "54.0.0"
duckdb = { version = "1.10503.1", features = ["bundled"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::{
CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
};
use datafusion::datasource::empty::EmptyTable;
use datafusion::optimizer::OptimizerRule;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use datafusion::prelude::*;
use datafusion::sql::unparser::Unparser;
use datafusion::sql::unparser::dialect::DuckDBDialect;
use duckdb::Connection;
/// SQL used by the reproducer.
///
/// It joins three aliased subqueries over the `warehouse.main` tables:
/// - `oi`: `order_items` plus a derived `line_total` (`quantity * unit_price`);
/// - `o`: `orders` with lower-cased `status`/`channel`, NULL `discount_pct`/`shipping_cost`
///   replaced by 0 via `coalesce`, and an `is_fulfilled` flag;
/// - `p`: `products` LEFT JOINed to `categories`, plus `gross_margin` and `margin_pct`.
///
/// The joins use `USING (order_id)` and `USING (product_id)`. `main` plans this query both
/// unoptimized and optimized and unparses each plan to DuckDB SQL for comparison.
const QUERY: &str = r#"
SELECT * FROM
(
SELECT
item_id,
order_id,
product_id,
quantity,
unit_price,
quantity * unit_price AS line_total
FROM
"warehouse"."main"."order_items"
) oi
JOIN (
SELECT
order_id,
customer_id,
order_date,
lower(STATUS) AS STATUS,
lower(channel) AS channel,
coalesce(discount_pct, 0) AS discount_pct,
coalesce(shipping_cost, 0) AS shipping_cost,
STATUS IN ('completed', 'shipped') AS is_fulfilled
FROM
"warehouse"."main"."orders"
) o USING (order_id)
JOIN (
SELECT
p.product_id,
p.category_id,
p.sku,
p.name AS product_name,
p.price,
p.cost,
p.weight_kg,
p.is_active,
p.stock_qty,
round(p.price - p.cost, 2) AS gross_margin,
round((p.price - p.cost) / nullif(p.price, 0), 4) AS margin_pct,
c.name AS category_name
FROM
"warehouse"."main"."products" p
LEFT JOIN "warehouse"."main"."categories" c USING (category_id)
) p USING (product_id)
"#;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("registering tables");
let order_items_schema = Arc::new(Schema::new(vec![
Field::new("item_id", DataType::Int32, false),
Field::new("order_id", DataType::Int32, true),
Field::new("product_id", DataType::Int32, true),
Field::new("quantity", DataType::Int32, true),
Field::new("unit_price", DataType::Decimal128(10, 2), true),
]));
let orders_schema = Arc::new(Schema::new(vec![
Field::new("order_id", DataType::Int32, false),
Field::new("customer_id", DataType::Int32, true),
Field::new("order_date", DataType::Date32, true),
Field::new("status", DataType::Utf8, true),
Field::new("channel", DataType::Utf8, true),
Field::new("discount_pct", DataType::Decimal128(5, 2), true),
Field::new("shipping_cost", DataType::Decimal128(8, 2), true),
]));
let products_schema = Arc::new(Schema::new(vec![
Field::new("product_id", DataType::Int32, false),
Field::new("category_id", DataType::Int32, true),
Field::new("sku", DataType::Utf8, true),
Field::new("name", DataType::Utf8, true),
Field::new("price", DataType::Decimal128(10, 2), true),
Field::new("cost", DataType::Decimal128(10, 2), true),
Field::new("weight_kg", DataType::Decimal128(6, 3), true),
Field::new("is_active", DataType::Boolean, true),
Field::new("stock_qty", DataType::Int32, true),
]));
let categories_schema = Arc::new(Schema::new(vec![
Field::new("category_id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
Field::new("parent_id", DataType::Int32, true),
Field::new("display_rank", DataType::Int32, true),
]));
let ctx = SessionContext::new();
// ctx.remove_optimizer_rule(CommonSubexprEliminate::new().name());
let schema_provider = Arc::new(MemorySchemaProvider::new());
schema_provider.register_table(
"order_items".to_string(),
Arc::new(EmptyTable::new(order_items_schema)),
)?;
schema_provider.register_table(
"orders".to_string(),
Arc::new(EmptyTable::new(orders_schema)),
)?;
schema_provider.register_table(
"products".to_string(),
Arc::new(EmptyTable::new(products_schema)),
)?;
schema_provider.register_table(
"categories".to_string(),
Arc::new(EmptyTable::new(categories_schema)),
)?;
let catalog = Arc::new(MemoryCatalogProvider::new());
catalog.register_schema("main", schema_provider)?;
ctx.register_catalog("warehouse", catalog);
let dialect = DuckDBDialect::new();
let unparser = Unparser::new(&dialect);
let conn = Connection::open("warehouse.duckdb")?;
{
println!("Unoptimized query:");
let unopt_plan = ctx.sql(QUERY).await?.into_unoptimized_plan();
let sql = unparser.plan_to_sql(&unopt_plan)?;
let mut stmt = conn.prepare(&sql.to_string())?;
match stmt.query([]) {
Ok(_) => {
println!("success");
}
Err(e) => {
println!("failed: {e}");
}
}
}
println!("building optimized plan");
let plan = ctx.sql(QUERY).await?.into_optimized_plan()?;
println!("unparsing optimized plan");
let sql = unparser.plan_to_sql(&plan)?;
println!("Generated SQL:\n{sql}");
{
println!("Optimized query:");
let mut stmt = conn.prepare(&sql.to_string())?;
match stmt.query([]) {
Ok(_) => {
println!("success");
}
Err(e) => {
println!("failed: {e}");
}
}
}
Ok(())
}
Generate the empty DuckDB tables:
# Create the four empty tables in warehouse.duckdb. Their columns, types and NOT NULL
# constraints mirror the schemas registered with DataFusion in the reproducer, so the
# unparsed SQL can be bound and executed. IF NOT EXISTS makes re-running this harmless.
duckdb warehouse.duckdb "
CREATE TABLE IF NOT EXISTS main.order_items (
item_id INTEGER NOT NULL,
order_id INTEGER,
product_id INTEGER,
quantity INTEGER,
unit_price DECIMAL(10, 2)
);
CREATE TABLE IF NOT EXISTS main.orders (
order_id INTEGER NOT NULL,
customer_id INTEGER,
order_date DATE,
status VARCHAR,
channel VARCHAR,
discount_pct DECIMAL(5, 2),
shipping_cost DECIMAL(8, 2)
);
CREATE TABLE IF NOT EXISTS main.products (
product_id INTEGER NOT NULL,
category_id INTEGER,
sku VARCHAR,
name VARCHAR,
price DECIMAL(10, 2),
cost DECIMAL(10, 2),
weight_kg DECIMAL(6, 3),
is_active BOOLEAN,
stock_qty INTEGER
);
CREATE TABLE IF NOT EXISTS main.categories (
category_id INTEGER NOT NULL,
name VARCHAR,
parent_id INTEGER,
display_rank INTEGER
);
"
The reproducer will result in the following error:
registering tables
Unoptimized query:
success
building optimized plan
unparsing optimized plan
Error: SchemaError(FieldNotFound { field: Column { relation: Some(Bare { table: "o" }), name: "__common_expr_1" }, valid_fields: [Column { relation: None, name: "__common_expr_1" }, Column { relation: None, name: "__common_expr_2" }, Column { relation: Some(Bare { table: "o" }), name: "order_id" }, Column { relation: Some(Bare { table: "o" }), name: "customer_id" }, Column { relation: Some(Bare { table: "o" }), name: "order_date" }, Column { relation: Some(Bare { table: "o" }), name: "status" }, Column { relation: Some(Bare { table: "o" }), name: "channel" }] }, Some(""))
The common subexpression elimination (CSE) pass appears to be introducing this error, so let's disable that optimizer rule and try again:
ctx.remove_optimizer_rule(CommonSubexprEliminate::new().name());
This results in a different error, now coming from DuckDB:
registering tables
Unoptimized query:
success
building optimized plan
unparsing optimized plan
Generated SQL:
SELECT "oi"."item_id", "oi"."product_id", "oi"."quantity", "oi"."unit_price", "oi"."line_total", "o"."order_id", "o"."customer_id", "o"."order_date", "o"."status", "o"."channel", "o"."discount_pct", "o"."shipping_cost", "o"."is_fulfilled", "p"."category_id", "p"."sku", "p"."product_name", "p"."price", "p"."cost", "p"."weight_kg", "p"."is_active", "p"."stock_qty", "p"."gross_margin", "p"."margin_pct", "p"."category_name" FROM (SELECT "oi"."item_id", "oi"."product_id", "oi"."quantity", "oi"."unit_price", "oi"."line_total", "o"."order_id", "o"."customer_id", "o"."order_date", "o"."status", "o"."channel", "o"."discount_pct", "o"."shipping_cost", "o"."is_fulfilled" FROM (SELECT "oi"."item_id", "oi"."order_id", "oi"."product_id", "oi"."quantity", "oi"."unit_price", (CAST("oi"."quantity" AS DECIMAL(10,0)) * "oi"."unit_price") AS "line_total" FROM "warehouse"."main"."order_items" AS "oi") AS "oi" INNER JOIN (SELECT "o"."order_id", "o"."customer_id", "o"."order_date", lower("o"."status") AS "status", lower("o"."channel") AS "channel", CASE WHEN CAST("o"."discount_pct" AS DECIMAL(22,2)) IS NOT NULL THEN CAST("o"."discount_pct" AS DECIMAL(22,2)) ELSE 0.00 END AS "discount_pct", CASE WHEN CAST("o"."shipping_cost" AS DECIMAL(22,2)) IS NOT NULL THEN CAST("o"."shipping_cost" AS DECIMAL(22,2)) ELSE 0.00 END AS "shipping_cost", (("o"."status" = 'completed') OR ("o"."status" = 'shipped')) AS "is_fulfilled" FROM "warehouse"."main"."orders" AS "o") AS "o" ON "oi"."order_id" = "o"."order_id") INNER JOIN (SELECT "p"."product_id", "p"."category_id", "p"."sku", "p"."name" AS "product_name", "p"."price", "p"."cost", "p"."weight_kg", "p"."is_active", "p"."stock_qty", round(("p"."price" - "p"."cost"), 2) AS "gross_margin", round((("p"."price" - "p"."cost") // nullif(CAST("p"."price" AS DECIMAL(22,2)), 0.00)), 4) AS "margin_pct", "c"."name" AS "category_name" FROM "warehouse"."main"."products" AS "p" LEFT OUTER JOIN "warehouse"."main"."categories" AS "c" USING("category_id")) AS "p" ON 
"oi"."product_id" = "p"."product_id"
Optimized query:
Error: DuckDBFailure(Error { code: Unknown, extended_code: 1 }, Some("Binder Error: Referenced table \"oi\" not found!\nCandidate tables: \"p\"\n\nLINE 1: ...\".\"main\".\"categories\" AS \"c\" USING(\"category_id\")) AS \"p\" ON \"oi\".\"product_id\" = \"p\".\"product_id\"\n
Expected behavior
Both the unoptimized and the optimized plans should unparse into executable SQL. I don't expect the two SQL statements to be identical, but I do expect them to be logically equivalent.
Additional context
As far as I understand the `Unparser`, this is unintended: an optimized `LogicalPlan` should still unparse into valid SQL. I've run into many similar problems where the resulting SQL is mostly correct, but `SubqueryAlias` nodes are missing or incorrect, which makes the resulting query ambiguous. To me this seems more likely to be a bug in how the `Unparser` processes plans that have been rewritten by the optimizer than a bug in the optimizer itself. The optimized `LogicalPlan`s that are generated look correct to me.
Note: I've also seen problems related to the `group_alias_N` aliases created by the `SingleDistinctToGroupBy` pass, but I haven't included them here yet because I haven't been able to put together a simple reproducible example.
Describe the bug
The logical optimizer (i.e., `into_optimized_plan()`) produces logical plans that cannot be unparsed into correct SQL. This doesn't happen for every query, but it does affect many of the queries I'm trying.
Succeeds:
Fails:
To Reproduce
Here is a simple example that reproduces the behavior I'm describing:
Generate the empty DuckDB tables:
The reproducer will result in the following error:
The common subexpression elimination (CSE) pass appears to be introducing this error, so let's disable that optimizer rule and try again:
This results in a different error, now coming from DuckDB:
Expected behavior
Both the unoptimized and the optimized plans should unparse into executable SQL. I don't expect the two SQL statements to be identical, but I do expect them to be logically equivalent.
Additional context
As far as I understand the `Unparser`, this is unintended: an optimized `LogicalPlan` should still unparse into valid SQL. I've run into many similar problems where the resulting SQL is mostly correct, but `SubqueryAlias` nodes are missing or incorrect, which makes the resulting query ambiguous. To me this seems more likely to be a bug in how the `Unparser` processes plans that have been rewritten by the optimizer than a bug in the optimizer itself. The optimized `LogicalPlan`s that are generated look correct to me.
Note: I've also seen problems related to the `group_alias_N` aliases created by the `SingleDistinctToGroupBy` pass, but I haven't included them here yet because I haven't been able to put together a simple reproducible example.