Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2977,6 +2977,33 @@ public boolean getEnableUniqueKeySkipBitmap() {
return hasSkipBitmapColumn();
}

/**
* Validate that the table supports flexible partial update.
* Checks the following constraints:
* 1. Must be MoW unique key table
* 2. Must have skip_bitmap column
* 3. Must have light_schema_change enabled
* 4. Cannot have variant columns
* @throws UserException if any constraint is not satisfied
*/
public void validateForFlexiblePartialUpdate() throws UserException {
if (!getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Flexible partial update is only supported in unique table MoW");
}
if (!hasSkipBitmapColumn()) {
throw new UserException("Flexible partial update can only support table with skip bitmap hidden column."
+ " But table " + getName() + " doesn't have it. You can use `ALTER TABLE " + getName()
+ " ENABLE FEATURE \"UPDATE_FLEXIBLE_COLUMNS\";` to add it to the table.");
}
if (!getEnableLightSchemaChange()) {
throw new UserException("Flexible partial update can only support table with light_schema_change enabled."
+ " But table " + getName() + "'s property light_schema_change is false");
}
if (hasVariantColumns()) {
throw new UserException("Flexible partial update can only support table without variant columns.");
}
}

public boolean getEnableUniqueKeyMergeOnWrite() {
if (tableProperty == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ private void convertOffset(KafkaDataSourceProperties dataSourceProperties) throw

private void modifyPropertiesInternal(Map<String, String> jobProperties,
KafkaDataSourceProperties dataSourceProperties)
throws DdlException {
throws UserException {
if (null != dataSourceProperties) {
List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
Map<String, String> customKafkaProperties = Maps.newHashMap();
Expand Down Expand Up @@ -830,7 +830,7 @@ private void resetCloudProgress(Cloud.ResetRLProgressRequest.Builder builder) th
public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) log.getDataSourceProperties());
} catch (DdlException e) {
} catch (UserException e) {
// should not happen
LOG.error("failed to replay modify kafka routine load job: {}", id, e);
}
Expand Down Expand Up @@ -974,6 +974,6 @@ public NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserExce
return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS,
partitionNamesInfo, mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs,
precedingFilter, whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism,
loadToSingleTablet, isPartialUpdate, partialUpdateNewKeyPolicy, memtableOnSinkNode);
loadToSingleTablet, uniqueKeyUpdateMode, partialUpdateNewKeyPolicy, memtableOnSinkNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUniqueKeyUpdateMode;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
Expand Down Expand Up @@ -225,6 +226,7 @@ public boolean isFinalState() {

protected boolean isPartialUpdate = false;
protected TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
protected TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;

protected String sequenceCol;

Expand Down Expand Up @@ -387,11 +389,14 @@ protected void setOptional(CreateRoutineLoadInfo info) throws UserException {
jobProperties.put(info.STRICT_MODE, String.valueOf(info.isStrictMode()));
jobProperties.put(info.SEND_BATCH_PARALLELISM, String.valueOf(this.sendBatchParallelism));
jobProperties.put(info.LOAD_TO_SINGLE_TABLET, String.valueOf(this.loadToSingleTablet));
jobProperties.put(info.PARTIAL_COLUMNS, info.isPartialUpdate() ? "true" : "false");
if (info.isPartialUpdate()) {
this.isPartialUpdate = true;
// Set unique key update mode
this.uniqueKeyUpdateMode = info.getUniqueKeyUpdateMode();
this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, isPartialUpdate ? "true" : "false");
if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) {
this.partialUpdateNewKeyPolicy = info.getPartialUpdateNewKeyPolicy();
jobProperties.put(info.PARTIAL_UPDATE_NEW_KEY_POLICY,
jobProperties.put(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY,
this.partialUpdateNewKeyPolicy == TPartialUpdateNewRowPolicy.ERROR ? "ERROR" : "APPEND");
}
jobProperties.put(info.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio));
Expand Down Expand Up @@ -570,6 +575,10 @@ public LoadTask.MergeType getMergeType() {
return mergeType;
}

public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
return uniqueKeyUpdateMode;
}

@Override
public Expr getDeleteCondition() {
return deleteCondition;
Expand Down Expand Up @@ -1059,7 +1068,8 @@ public TPipelineFragmentParams plan(NereidsStreamLoadPlanner planner, TUniqueId
throw new UserException("txn does not exist: " + txnId);
}
txnState.addTableIndexes(planner.getDestTable());
if (isPartialUpdate) {
if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS
|| uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
txnState.setSchemaForPartialUpdate((OlapTable) table);
}

Expand Down Expand Up @@ -1932,9 +1942,29 @@ public void gsonPostProcess() throws IOException {
if (tableId == 0) {
isMultiTable = true;
}
// Process UNIQUE_KEY_UPDATE_MODE first to ensure correct backward compatibility
// with PARTIAL_COLUMNS (HashMap iteration order is not guaranteed)
if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
String modeValue = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
TUniqueKeyUpdateMode mode = CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(modeValue);
if (mode != null) {
uniqueKeyUpdateMode = mode;
isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
} else {
uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
}
}
// Process remaining properties
jobProperties.forEach((k, v) -> {
if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
isPartialUpdate = Boolean.parseBoolean(v);
// Backward compatibility: only use partial_columns if unique_key_update_mode is not set
// unique_key_update_mode takes precedence
if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
isPartialUpdate = Boolean.parseBoolean(v);
if (isPartialUpdate) {
uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
}
}
} else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) {
if ("ERROR".equalsIgnoreCase(v)) {
partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR;
Expand Down Expand Up @@ -1979,7 +2009,7 @@ public void gsonPostProcess() throws IOException {
public abstract NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserException;

// for ALTER ROUTINE LOAD
protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
protected void modifyCommonJobProperties(Map<String, String> jobProperties) throws UserException {
if (jobProperties.containsKey(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
this.desireTaskConcurrentNum = Integer.parseInt(
jobProperties.remove(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY));
Expand Down Expand Up @@ -2010,5 +2040,79 @@ protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
this.maxBatchSizeBytes = Long.parseLong(
jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY));
}

if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
TUniqueKeyUpdateMode newMode = CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr);
// Validate flexible partial update constraints when changing to UPDATE_FLEXIBLE_COLUMNS
if (newMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
validateFlexiblePartialUpdateForAlter();
}
this.uniqueKeyUpdateMode = newMode;
this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
}

if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
// Backward compatibility: only use partial_columns if unique_key_update_mode is UPSERT (not explicitly set)
// unique_key_update_mode takes precedence
this.isPartialUpdate = Boolean.parseBoolean(
jobProperties.remove(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
if (this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When PARTIAL_COLUMNS is set to false but uniqueKeyUpdateMode is already UPDATE_FIXED_COLUMNS (e.g., from a previous ALTER), this code doesn't reset uniqueKeyUpdateMode back to UPSERT. This could leave the job in an inconsistent state. Add: 'else if (!this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; }'

Suggested change
this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
} else if (!this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;

Copilot uses AI. Check for mistakes.
}
this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
}
}

/**
* Validate flexible partial update constraints when altering routine load job.
*/
private void validateFlexiblePartialUpdateForAlter() throws UserException {
// Multi-table load does not support flexible partial update
if (isMultiTable) {
throw new DdlException("Flexible partial update is not supported in multi-table load");
}

// Get the table to check table-level constraints
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
throw new DdlException("Database not found: " + dbId);
}
Table table = db.getTableNullable(tableId);
if (table == null) {
throw new DdlException("Table not found: " + tableId);
}
if (!(table instanceof OlapTable)) {
throw new DdlException("Flexible partial update is only supported for OLAP tables");
}
OlapTable olapTable = (OlapTable) table;

// Validate table-level constraints (MoW, skip_bitmap, light_schema_change, variant columns)
olapTable.validateForFlexiblePartialUpdate();

// Routine load specific validations
// Must use JSON format
String format = this.jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
if (!"json".equalsIgnoreCase(format)) {
throw new DdlException("Flexible partial update only supports JSON format, but current job uses: "
+ format);
}
// Cannot use fuzzy_parse
if (Boolean.parseBoolean(this.jobProperties.getOrDefault(
JsonFileFormatProperties.PROP_FUZZY_PARSE, "false"))) {
throw new DdlException("Flexible partial update does not support fuzzy_parse");
}
// Cannot use jsonpaths
String jsonPaths = getJsonPaths();
if (jsonPaths != null && !jsonPaths.isEmpty()) {
throw new DdlException("Flexible partial update does not support jsonpaths");
}
// Cannot specify COLUMNS mapping
if (columnDescs != null && !columnDescs.descs.isEmpty()) {
throw new DdlException("Flexible partial update does not support COLUMNS specification");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map<String, String> jobProp
String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs,
Expression precedingFilter, Expression whereExpr, Separator columnSeparator,
Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism,
boolean loadToSingleTablet, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
boolean memtableOnSinkNode) {
boolean loadToSingleTablet, TUniqueKeyUpdateMode uniqueKeyUpdateMode,
TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, boolean memtableOnSinkNode) {
this.execMemLimit = execMemLimit;
this.jobProperties = jobProperties;
this.maxBatchIntervalS = maxBatchIntervalS;
Expand All @@ -99,9 +99,7 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map<String, String> jobProp
this.escape = escape;
this.sendBatchParallelism = sendBatchParallelism;
this.loadToSingleTablet = loadToSingleTablet;
if (isPartialUpdate) {
this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
}
this.uniquekeyUpdateMode = uniqueKeyUpdateMode;
this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy;
this.memtableOnSinkNode = memtableOnSinkNode;
this.timeoutSec = calTimeoutSec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,9 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
}
}

if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && !destTable.hasSkipBitmapColumn()) {
String tblName = destTable.getName();
throw new UserException("Flexible partial update can only support table with skip bitmap hidden column."
+ " But table " + tblName + " doesn't have it. You can use `ALTER TABLE " + tblName
+ " ENABLE FEATURE \"UPDATE_FLEXIBLE_COLUMNS\";` to add it to the table.");
}
if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
&& !destTable.getEnableLightSchemaChange()) {
throw new UserException("Flexible partial update can only support table with light_schema_change enabled."
+ " But table " + destTable.getName() + "'s property light_schema_change is false");
}
if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS
&& destTable.hasVariantColumns()) {
throw new UserException("Flexible partial update can only support table without variant columns.");
if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
// Validate table-level constraints for flexible partial update
destTable.validateForFlexiblePartialUpdate();
}
HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class AlterRoutineLoadCommand extends AlterCommand {
.add(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY)
.add(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)
.add(CreateRoutineLoadInfo.PARTIAL_COLUMNS)
.add(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)
.add(CreateRoutineLoadInfo.STRICT_MODE)
.add(CreateRoutineLoadInfo.TIMEZONE)
.add(CreateRoutineLoadInfo.WORKLOAD_GROUP)
Expand Down Expand Up @@ -279,6 +280,13 @@ private void checkJobProperties() throws UserException {
String.valueOf(isPartialUpdate));
}

if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
String modeStr = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
// Validate the mode string
CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr);
analyzedJobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, modeStr.toUpperCase());
}

if (jobProperties.containsKey(CreateRoutineLoadInfo.WORKLOAD_GROUP)) {
String workloadGroup = jobProperties.get(CreateRoutineLoadInfo.WORKLOAD_GROUP);
if (!StringUtil.isEmpty(workloadGroup)) {
Expand Down
Loading