Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/query/service/src/pipelines/builders/builder_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::FunctionContext;
Expand Down Expand Up @@ -45,11 +46,15 @@ impl PipelineBuilder {
#[cfg(debug_assertions)]
{
let f = input_schema.field_with_name(index.to_string().as_str())?;
assert_eq!(
f.data_type(),
column_binding.data_type.as_ref(),
"Result projection schema mismatch"
);
if f.data_type() != column_binding.data_type.as_ref() {
return Err(ErrorCode::Internal(format!(
"Result projection schema mismatch for column {} (index {}): actual {:?}, expected {:?}",
column_binding.column_name,
column_binding.index,
f.data_type(),
column_binding.data_type.as_ref()
)));
}
}
}
let num_input_columns = input_schema.num_fields();
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ mod exec;
mod expr;
mod planner;
mod recursive_cte;
mod schema;
54 changes: 54 additions & 0 deletions src/query/service/tests/it/sql/schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_expression::DataBlock;
use databend_common_expression::ScalarRef;
use databend_common_expression::types::DataType;
use databend_common_expression::types::number::NumberDataType;
use databend_common_expression::types::number::NumberScalar;
use databend_query::interpreters::InterpreterFactory;
use databend_query::sql::Planner;
use databend_query::test_kits::TestFixture;
use futures_util::TryStreamExt;

/// Regression test for `FULL OUTER JOIN ... USING (x)`.
///
/// Checks that the planned schema and the executed result both report the
/// USING column as nullable `Int64`, and that the two result rows carry the
/// values `1` and `2` in that order.
#[tokio::test(flavor = "multi_thread")]
async fn test_full_outer_join_using_reports_nullable_result_schema() -> anyhow::Result<()> {
    let fixture = TestFixture::setup().await?;
    let ctx = fixture.new_query_ctx().await?;
    let query = "SELECT x FROM (SELECT 1::INT64 AS x) AS a FULL OUTER JOIN (SELECT 2::INT64 AS x) AS b USING (x) ORDER BY x NULLS LAST";
    let nullable_int64 = DataType::Number(NumberDataType::Int64).wrap_nullable();

    // Planning: the plan's first output field must already be nullable.
    let mut planner = Planner::new(ctx.clone());
    let (plan, _) = planner.plan_sql(query).await?;
    assert_eq!(plan.schema().field(0).data_type(), &nullable_int64);

    // Execution: the type inferred from the produced data must match the plan.
    let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?;
    let stream = interpreter.execute(ctx).await?;
    let blocks: Vec<DataBlock> = stream.try_collect().await?;
    let merged = DataBlock::concat(&blocks)?;
    assert_eq!(merged.infer_schema().field(0).data_type(), &nullable_int64);

    let row_count = merged.num_rows();
    assert_eq!(row_count, 2, "unexpected rows: {}", row_count);

    // Every row must hold a non-null Int64; anything else fails the test.
    let mut values = Vec::with_capacity(row_count);
    for row in 0..row_count {
        match merged.get_by_offset(0).index(row).expect("scalar at row") {
            ScalarRef::Number(NumberScalar::Int64(value)) => values.push(value),
            other => panic!("unexpected scalar type at row {row}: {other:?}"),
        }
    }
    assert_eq!(values, vec![1, 2]);

    Ok(())
}
145 changes: 135 additions & 10 deletions src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_exception::Result;

