-
Notifications
You must be signed in to change notification settings - Fork 860
fix(query): align full outer USING nullability #19616
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,3 +16,4 @@ mod exec; | |
| mod expr; | ||
| mod planner; | ||
| mod recursive_cte; | ||
| mod schema; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| // Copyright 2021 Datafuse Labs | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| use databend_common_expression::DataBlock; | ||
| use databend_common_expression::ScalarRef; | ||
| use databend_common_expression::types::DataType; | ||
| use databend_common_expression::types::number::NumberDataType; | ||
| use databend_common_expression::types::number::NumberScalar; | ||
| use databend_query::interpreters::InterpreterFactory; | ||
| use databend_query::sql::Planner; | ||
| use databend_query::test_kits::TestFixture; | ||
| use futures_util::TryStreamExt; | ||
|
|
||
| #[tokio::test(flavor = "multi_thread")] | ||
| async fn test_full_outer_join_using_reports_nullable_result_schema() -> anyhow::Result<()> { | ||
| let fixture = TestFixture::setup().await?; | ||
| let ctx = fixture.new_query_ctx().await?; | ||
| let sql = "SELECT x FROM (SELECT 1::INT64 AS x) AS a FULL OUTER JOIN (SELECT 2::INT64 AS x) AS b USING (x) ORDER BY x NULLS LAST"; | ||
|
|
||
| let mut planner = Planner::new(ctx.clone()); | ||
| let (plan, _) = planner.plan_sql(sql).await?; | ||
|
|
||
| let expected = DataType::Number(NumberDataType::Int64).wrap_nullable(); | ||
| assert_eq!(plan.schema().field(0).data_type(), &expected); | ||
|
|
||
| let executor = InterpreterFactory::get(ctx.clone(), &plan).await?; | ||
| let blocks: Vec<DataBlock> = executor.execute(ctx).await?.try_collect().await?; | ||
| let block = DataBlock::concat(&blocks)?; | ||
| assert_eq!(block.infer_schema().field(0).data_type(), &expected); | ||
| assert_eq!(block.num_rows(), 2, "unexpected rows: {}", block.num_rows()); | ||
|
|
||
| let values = (0..block.num_rows()) | ||
| .map( | ||
| |row| match block.get_by_offset(0).index(row).expect("scalar at row") { | ||
| ScalarRef::Number(NumberScalar::Int64(value)) => value, | ||
| other => panic!("unexpected scalar type at row {row}: {other:?}"), | ||
| }, | ||
| ) | ||
| .collect::<Vec<_>>(); | ||
| assert_eq!(values, vec![1, 2]); | ||
|
|
||
| Ok(()) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ use databend_common_exception::Result; | |
|
|
||
| use crate::BindContext; | ||
| use crate::ColumnBinding; | ||
| use crate::ColumnBindingBuilder; | ||
| use crate::ColumnSet; | ||
| use crate::MetadataRef; | ||
| use crate::binder::Finder; | ||
|
|
@@ -45,6 +46,7 @@ use crate::planner::semantic::NameResolutionContext; | |
| use crate::plans::BoundColumnRef; | ||
| use crate::plans::EvalScalar; | ||
| use crate::plans::Filter; | ||
| use crate::plans::FunctionCall; | ||
| use crate::plans::HashJoinBuildCacheInfo; | ||
| use crate::plans::Join; | ||
| use crate::plans::JoinEquiCondition; | ||
|
|
@@ -151,7 +153,7 @@ impl Binder { | |
| let build_side_cache_info = self.expression_scan_context.generate_cache_info(cache_idx); | ||
|
|
||
| let join_type = join_type(&join.op); | ||
| let s_expr = self.bind_join_with_type( | ||
| let mut s_expr = self.bind_join_with_type( | ||
| join_type, | ||
| join_conditions, | ||
| (left_child, &mut left_context), | ||
|
|
@@ -167,6 +169,21 @@ impl Binder { | |
| right_context.clone(), | ||
| ); | ||
|
|
||
| if matches!(join.op, JoinOperator::FullOuter) { | ||
| let using_scalars = self.build_full_outer_using_scalars( | ||
| &mut bind_context, | ||
| &join.condition, | ||
| &left_context.columns, | ||
| &right_context.columns, | ||
| )?; | ||
| if !using_scalars.is_empty() { | ||
| let eval_scalar = EvalScalar { | ||
| items: using_scalars, | ||
| }; | ||
| s_expr = SExpr::create_unary(Arc::new(eval_scalar.into()), Arc::new(s_expr)); | ||
| } | ||
| } | ||
|
|
||
| bind_context | ||
| .cte_context | ||
| .set_cte_context(right_context.cte_context); | ||
|
|
@@ -278,6 +295,99 @@ impl Binder { | |
| } | ||
| } | ||
|
|
||
| fn build_full_outer_using_scalars( | ||
| &mut self, | ||
| bind_context: &mut BindContext, | ||
| join_condition: &JoinCondition, | ||
| left_column_bindings: &[ColumnBinding], | ||
| right_column_bindings: &[ColumnBinding], | ||
| ) -> Result<Vec<ScalarItem>> { | ||
| let mut using_columns = Vec::new(); | ||
| match join_condition { | ||
| JoinCondition::Using(identifiers) => { | ||
| using_columns.extend(identifiers.iter().map(|identifier| { | ||
| ( | ||
| identifier.span, | ||
| normalize_identifier(identifier, &self.name_resolution_ctx).name, | ||
| ) | ||
| })); | ||
| } | ||
| JoinCondition::Natural => { | ||
| for left_column in left_column_bindings { | ||
| if right_column_bindings | ||
| .iter() | ||
| .any(|right_column| right_column.column_name == left_column.column_name) | ||
| { | ||
| using_columns.push((None, left_column.column_name.clone())); | ||
| } | ||
| } | ||
| } | ||
| _ => return Ok(vec![]), | ||
| } | ||
|
|
||
| let mut derived_scalars = Vec::with_capacity(using_columns.len()); | ||
| for (_, join_key_name) in using_columns { | ||
| let Some(left_column) = left_column_bindings | ||
| .iter() | ||
| .find(|column| column.column_name == join_key_name) | ||
| else { | ||
| continue; | ||
| }; | ||
| let Some(right_column) = right_column_bindings | ||
| .iter() | ||
| .find(|column| column.column_name == join_key_name) | ||
| else { | ||
| continue; | ||
| }; | ||
|
|
||
| let Some(left_pos) = bind_context | ||
| .columns | ||
| .iter() | ||
| .position(|column| column.index == left_column.index) | ||
| else { | ||
| continue; | ||
| }; | ||
|
|
||
| let coalesced_index = self | ||
| .metadata | ||
| .write() | ||
| .add_derived_column(join_key_name.clone(), (*left_column.data_type).clone()); | ||
| let coalesced_scalar = ScalarExpr::FunctionCall(FunctionCall { | ||
| span: None, | ||
| func_name: "coalesce".to_string(), | ||
| params: vec![], | ||
| arguments: vec![ | ||
| ScalarExpr::BoundColumnRef(BoundColumnRef { | ||
| span: None, | ||
| column: left_column.clone(), | ||
| }), | ||
| ScalarExpr::BoundColumnRef(BoundColumnRef { | ||
| span: None, | ||
| column: right_column.clone(), | ||
| }), | ||
| ], | ||
| }); | ||
| derived_scalars.push(ScalarItem { | ||
| scalar: coalesced_scalar, | ||
| index: coalesced_index, | ||
| }); | ||
|
|
||
| let coalesced_binding = ColumnBindingBuilder::new( | ||
| join_key_name.clone(), | ||
| coalesced_index, | ||
| left_column.data_type.clone(), | ||
| Visibility::Visible, | ||
| ) | ||
| .case_sensitive(self.name_resolution_ctx.unquoted_ident_case_sensitive) | ||
| .build(); | ||
|
|
||
| bind_context.columns[left_pos].visibility = Visibility::UnqualifiedWildcardInVisible; | ||
| bind_context.columns.insert(left_pos, coalesced_binding); | ||
| } | ||
|
|
||
| Ok(derived_scalars) | ||
| } | ||
|
|
||
| // Wrap nullable types for not nullable columns. | ||
| fn wrap_nullable_column_bindings( | ||
| &self, | ||
|
|
@@ -940,19 +1050,34 @@ impl<'a> JoinConditionResolver<'a> { | |
| )) | ||
| .set_span(*span)); | ||
| }; | ||
| let idx = !matches!(join_op, JoinOperator::RightOuter) as usize; | ||
| if let Some(col_binding) = self | ||
| let using_column_indexes = self | ||
| .join_context | ||
| .columns | ||
| .iter_mut() | ||
| .filter(|col_binding| { | ||
| col_binding.column_name == join_key_name | ||
| && col_binding.visibility != Visibility::UnqualifiedWildcardInVisible | ||
| .iter() | ||
| .enumerate() | ||
| .filter_map(|(idx, col_binding)| { | ||
| (col_binding.column_name == join_key_name | ||
| && col_binding.visibility != Visibility::UnqualifiedWildcardInVisible) | ||
| .then_some(idx) | ||
| }) | ||
| .nth(idx) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| if matches!(join_op, JoinOperator::FullOuter) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For |
||
| && let Some(visible_index) = using_column_indexes.first() | ||
| { | ||
| // Always make the second using column in the join_context invisible in unqualified wildcard. | ||
| col_binding.visibility = Visibility::UnqualifiedWildcardInVisible; | ||
| let visible_column = &mut self.join_context.columns[*visible_index]; | ||
| if !visible_column.data_type.is_nullable_or_null() { | ||
| visible_column.data_type = Box::new(visible_column.data_type.wrap_nullable()); | ||
| } | ||
| } | ||
|
|
||
| let idx = !matches!(join_op, JoinOperator::RightOuter) as usize; | ||
| if let Some(hidden_index) = using_column_indexes.get(idx) { | ||
| // Always make the second USING column in the join context invisible in | ||
| // unqualified wildcard expansion, except for RIGHT OUTER JOIN where the | ||
| // left-side column is hidden and the right-side column remains visible. | ||
| self.join_context.columns[*hidden_index].visibility = | ||
| Visibility::UnqualifiedWildcardInVisible; | ||
| } | ||
|
|
||
| self.add_equi_conditions( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`FunctionCall { func_name: "coalesce" }` is not executable in this planner path. CI is failing with `UnknownFunction: function coalesce does not exist`, and the sqllogic failures point directly at `NATURAL` / `FULL OUTER JOIN` queries after this rewrite. Please build this expression through the normal type-check / function-resolution path (or use the internal scalar form that execution already understands) instead of hard-coding a raw `FunctionCall` here.