[feature](routine-load) Support flexible partial update for routine load #59896
base: master
Add support for the `unique_key_update_mode` property in routine load to enable flexible partial columns update. This allows different rows in the same batch to update different columns, unlike fixed partial update, where all rows must update the same columns.

Changes:
- Add `unique_key_update_mode` property to CreateRoutineLoadInfo with values: UPSERT (default), UPDATE_FIXED_COLUMNS, UPDATE_FLEXIBLE_COLUMNS
- Add validation for flexible partial update constraints (JSON format only, no jsonpaths, no fuzzy_parse, no COLUMNS clause, no WHERE clause, table must have skip_bitmap column enabled)
- Update RoutineLoadJob to persist and restore the update mode
- Update KafkaRoutineLoadJob to pass update mode to task info
- Support ALTER ROUTINE LOAD to change unique_key_update_mode
- Add regression tests covering basic usage and error cases
- Fix HashMap ordering issue in gsonPostProcess for backward compatibility
- Add validation when ALTER changes mode to UPDATE_FLEXIBLE_COLUMNS
- Add comprehensive ALTER test cases for flexible partial update validation
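The update-mode values listed above, combined with the backward-compatible handling of the legacy `partial_columns` property discussed elsewhere in this PR (an explicit `unique_key_update_mode` takes precedence; otherwise `partial_columns=true` maps to `UPDATE_FIXED_COLUMNS`; otherwise the default is `UPSERT`), can be sketched as follows. This is a minimal illustration under assumptions, not the FE implementation: `UpdateModeSketch`, its methods, the local `Mode` enum standing in for the Thrift `TUniqueKeyUpdateMode`, and the case-insensitive parsing are all hypothetical.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of resolving a routine load job's update mode.
// Mode stands in for the Thrift enum TUniqueKeyUpdateMode.
class UpdateModeSketch {
    enum Mode { UPSERT, UPDATE_FIXED_COLUMNS, UPDATE_FLEXIBLE_COLUMNS }

    static final String UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode";
    static final String PARTIAL_COLUMNS = "partial_columns";

    // Returns the parsed mode, or null if the string is not a known mode.
    static Mode parse(String s) {
        if (s == null) {
            return null;
        }
        switch (s.trim().toUpperCase(Locale.ROOT)) {
            case "UPSERT":
                return Mode.UPSERT;
            case "UPDATE_FIXED_COLUMNS":
                return Mode.UPDATE_FIXED_COLUMNS;
            case "UPDATE_FLEXIBLE_COLUMNS":
                return Mode.UPDATE_FLEXIBLE_COLUMNS;
            default:
                return null;
        }
    }

    // Throwing variant for user-facing validation (one consistent message).
    static Mode parseAndValidate(String s) {
        Mode mode = parse(s);
        if (mode == null) {
            throw new IllegalArgumentException("Invalid unique_key_update_mode: " + s
                    + ". Expected UPSERT, UPDATE_FIXED_COLUMNS or UPDATE_FLEXIBLE_COLUMNS");
        }
        return mode;
    }

    // Explicit unique_key_update_mode wins; otherwise partial_columns=true
    // maps to UPDATE_FIXED_COLUMNS; otherwise UPSERT.
    static Mode resolve(Map<String, String> props) {
        String modeStr = props.get(UNIQUE_KEY_UPDATE_MODE);
        if (modeStr != null) {
            return parseAndValidate(modeStr);
        }
        boolean partialColumns = "true".equalsIgnoreCase(props.get(PARTIAL_COLUMNS));
        return partialColumns ? Mode.UPDATE_FIXED_COLUMNS : Mode.UPSERT;
    }
}
```

Keeping a null-returning parser next to a throwing one mirrors the split between `parseUniqueKeyUpdateMode(String)` and `parseAndValidateUniqueKeyUpdateMode(String)` described in a later commit: callers that only inspect a value can test for `null`, while DDL validation reports a single, consistent error.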
Pull request overview
This PR adds support for flexible partial columns update in routine load jobs, enabling different rows in the same batch to update different columns. This is achieved by introducing the unique_key_update_mode property with three modes: UPSERT (default), UPDATE_FIXED_COLUMNS (backward compatible with partial_columns), and UPDATE_FLEXIBLE_COLUMNS (new flexible mode).
Changes:
- Introduced the `unique_key_update_mode` property to configure update behavior with three modes
- Added validation logic for flexible partial update constraints (JSON format, no jsonpaths, no fuzzy_parse, skip_bitmap column required, etc.)
- Updated ALTER ROUTINE LOAD to support changing the update mode with appropriate validation
- Added comprehensive regression tests covering basic usage, error cases, and ALTER operations
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| CreateRoutineLoadInfo.java | Added unique_key_update_mode property parsing, validation, and flexible partial update constraint checks |
| AlterRoutineLoadCommand.java | Added support for altering unique_key_update_mode property with validation |
| RoutineLoadJob.java | Added update mode persistence, restoration in gsonPostProcess, and validation logic for ALTER operations |
| KafkaRoutineLoadJob.java | Updated to pass uniqueKeyUpdateMode to task info and changed method signature to throw UserException |
| NereidsRoutineLoadTaskInfo.java | Changed constructor to accept uniqueKeyUpdateMode instead of isPartialUpdate flag |
| test_routine_load_flexible_partial_update.groovy | Comprehensive test suite with 21 test cases covering feature functionality and error scenarios |
| test_routine_load_flexible_partial_update.out | Expected output for regression tests |
Comments suppressed due to low confidence (2)
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java:1
- The modifyPropertiesInternal method in KafkaRoutineLoadJob doesn't synchronize with the new modifyCommonJobProperties logic. When PARTIAL_COLUMNS is set here, it doesn't update uniqueKeyUpdateMode, which could lead to inconsistency. This code should be removed since modifyCommonJobProperties (called at line 792) now handles PARTIAL_COLUMNS and uniqueKeyUpdateMode synchronization.
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java:1
- This code duplicates the PARTIAL_UPDATE_NEW_KEY_POLICY handling that already exists in modifyCommonJobProperties. Since modifyCommonJobProperties is called first at line 792, this duplicate code can be removed to avoid redundancy and potential inconsistency.

    // Backward compatibility: partial_columns=true maps to UPDATE_FIXED_COLUMNS
    this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
    if (this.isPartialUpdate) {
        this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
    }
Copilot (AI) commented on Jan 14, 2026:
When both partial_columns and unique_key_update_mode are specified in jobProperties, the order of processing matters due to HashMap iteration. Consider adding validation in checkJobProperties to prevent users from specifying conflicting combinations like partial_columns=false with unique_key_update_mode=UPDATE_FIXED_COLUMNS.
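To see why iteration order matters, the sketch below applies properties one at a time in the order the map yields them, using an "only promote from UPSERT" guard like the one in the backward-compatibility code in this PR. It is a hypothetical illustration (`OrderSketch`, `applyInOrder`, and `ordered` are not Doris code); `LinkedHashMap` is used only to force each iteration order explicitly, whereas a `HashMap` gives no ordering guarantee.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: applying properties sequentially with a guard that
// only promotes UPSERT to UPDATE_FIXED_COLUMNS makes the final mode depend on
// iteration order when the two properties conflict.
class OrderSketch {
    enum Mode { UPSERT, UPDATE_FIXED_COLUMNS, UPDATE_FLEXIBLE_COLUMNS }

    static Mode applyInOrder(Map<String, String> props) {
        Mode mode = Mode.UPSERT;
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().equals("unique_key_update_mode")) {
                // An explicit mode overwrites whatever was derived so far.
                mode = Mode.valueOf(e.getValue());
            } else if (e.getKey().equals("partial_columns")
                    && Boolean.parseBoolean(e.getValue()) && mode == Mode.UPSERT) {
                // Legacy flag only promotes the default mode.
                mode = Mode.UPDATE_FIXED_COLUMNS;
            }
        }
        return mode;
    }

    // Builds an insertion-ordered map so the iteration order is explicit.
    static Map<String, String> ordered(String k1, String v1, String k2, String v2) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put(k1, v1);
        m.put(k2, v2);
        return m;
    }
}
```

With `partial_columns=true` and `unique_key_update_mode=UPSERT`, the result is `UPSERT` when `partial_columns` is visited first but `UPDATE_FIXED_COLUMNS` when the mode is visited first; with `UPDATE_FLEXIBLE_COLUMNS` both orders agree. Rejecting conflicting combinations up front, as the comment suggests, removes this order dependence.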

    (
        "max_batch_interval" = "10",
        "format" = "json",
        "jsonpaths" = "[\\"\\$.id\\", \\"\\$.name\\", \\"\\$.score\\"]"
Copilot (AI) commented on Jan 14, 2026:
The excessive escaping in jsonpaths string makes it hard to read. Consider using single quotes for the outer string or triple-quoted strings in Groovy to improve readability: "jsonpaths" = '["$.id", "$.name", "$.score"]'

    this.isPartialUpdate = Boolean.parseBoolean(
            jobProperties.remove(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
    if (this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
        this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
Copilot (AI) commented on Jan 14, 2026:
When PARTIAL_COLUMNS is set to false but uniqueKeyUpdateMode is already UPDATE_FIXED_COLUMNS (e.g., from a previous ALTER), this code doesn't reset uniqueKeyUpdateMode back to UPSERT. This could leave the job in an inconsistent state. Add: 'else if (!this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; }'
Suggested change:

    -    this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
    +    this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
    +} else if (!this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
    +    this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
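The consistency rule proposed in this suggestion can be written as a pure function from the current mode and the new `partial_columns` value to the next mode. A minimal sketch, assuming hypothetical names (`AlterSketch`, `onPartialColumnsChanged`) and a local enum in place of `TUniqueKeyUpdateMode`:

```java
// Hypothetical sketch of the ALTER-time rule: partial_columns only toggles
// between UPSERT and UPDATE_FIXED_COLUMNS and never changes UPDATE_FLEXIBLE_COLUMNS.
class AlterSketch {
    enum Mode { UPSERT, UPDATE_FIXED_COLUMNS, UPDATE_FLEXIBLE_COLUMNS }

    static Mode onPartialColumnsChanged(Mode current, boolean partialColumns) {
        if (partialColumns && current == Mode.UPSERT) {
            // Legacy flag turned on: promote to fixed partial update.
            return Mode.UPDATE_FIXED_COLUMNS;
        }
        if (!partialColumns && current == Mode.UPDATE_FIXED_COLUMNS) {
            // Legacy flag turned off: fall back to UPSERT (the missing else branch).
            return Mode.UPSERT;
        }
        // Every other combination leaves the mode unchanged.
        return current;
    }
}
```

Expressing the transition as a pure function makes all six (mode, flag) combinations easy to unit-test; the same rule is what the gsonPostProcess comment below asks for on the replay path.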

    // Backward compatibility: if unique_key_update_mode is not set, use partial_columns
    if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
        isPartialUpdate = Boolean.parseBoolean(v);
        if (isPartialUpdate) {
            uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
        }
Copilot (AI) commented on Jan 14, 2026:
The gsonPostProcess logic doesn't handle the case where PARTIAL_COLUMNS is 'false' but uniqueKeyUpdateMode was previously UPDATE_FIXED_COLUMNS. When deserializing old metadata, if partial_columns=false is processed, it should reset uniqueKeyUpdateMode to UPSERT for consistency. Add an else branch to handle this case.
Suggested change:

    -// Backward compatibility: if unique_key_update_mode is not set, use partial_columns
    -if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
    -    isPartialUpdate = Boolean.parseBoolean(v);
    -    if (isPartialUpdate) {
    -        uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
    -    }
    +boolean partialColumns = Boolean.parseBoolean(v);
    +// Backward compatibility: if unique_key_update_mode is not set, use partial_columns
    +if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
    +    isPartialUpdate = partialColumns;
    +    if (isPartialUpdate) {
    +        uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
    +    }
    +} else if (!partialColumns
    +        && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
    +    // Backward compatibility: if partial_columns is false but unique_key_update_mode
    +    // was UPDATE_FIXED_COLUMNS, reset to UPSERT for consistency
    +    isPartialUpdate = false;
    +    uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;

    // Check for conflicting settings: partial_columns=true with unique_key_update_mode=UPSERT
    if (jobProperties.containsKey(PARTIAL_COLUMNS)
            && jobProperties.get(PARTIAL_COLUMNS).equalsIgnoreCase("true")
            && "UPSERT".equals(modeStr)) {
        throw new AnalysisException("Cannot set both 'partial_columns=true' and "
                + "'unique_key_update_mode=UPSERT'. "
                + "Use unique_key_update_mode=UPDATE_FIXED_COLUMNS instead.");
Copilot (AI) commented on Jan 14, 2026:
This validation only checks for the conflicting case (partial_columns=true with UPSERT) but doesn't check for partial_columns=false with UPDATE_FIXED_COLUMNS or UPDATE_FLEXIBLE_COLUMNS, which is also conflicting. Add validation for these cases as well to ensure consistency.
Suggested change:

    -// Check for conflicting settings: partial_columns=true with unique_key_update_mode=UPSERT
    -if (jobProperties.containsKey(PARTIAL_COLUMNS)
    -        && jobProperties.get(PARTIAL_COLUMNS).equalsIgnoreCase("true")
    -        && "UPSERT".equals(modeStr)) {
    -    throw new AnalysisException("Cannot set both 'partial_columns=true' and "
    -            + "'unique_key_update_mode=UPSERT'. "
    -            + "Use unique_key_update_mode=UPDATE_FIXED_COLUMNS instead.");
    +// Check for conflicting settings between partial_columns and unique_key_update_mode
    +if (jobProperties.containsKey(PARTIAL_COLUMNS)) {
    +    String partialColumnsVal = jobProperties.get(PARTIAL_COLUMNS);
    +    // partial_columns=true is not allowed with UPSERT
    +    if (partialColumnsVal.equalsIgnoreCase("true")
    +            && "UPSERT".equals(modeStr)) {
    +        throw new AnalysisException("Cannot set both 'partial_columns=true' and "
    +                + "'unique_key_update_mode=UPSERT'. "
    +                + "Use unique_key_update_mode=UPDATE_FIXED_COLUMNS instead.");
    +    }
    +    // partial_columns=false is not allowed with UPDATE_FIXED_COLUMNS or UPDATE_FLEXIBLE_COLUMNS
    +    if (partialColumnsVal.equalsIgnoreCase("false")
    +            && ("UPDATE_FIXED_COLUMNS".equals(modeStr)
    +            || "UPDATE_FLEXIBLE_COLUMNS".equals(modeStr))) {
    +        throw new AnalysisException("Cannot set 'partial_columns=false' when "
    +                + "'unique_key_update_mode' is 'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'. "
    +                + "Use unique_key_update_mode=UPSERT instead, or enable partial columns.");
    +    }
Branch updated from c821a14 to bf7fe5d.
1. Fix checkstyle: line length exceeds 120 characters
   - Split long exception message string to comply with the 120-character limit
2. Add shared parseUniqueKeyUpdateMode() helper methods in CreateRoutineLoadInfo
   - parseUniqueKeyUpdateMode(String): returns TUniqueKeyUpdateMode or null
   - parseAndValidateUniqueKeyUpdateMode(String): validates and throws on error
   - Replaces duplicated switch/if-else logic across 4 files
3. Add OlapTable.validateForFlexiblePartialUpdate() method
   - Centralizes table-level validation (MoW, skip_bitmap, light_schema_change, variant)
   - Used by CreateRoutineLoadInfo, RoutineLoadJob, and NereidsStreamLoadPlanner
4. Update all callers to use shared validation methods
   - Reduces code duplication and ensures consistent error messages
5. Allow jsonpaths, WHERE clause, and MERGE/DELETE with flexible partial update
   - Removed restrictions that blocked these features
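After this commit, the remaining checks for `UPDATE_FLEXIBLE_COLUMNS` split into job-level rules from the PR description (JSON format only, no `fuzzy_parse`, no `COLUMNS` clause) and the table-level rules centralized in `OlapTable.validateForFlexiblePartialUpdate()` (merge-on-write, `skip_bitmap`, light schema change, variant), while `jsonpaths`, `WHERE`, and MERGE/DELETE are now allowed. The sketch below gathers these into one hypothetical validator. `FlexibleValidationSketch`, `JobSpec`, `TableSpec`, their fields, the error strings, and the reading of "variant" as "no variant columns" are assumptions, not the Doris API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: collects every reason a routine load job cannot use
// UPDATE_FLEXIBLE_COLUMNS. Names, fields and messages are illustrative only.
class FlexibleValidationSketch {
    // Job-level settings relevant to flexible partial update.
    static final class JobSpec {
        final String format;             // value of the "format" property
        final boolean fuzzyParse;        // value of the "fuzzy_parse" property
        final boolean hasColumnsClause;  // whether a COLUMNS(...) clause was given

        JobSpec(String format, boolean fuzzyParse, boolean hasColumnsClause) {
            this.format = format;
            this.fuzzyParse = fuzzyParse;
            this.hasColumnsClause = hasColumnsClause;
        }
    }

    // Table-level properties (assumed shape of the centralized table check).
    static final class TableSpec {
        final boolean mergeOnWrite;
        final boolean skipBitmapEnabled;
        final boolean lightSchemaChange;
        final boolean hasVariantColumn;

        TableSpec(boolean mergeOnWrite, boolean skipBitmapEnabled,
                  boolean lightSchemaChange, boolean hasVariantColumn) {
            this.mergeOnWrite = mergeOnWrite;
            this.skipBitmapEnabled = skipBitmapEnabled;
            this.lightSchemaChange = lightSchemaChange;
            this.hasVariantColumn = hasVariantColumn;
        }
    }

    // jsonpaths, WHERE and MERGE/DELETE are deliberately not checked here,
    // since the commit above lifted those restrictions.
    static List<String> validate(JobSpec job, TableSpec table) {
        List<String> errors = new ArrayList<>();
        if (!"json".equalsIgnoreCase(job.format)) {
            errors.add("format must be json");
        }
        if (job.fuzzyParse) {
            errors.add("fuzzy_parse is not supported");
        }
        if (job.hasColumnsClause) {
            errors.add("COLUMNS clause is not supported");
        }
        if (!table.mergeOnWrite) {
            errors.add("table must be a merge-on-write unique key table");
        }
        if (!table.skipBitmapEnabled) {
            errors.add("table must have the skip_bitmap column enabled");
        }
        if (!table.lightSchemaChange) {
            errors.add("table must enable light_schema_change");
        }
        if (table.hasVariantColumn) {
            errors.add("tables with variant columns are not supported");
        }
        return errors;
    }
}
```

Collecting all violations instead of stopping at the first is a choice made here so that a test can assert on the full set; the real validation may well throw on the first failure.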
Branch updated from bf7fe5d to 3353055.
Code review: found 1 issue.

doris/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy, lines 273 to 275 in 3353055

The test expects the exception "Flexible partial update does not support jsonpaths", but

Lines 449 to 468 in 3353055
🤖 Generated with Claude Code
1. Fix exception type mismatch in KafkaRoutineLoadJob.replayModifyProperties
- Changed catch block from DdlException to UserException since
modifyPropertiesInternal now throws UserException
2. Fix setSchemaForPartialUpdate not called for flexible partial update
- Changed condition from isPartialUpdate to check both
UPDATE_FIXED_COLUMNS and UPDATE_FLEXIBLE_COLUMNS modes
- Aligns with StreamLoadHandler behavior
3. Update tests to allow WHERE clause and jsonpaths with flexible partial update
- Tests 4, 7, 16, 18 now verify these features work correctly
- Added expected output for new success test cases
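Fix 2 above widens the condition for calling `setSchemaForPartialUpdate` from the boolean `isPartialUpdate` to both partial-update modes. A minimal sketch of that predicate, with a hypothetical name (`PartialModeSketch.needsPartialUpdateSchema`) and a local enum standing in for `TUniqueKeyUpdateMode`:

```java
// Hypothetical sketch: both partial-update modes need the partial-update schema,
// whereas a check driven only by partial_columns would miss UPDATE_FLEXIBLE_COLUMNS.
class PartialModeSketch {
    enum Mode { UPSERT, UPDATE_FIXED_COLUMNS, UPDATE_FLEXIBLE_COLUMNS }

    static boolean needsPartialUpdateSchema(Mode mode) {
        return mode == Mode.UPDATE_FIXED_COLUMNS || mode == Mode.UPDATE_FLEXIBLE_COLUMNS;
    }
}
```

Keying the decision on the mode rather than on the legacy flag keeps a single source of truth, which is the same alignment with StreamLoadHandler that the commit message describes.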
run buildall
- Test parseUniqueKeyUpdateMode() with valid/invalid mode strings
- Test parseAndValidateUniqueKeyUpdateMode() with exception handling
- Test backward compatibility: partial_columns=true maps to UPDATE_FIXED_COLUMNS
- Test that unique_key_update_mode takes precedence over partial_columns
run buildall
TPC-H: Total hot run time: 31178 ms
TPC-DS: Total hot run time: 173916 ms
ClickBench: Total hot run time: 27.23 s
Test the backward compatibility and precedence logic directly, without calling gsonPostProcess(), which requires origStmt
run buildall
TPC-H: Total hot run time: 32188 ms
TPC-DS: Total hot run time: 174083 ms
ClickBench: Total hot run time: 26.8 s
FE UT Coverage Report
Increment line coverage
Use 'can only support' format instead of 'requires' to match existing test expectations in test_flexible_partial_update_restricts.groovy
run buildall
TPC-H: Total hot run time: 31576 ms
TPC-DS: Total hot run time: 173538 ms
ClickBench: Total hot run time: 26.64 s
FE UT Coverage Report
Increment line coverage
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)