use crate::BindContext;
use crate::ColumnBinding;
use crate::ColumnBindingBuilder;
use crate::ColumnSet;
use crate::MetadataRef;
use crate::binder::Finder;
Expand All @@ -45,6 +46,7 @@ use crate::planner::semantic::NameResolutionContext;
use crate::plans::BoundColumnRef;
use crate::plans::EvalScalar;
use crate::plans::Filter;
use crate::plans::FunctionCall;
use crate::plans::HashJoinBuildCacheInfo;
use crate::plans::Join;
use crate::plans::JoinEquiCondition;
Expand Down Expand Up @@ -151,7 +153,7 @@ impl Binder {
let build_side_cache_info = self.expression_scan_context.generate_cache_info(cache_idx);

let join_type = join_type(&join.op);
let s_expr = self.bind_join_with_type(
let mut s_expr = self.bind_join_with_type(
join_type,
join_conditions,
(left_child, &mut left_context),
Expand All @@ -167,6 +169,21 @@ impl Binder {
right_context.clone(),
);

if matches!(join.op, JoinOperator::FullOuter) {
let using_scalars = self.build_full_outer_using_scalars(
&mut bind_context,
&join.condition,
&left_context.columns,
&right_context.columns,
)?;
if !using_scalars.is_empty() {
let eval_scalar = EvalScalar {
items: using_scalars,
};
s_expr = SExpr::create_unary(Arc::new(eval_scalar.into()), Arc::new(s_expr));
}
}

bind_context
.cte_context
.set_cte_context(right_context.cte_context);
Expand Down Expand Up @@ -278,6 +295,99 @@ impl Binder {
}
}

/// Builds one derived `coalesce(left, right)` scalar item per USING / NATURAL join key
/// and rewires `bind_context` so the derived column becomes the visible one.
///
/// Key names come from the normalized identifiers for `JoinCondition::Using`, or from
/// every left column name that also appears among the right bindings for
/// `JoinCondition::Natural`. Any other condition returns an empty vector.
///
/// For each key found on both sides whose left binding is present in
/// `bind_context.columns`, this registers a derived column in the metadata, pushes the
/// scalar item, inserts a `Visible` binding for the derived column at the left
/// binding's position, and marks the left binding `UnqualifiedWildcardInVisible`.
///
/// This function does not inspect the join type; the caller decides when to invoke it.
fn build_full_outer_using_scalars(
&mut self,
bind_context: &mut BindContext,
join_condition: &JoinCondition,
left_column_bindings: &[ColumnBinding],
right_column_bindings: &[ColumnBinding],
) -> Result<Vec<ScalarItem>> {
// Collected as (span, normalized key name); NATURAL keys carry no span.
let mut using_columns = Vec::new();
match join_condition {
JoinCondition::Using(identifiers) => {
using_columns.extend(identifiers.iter().map(|identifier| {
(
identifier.span,
normalize_identifier(identifier, &self.name_resolution_ctx).name,
)
}));
}
JoinCondition::Natural => {
// A NATURAL key is any left column name that also exists on the right.
for left_column in left_column_bindings {
if right_column_bindings
.iter()
.any(|right_column| right_column.column_name == left_column.column_name)
{
using_columns.push((None, left_column.column_name.clone()));
}
}
}
_ => return Ok(vec![]),
}

let mut derived_scalars = Vec::with_capacity(using_columns.len());
// The span gathered above is not used in this loop.
for (_, join_key_name) in using_columns {
// Keys missing on either side are skipped silently rather than reported.
let Some(left_column) = left_column_bindings
.iter()
.find(|column| column.column_name == join_key_name)
else {
continue;
};
let Some(right_column) = right_column_bindings
.iter()
.find(|column| column.column_name == join_key_name)
else {
continue;
};

// Locate the left binding inside the merged bind context by column index.
let Some(left_pos) = bind_context
.columns
.iter()
.position(|column| column.index == left_column.index)
else {
continue;
};

// The derived column is registered with the left column's data type.
// NOTE(review): the right column's type is not consulted here - confirm that
// both sides are guaranteed to agree (including nullability) at this point.
let coalesced_index = self
.metadata
.write()
.add_derived_column(join_key_name.clone(), (*left_column.data_type).clone());
let coalesced_scalar = ScalarExpr::FunctionCall(FunctionCall {
span: None,
func_name: "coalesce".to_string(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FunctionCall { func_name: "coalesce" } is not executable in this planner path. CI is failing with UnknownFunction: function coalesce does not exist, and the sqllogic failures point directly at NATURAL/FULL OUTER JOIN queries after this rewrite. Please build this expression through the normal type-check / function-resolution path (or use the internal scalar form that execution already understands) instead of hard-coding a raw FunctionCall here.

params: vec![],
arguments: vec![
ScalarExpr::BoundColumnRef(BoundColumnRef {
span: None,
column: left_column.clone(),
}),
ScalarExpr::BoundColumnRef(BoundColumnRef {
span: None,
column: right_column.clone(),
}),
],
});
derived_scalars.push(ScalarItem {
scalar: coalesced_scalar,
index: coalesced_index,
});

let coalesced_binding = ColumnBindingBuilder::new(
join_key_name.clone(),
coalesced_index,
left_column.data_type.clone(),
Visibility::Visible,
)
.case_sensitive(self.name_resolution_ctx.unquoted_ident_case_sensitive)
.build();

// Hide the original left binding first, then insert the derived binding at the
// same position; the insert shifts the hidden left binding to `left_pos + 1`.
bind_context.columns[left_pos].visibility = Visibility::UnqualifiedWildcardInVisible;
bind_context.columns.insert(left_pos, coalesced_binding);
}

Ok(derived_scalars)
}

// Wrap nullable types for not nullable columns.
fn wrap_nullable_column_bindings(
&self,
Expand Down Expand Up @@ -940,19 +1050,34 @@ impl<'a> JoinConditionResolver<'a> {
))
.set_span(*span));
};
let idx = !matches!(join_op, JoinOperator::RightOuter) as usize;
if let Some(col_binding) = self
let using_column_indexes = self
.join_context
.columns
.iter_mut()
.filter(|col_binding| {
col_binding.column_name == join_key_name
&& col_binding.visibility != Visibility::UnqualifiedWildcardInVisible
.iter()
.enumerate()
.filter_map(|(idx, col_binding)| {
(col_binding.column_name == join_key_name
&& col_binding.visibility != Visibility::UnqualifiedWildcardInVisible)
.then_some(idx)
})
.nth(idx)
.collect::<Vec<_>>();

if matches!(join_op, JoinOperator::FullOuter)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For USING, unqualified name resolution keeps only the one binding whose visibility stays visible (see BindContext::match_column_binding), so this branch is still making x resolve to a single input column. For FULL OUTER JOIN, that is not the merged USING semantics: SELECT x FROM (SELECT 1 AS x) a FULL OUTER JOIN (SELECT 2 AS x) b USING (x) should surface 1, 2, but the surviving left-side binding yields 1, NULL on the right-only row. This patch aligns the schema with the current nullable left-column behavior instead of fixing the merged-key behavior. The fix needs to produce a coalesced visible USING expression (or equivalent) and the regression test should assert row values, not only the inferred type.

&& let Some(visible_index) = using_column_indexes.first()
{
// Always make the second using column in the join_context invisible in unqualified wildcard.
col_binding.visibility = Visibility::UnqualifiedWildcardInVisible;
let visible_column = &mut self.join_context.columns[*visible_index];
if !visible_column.data_type.is_nullable_or_null() {
visible_column.data_type = Box::new(visible_column.data_type.wrap_nullable());
}
}

let idx = !matches!(join_op, JoinOperator::RightOuter) as usize;
if let Some(hidden_index) = using_column_indexes.get(idx) {
// Always make the second USING column in the join context invisible in
// unqualified wildcard expansion, except for RIGHT OUTER JOIN where the
// left-side column is hidden and the right-side column remains visible.
self.join_context.columns[*hidden_index].visibility =
Visibility::UnqualifiedWildcardInVisible;
}

self.add_equi_conditions(
Expand Down
Loading