diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index c8a33c7b7fdd5b..5d08d1c72cc0af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2977,6 +2977,33 @@ public boolean getEnableUniqueKeySkipBitmap() {
         return hasSkipBitmapColumn();
     }
 
+    /**
+     * Validate that the table supports flexible partial update.
+     * Checks the following constraints:
+     * 1. Must be a MoW unique key table
+     * 2. Must have the skip_bitmap hidden column
+     * 3. Must have light_schema_change enabled
+     * 4. Cannot have variant columns
+     * @throws UserException if any constraint is not satisfied
+     */
+    public void validateForFlexiblePartialUpdate() throws UserException {
+        if (!getEnableUniqueKeyMergeOnWrite()) {
+            throw new UserException("Flexible partial update is only supported in unique table MoW");
+        }
+        if (!hasSkipBitmapColumn()) {
+            throw new UserException("Flexible partial update can only support table with skip bitmap hidden column."
+                    + " But table " + getName() + " doesn't have it. You can use `ALTER TABLE " + getName()
+                    + " ENABLE FEATURE \"UPDATE_FLEXIBLE_COLUMNS\";` to add it to the table.");
+        }
+        if (!getEnableLightSchemaChange()) {
+            throw new UserException("Flexible partial update can only support table with light_schema_change enabled."
+                    + " But table " + getName() + "'s property light_schema_change is false");
+        }
+        if (hasVariantColumns()) {
+            throw new UserException("Flexible partial update can only support table without variant columns.");
+        }
+    }
+
     public boolean getEnableUniqueKeyMergeOnWrite() {
         if (tableProperty == null) {
             return false;
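The four checks above run in a fixed order, so a caller always sees the most fundamental failure first (table type before column and property details). Below is a self-contained sketch of the same guard-clause pattern; the class and exception names are illustrative stand-ins, not Doris API:

// Standalone mirror of the validation ordering in validateForFlexiblePartialUpdate();
// names here are hypothetical, only the check order matches the diff above.
public final class FlexUpdatePrecheck {
    public static void validate(boolean mergeOnWrite, boolean hasSkipBitmap,
                                boolean lightSchemaChange, boolean hasVariant) {
        if (!mergeOnWrite) {
            throw new IllegalStateException("requires a merge-on-write unique table");
        }
        if (!hasSkipBitmap) {
            throw new IllegalStateException("requires the skip_bitmap hidden column");
        }
        if (!lightSchemaChange) {
            throw new IllegalStateException("requires light_schema_change=true");
        }
        if (hasVariant) {
            throw new IllegalStateException("variant columns are not supported");
        }
    }
}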
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 82cd83cfe6ed8e..7cd2a71d2f26c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -729,7 +729,7 @@ private void convertOffset(KafkaDataSourceProperties dataSourceProperties) throw
 
     private void modifyPropertiesInternal(Map<String, String> jobProperties,
                                           KafkaDataSourceProperties dataSourceProperties)
-            throws DdlException {
+            throws UserException {
         if (null != dataSourceProperties) {
             List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
             Map<String, String> customKafkaProperties = Maps.newHashMap();
@@ -830,7 +830,7 @@ private void resetCloudProgress(Cloud.ResetRLProgressRequest.Builder builder) th
     public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
         try {
             modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) log.getDataSourceProperties());
-        } catch (DdlException e) {
+        } catch (UserException e) {
             // should not happen
             LOG.error("failed to replay modify kafka routine load job: {}", id, e);
         }
@@ -974,6 +974,6 @@ public NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserExce
         return new NereidsRoutineLoadTaskInfo(execMemLimit, new HashMap<>(jobProperties), maxBatchIntervalS,
                 partitionNamesInfo, mergeType, deleteCondition, sequenceCol, maxFilterRatio, importColumnDescs,
                 precedingFilter, whereExpr, columnSeparator, lineDelimiter, enclose, escape, sendBatchParallelism,
-                loadToSingleTablet, isPartialUpdate, partialUpdateNewKeyPolicy, memtableOnSinkNode);
+                loadToSingleTablet, uniqueKeyUpdateMode, partialUpdateNewKeyPolicy, memtableOnSinkNode);
     }
 }
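The throws-clause widening from DdlException to UserException is source-compatible because, in the FE codebase, DdlException is a subclass of UserException; the replay path only had to widen its catch accordingly. A minimal standalone illustration, with simplified stand-in classes rather than the real ones:

// Hypothetical stand-ins showing why widening `throws DdlException` to
// `throws UserException` keeps callers that catch UserException working.
class UserException extends Exception {
    UserException(String msg) { super(msg); }
}

class DdlException extends UserException {
    DdlException(String msg) { super(msg); }
}

class WideningDemo {
    static void modify(boolean ddlFailure) throws UserException {
        if (ddlFailure) {
            throw new DdlException("ddl-level failure"); // subtype still allowed
        }
        throw new UserException("other user-facing failure");
    }

    public static void main(String[] args) {
        try {
            modify(true);
        } catch (UserException e) { // catches DdlException too
            System.out.println("caught: " + e.getMessage());
        }
    }
}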
"ERROR" : "APPEND"); } jobProperties.put(info.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio)); @@ -570,6 +575,10 @@ public LoadTask.MergeType getMergeType() { return mergeType; } + public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() { + return uniqueKeyUpdateMode; + } + @Override public Expr getDeleteCondition() { return deleteCondition; @@ -1059,7 +1068,8 @@ public TPipelineFragmentParams plan(NereidsStreamLoadPlanner planner, TUniqueId throw new UserException("txn does not exist: " + txnId); } txnState.addTableIndexes(planner.getDestTable()); - if (isPartialUpdate) { + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS + || uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) { txnState.setSchemaForPartialUpdate((OlapTable) table); } @@ -1932,9 +1942,29 @@ public void gsonPostProcess() throws IOException { if (tableId == 0) { isMultiTable = true; } + // Process UNIQUE_KEY_UPDATE_MODE first to ensure correct backward compatibility + // with PARTIAL_COLUMNS (HashMap iteration order is not guaranteed) + if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) { + String modeValue = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); + TUniqueKeyUpdateMode mode = CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(modeValue); + if (mode != null) { + uniqueKeyUpdateMode = mode; + isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); + } else { + uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; + } + } + // Process remaining properties jobProperties.forEach((k, v) -> { if (k.equals(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) { - isPartialUpdate = Boolean.parseBoolean(v); + // Backward compatibility: only use partial_columns if unique_key_update_mode is not set + // unique_key_update_mode takes precedence + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) { + isPartialUpdate = Boolean.parseBoolean(v); + if (isPartialUpdate) { + uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS; + } + } } else if (k.equals(CreateRoutineLoadInfo.PARTIAL_UPDATE_NEW_KEY_POLICY)) { if ("ERROR".equalsIgnoreCase(v)) { partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.ERROR; @@ -1979,7 +2009,7 @@ public void gsonPostProcess() throws IOException { public abstract NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserException; // for ALTER ROUTINE LOAD - protected void modifyCommonJobProperties(Map jobProperties) { + protected void modifyCommonJobProperties(Map jobProperties) throws UserException { if (jobProperties.containsKey(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)) { this.desireTaskConcurrentNum = Integer.parseInt( jobProperties.remove(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)); @@ -2010,5 +2040,79 @@ protected void modifyCommonJobProperties(Map jobProperties) { this.maxBatchSizeBytes = Long.parseLong( jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY)); } + + if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) { + String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); + TUniqueKeyUpdateMode newMode = CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr); + // Validate flexible partial update constraints when changing to UPDATE_FLEXIBLE_COLUMNS + if (newMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) { + validateFlexiblePartialUpdateForAlter(); + } + this.uniqueKeyUpdateMode = newMode; + this.isPartialUpdate = (uniqueKeyUpdateMode == 
@@ -1979,7 +2009,7 @@ public void gsonPostProcess() throws IOException {
 
     public abstract NereidsRoutineLoadTaskInfo toNereidsRoutineLoadTaskInfo() throws UserException;
 
     // for ALTER ROUTINE LOAD
-    protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
+    protected void modifyCommonJobProperties(Map<String, String> jobProperties) throws UserException {
         if (jobProperties.containsKey(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
             this.desireTaskConcurrentNum = Integer.parseInt(
                     jobProperties.remove(CreateRoutineLoadInfo.DESIRED_CONCURRENT_NUMBER_PROPERTY));
@@ -2010,5 +2040,79 @@ protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
             this.maxBatchSizeBytes = Long.parseLong(
                     jobProperties.remove(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY));
         }
+
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.remove(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode newMode = CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr);
+            // Validate flexible partial update constraints when changing to UPDATE_FLEXIBLE_COLUMNS
+            if (newMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
+                validateFlexiblePartialUpdateForAlter();
+            }
+            this.uniqueKeyUpdateMode = newMode;
+            this.isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
+            this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+        }
+
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.PARTIAL_COLUMNS)) {
+            // Backward compatibility: only map partial_columns onto the mode if unique_key_update_mode
+            // is UPSERT (i.e. was never explicitly set); unique_key_update_mode takes precedence
+            this.isPartialUpdate = Boolean.parseBoolean(
+                    jobProperties.remove(CreateRoutineLoadInfo.PARTIAL_COLUMNS));
+            if (this.isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+            this.jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
+            this.jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, uniqueKeyUpdateMode.name());
+        }
+    }
+
+    /**
+     * Validate flexible partial update constraints when altering a routine load job.
+     */
+    private void validateFlexiblePartialUpdateForAlter() throws UserException {
+        // Multi-table load does not support flexible partial update
+        if (isMultiTable) {
+            throw new DdlException("Flexible partial update is not supported in multi-table load");
+        }
+
+        // Get the table to check table-level constraints
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            throw new DdlException("Database not found: " + dbId);
+        }
+        Table table = db.getTableNullable(tableId);
+        if (table == null) {
+            throw new DdlException("Table not found: " + tableId);
+        }
+        if (!(table instanceof OlapTable)) {
+            throw new DdlException("Flexible partial update is only supported for OLAP tables");
+        }
+        OlapTable olapTable = (OlapTable) table;
+
+        // Validate table-level constraints (MoW, skip_bitmap, light_schema_change, variant columns)
+        olapTable.validateForFlexiblePartialUpdate();
+
+        // Routine load specific validations
+        // Must use JSON format
+        String format = this.jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+        if (!"json".equalsIgnoreCase(format)) {
+            throw new DdlException("Flexible partial update only supports JSON format, but current job uses: "
+                    + format);
+        }
+        // Cannot use fuzzy_parse
+        if (Boolean.parseBoolean(this.jobProperties.getOrDefault(
+                JsonFileFormatProperties.PROP_FUZZY_PARSE, "false"))) {
+            throw new DdlException("Flexible partial update does not support fuzzy_parse");
+        }
+        // Cannot use jsonpaths
+        String jsonPaths = getJsonPaths();
+        if (jsonPaths != null && !jsonPaths.isEmpty()) {
+            throw new DdlException("Flexible partial update does not support jsonpaths");
+        }
+        // Cannot specify COLUMNS mapping
+        if (columnDescs != null && !columnDescs.descs.isEmpty()) {
+            throw new DdlException("Flexible partial update does not support COLUMNS specification");
+        }
+    }
 }
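Note that both branches of modifyCommonJobProperties rewrite this.jobProperties so that unique_key_update_mode and partial_columns end up mutually consistent after an ALTER. That keeps SHOW ROUTINE LOAD output and edit-log replay (handled by gsonPostProcess above) from ever observing a contradictory pair of keys.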
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
index 17a4d0a08e491d..7f8ff5b029e5f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsRoutineLoadTaskInfo.java
@@ -80,8 +80,8 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map<String, String> jobProp
             String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs,
             Expression precedingFilter, Expression whereExpr, Separator columnSeparator,
             Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism,
-            boolean loadToSingleTablet, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
-            boolean memtableOnSinkNode) {
+            boolean loadToSingleTablet, TUniqueKeyUpdateMode uniqueKeyUpdateMode,
+            TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy, boolean memtableOnSinkNode) {
         this.execMemLimit = execMemLimit;
         this.jobProperties = jobProperties;
         this.maxBatchIntervalS = maxBatchIntervalS;
@@ -99,9 +99,7 @@ public NereidsRoutineLoadTaskInfo(long execMemLimit, Map<String, String> jobProp
         this.escape = escape;
         this.sendBatchParallelism = sendBatchParallelism;
         this.loadToSingleTablet = loadToSingleTablet;
-        if (isPartialUpdate) {
-            this.uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
-        }
+        this.uniquekeyUpdateMode = uniqueKeyUpdateMode;
         this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy;
         this.memtableOnSinkNode = memtableOnSinkNode;
         this.timeoutSec = calTimeoutSec();
- + " But table " + destTable.getName() + "'s property light_schema_change is false"); - } - if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS - && destTable.hasVariantColumns()) { - throw new UserException("Flexible partial update can only support table without variant columns."); + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) { + // Validate table-level constraints for flexible partial update + destTable.validateForFlexiblePartialUpdate(); } HashSet partialUpdateInputColumns = new HashSet<>(); if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java index 1512fd32611318..367480c5d934d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterRoutineLoadCommand.java @@ -70,6 +70,7 @@ public class AlterRoutineLoadCommand extends AlterCommand { .add(CreateRoutineLoadInfo.MAX_BATCH_ROWS_PROPERTY) .add(CreateRoutineLoadInfo.MAX_BATCH_SIZE_PROPERTY) .add(CreateRoutineLoadInfo.PARTIAL_COLUMNS) + .add(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE) .add(CreateRoutineLoadInfo.STRICT_MODE) .add(CreateRoutineLoadInfo.TIMEZONE) .add(CreateRoutineLoadInfo.WORKLOAD_GROUP) @@ -279,6 +280,13 @@ private void checkJobProperties() throws UserException { String.valueOf(isPartialUpdate)); } + if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) { + String modeStr = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE); + // Validate the mode string + CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode(modeStr); + analyzedJobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, modeStr.toUpperCase()); + } + if (jobProperties.containsKey(CreateRoutineLoadInfo.WORKLOAD_GROUP)) { String workloadGroup = jobProperties.get(CreateRoutineLoadInfo.WORKLOAD_GROUP); if (!StringUtil.isEmpty(workloadGroup)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java index 5f23ff427117fb..4a32cb3d4e3934 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java @@ -57,6 +57,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.workloadgroup.WorkloadGroup; import org.apache.doris.thrift.TPartialUpdateNewRowPolicy; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; @@ -90,6 +91,7 @@ public class CreateRoutineLoadInfo { public static final String PARTIAL_COLUMNS = "partial_columns"; public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior"; + public static final String UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode"; public static final String WORKLOAD_GROUP = "workload_group"; public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; @@ -125,6 +127,7 @@ public class 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 5f23ff427117fb..4a32cb3d4e3934 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -57,6 +57,7 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.workloadgroup.WorkloadGroup;
 import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
@@ -90,6 +91,7 @@ public class CreateRoutineLoadInfo {
 
     public static final String PARTIAL_COLUMNS = "partial_columns";
     public static final String PARTIAL_UPDATE_NEW_KEY_POLICY = "partial_update_new_key_behavior";
+    public static final String UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode";
     public static final String WORKLOAD_GROUP = "workload_group";
     public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
@@ -125,6 +127,7 @@ public class CreateRoutineLoadInfo {
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
             .add(PARTIAL_UPDATE_NEW_KEY_POLICY)
+            .add(UNIQUE_KEY_UPDATE_MODE)
             .add(WORKLOAD_GROUP)
             .add(FileFormatProperties.PROP_FORMAT)
             .add(JsonFileFormatProperties.PROP_JSON_PATHS)
@@ -170,6 +173,7 @@ public class CreateRoutineLoadInfo {
      */
     private boolean isPartialUpdate = false;
     private TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
+    private TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
 
     private String comment = "";
 
@@ -198,8 +202,27 @@ public CreateRoutineLoadInfo(LabelNameInfo labelNameInfo, String tableName,
         this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
                 .createDataSource(typeName, dataSourceProperties, this.isMultiTable);
         this.mergeType = mergeType;
-        this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
-        if (this.isPartialUpdate && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
+        // Parse unique_key_update_mode first (takes precedence)
+        if (this.jobProperties.containsKey(UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = this.jobProperties.get(UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode mode = parseUniqueKeyUpdateMode(modeStr);
+            if (mode != null) {
+                this.uniqueKeyUpdateMode = mode;
+                if (mode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
+                    this.isPartialUpdate = true;
+                }
+            }
+            // validation will be done in checkJobProperties() if mode is null
+        } else {
+            // Backward compatibility: partial_columns=true maps to UPDATE_FIXED_COLUMNS
+            this.isPartialUpdate = this.jobProperties.getOrDefault(PARTIAL_COLUMNS, "false").equalsIgnoreCase("true");
+            if (this.isPartialUpdate) {
+                this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+        }
+        // Parse partial_update_new_key_behavior
+        if ((this.isPartialUpdate || this.uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT)
+                && this.jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
             String policyStr = this.jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
             if ("APPEND".equals(policyStr)) {
                 this.partialUpdateNewKeyPolicy = TPartialUpdateNewRowPolicy.APPEND;
@@ -213,6 +236,40 @@ public CreateRoutineLoadInfo(LabelNameInfo labelNameInfo, String tableName,
         }
     }
 
+    /**
+     * Parse unique_key_update_mode string to TUniqueKeyUpdateMode enum.
+     * Returns null if the mode string is invalid.
+     */
+    public static TUniqueKeyUpdateMode parseUniqueKeyUpdateMode(String modeStr) {
+        if (modeStr == null) {
+            return null;
+        }
+        switch (modeStr.toUpperCase()) {
+            case "UPSERT":
+                return TUniqueKeyUpdateMode.UPSERT;
+            case "UPDATE_FIXED_COLUMNS":
+                return TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            case "UPDATE_FLEXIBLE_COLUMNS":
+                return TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS;
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Validate a unique_key_update_mode string value.
+     * @throws AnalysisException if the mode string is invalid
+     */
+    public static TUniqueKeyUpdateMode parseAndValidateUniqueKeyUpdateMode(String modeStr) throws AnalysisException {
+        TUniqueKeyUpdateMode mode = parseUniqueKeyUpdateMode(modeStr);
+        if (mode == null) {
+            throw new AnalysisException(UNIQUE_KEY_UPDATE_MODE
+                    + " should be one of {'UPSERT', 'UPDATE_FIXED_COLUMNS', 'UPDATE_FLEXIBLE_COLUMNS'}, but found "
+                    + modeStr);
+        }
+        return mode;
+    }
+
     public String getName() {
         return name;
     }
@@ -293,6 +350,10 @@ public TPartialUpdateNewRowPolicy getPartialUpdateNewKeyPolicy() {
         return partialUpdateNewKeyPolicy;
     }
 
+    public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() {
+        return uniqueKeyUpdateMode;
+    }
+
     public String getComment() {
         return comment;
     }
@@ -357,7 +418,7 @@ private void checkDBTable(ConnectContext ctx) throws AnalysisException {
         name = labelNameInfo.getLabel();
         Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
 
-        if (isPartialUpdate && isMultiTable) {
+        if ((isPartialUpdate || uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) && isMultiTable) {
             throw new AnalysisException("Partial update is not supported in multi-table load.");
         }
         if (isMultiTable) {
@@ -379,6 +440,39 @@ private void checkDBTable(ConnectContext ctx) throws AnalysisException {
         if (isPartialUpdate && !((OlapTable) table).getEnableUniqueKeyMergeOnWrite()) {
             throw new AnalysisException("load by PARTIAL_COLUMNS is only supported in unique table MoW");
         }
+        // Validate flexible partial update constraints
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
+            validateFlexiblePartialUpdate((OlapTable) table);
+        }
+    }
+
+    private void validateFlexiblePartialUpdate(OlapTable table) throws AnalysisException {
+        // Validate table-level constraints (MoW, skip_bitmap, light_schema_change, variant columns)
+        try {
+            table.validateForFlexiblePartialUpdate();
+        } catch (UserException e) {
+            throw new AnalysisException(e.getMessage(), e);
+        }
+        // Routine load specific validations
+        // Must use JSON format
+        String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
+        if (!"json".equalsIgnoreCase(format)) {
+            throw new AnalysisException("Flexible partial update only supports JSON format, but found: " + format);
+        }
+        // Cannot use fuzzy_parse
+        if (Boolean.parseBoolean(jobProperties.getOrDefault(JsonFileFormatProperties.PROP_FUZZY_PARSE, "false"))) {
+            throw new AnalysisException("Flexible partial update does not support fuzzy_parse");
+        }
+        // Cannot use jsonpaths
+        String jsonPaths = jobProperties.get(JsonFileFormatProperties.PROP_JSON_PATHS);
+        if (jsonPaths != null && !jsonPaths.isEmpty()) {
+            throw new AnalysisException("Flexible partial update does not support jsonpaths");
+        }
+        // Cannot specify COLUMNS mapping
+        if (loadPropertyMap != null && loadPropertyMap.values().stream()
+                .anyMatch(p -> p instanceof LoadColumnClause)) {
+            throw new AnalysisException("Flexible partial update does not support COLUMNS specification");
+        }
+    }
 
     /**
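The two parse methods above form a deliberate two-tier API: the lenient one returns null and is used on the replay/deserialization path, where a bad persisted value must degrade gracefully instead of failing FE startup, while the strict one throws and is used on the DDL path, where the user should see an error. A self-contained sketch of the same split, with a local enum standing in for the thrift type:

import java.util.Optional;

// Lenient vs. strict parsing, mirroring parseUniqueKeyUpdateMode and
// parseAndValidateUniqueKeyUpdateMode. UpdateMode is a local stand-in.
enum UpdateMode { UPSERT, UPDATE_FIXED_COLUMNS, UPDATE_FLEXIBLE_COLUMNS }

final class ModeParser {
    // Lenient: bad input yields empty, never an exception.
    static Optional<UpdateMode> tryParse(String s) {
        if (s == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(UpdateMode.valueOf(s.toUpperCase()));
        } catch (IllegalArgumentException e) {
            return Optional.empty();
        }
    }

    // Strict: bad input becomes a user-facing error.
    static UpdateMode parseOrThrow(String s) {
        return tryParse(s).orElseThrow(() ->
                new IllegalArgumentException("unique_key_update_mode should be one of "
                        + "{'UPSERT', 'UPDATE_FIXED_COLUMNS', 'UPDATE_FLEXIBLE_COLUMNS'}, but found " + s));
    }
}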
@@ -532,11 +626,18 @@ public void checkJobProperties() throws UserException {
         }
         timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(TIMEZONE, timezone));
 
+        // check unique_key_update_mode
+        // Note: unique_key_update_mode takes precedence over partial_columns for backward compatibility
+        if (jobProperties.containsKey(UNIQUE_KEY_UPDATE_MODE)) {
+            String modeStr = jobProperties.get(UNIQUE_KEY_UPDATE_MODE);
+            parseAndValidateUniqueKeyUpdateMode(modeStr);
+        }
+
         // check partial_update_new_key_behavior
         if (jobProperties.containsKey(PARTIAL_UPDATE_NEW_KEY_POLICY)) {
-            if (!isPartialUpdate) {
+            if (!isPartialUpdate && uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
                 throw new AnalysisException(
-                        PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial_columns is true");
+                        PARTIAL_UPDATE_NEW_KEY_POLICY + " can only be set when partial update is enabled");
             }
             String policy = jobProperties.get(PARTIAL_UPDATE_NEW_KEY_POLICY).toUpperCase();
             if (!"APPEND".equals(policy) && !"ERROR".equals(policy)) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
index 8b02ad7076a196..d22464064282c4 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java
@@ -28,7 +28,9 @@
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.kafka.KafkaUtil;
+import org.apache.doris.nereids.trees.plans.commands.info.CreateRoutineLoadInfo;
 import org.apache.doris.thrift.TKafkaRLTaskProgress;
+import org.apache.doris.thrift.TUniqueKeyUpdateMode;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 
@@ -368,4 +370,127 @@ public void testGetShowCreateInfo() throws UserException {
         Assert.assertEquals(expect, showCreateInfo);
     }
 
+    @Test
+    public void testParseUniqueKeyUpdateMode() {
+        // Test valid mode strings
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("UPSERT"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("upsert"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("UPDATE_FIXED_COLUMNS"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("update_fixed_columns"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("UPDATE_FLEXIBLE_COLUMNS"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS,
+                CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("Update_Flexible_Columns"));
+
+        // Test invalid mode strings
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(null));
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("INVALID"));
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(""));
+        Assert.assertNull(CreateRoutineLoadInfo.parseUniqueKeyUpdateMode("PARTIAL_UPDATE"));
+    }
+
+    @Test
+    public void testParseAndValidateUniqueKeyUpdateMode() throws Exception {
+        // Test valid mode strings
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPSERT,
+                CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("UPSERT"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS,
+                CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("UPDATE_FIXED_COLUMNS"));
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS,
+                CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("UPDATE_FLEXIBLE_COLUMNS"));
+
+        // Test invalid mode string throws exception
+        try {
+            CreateRoutineLoadInfo.parseAndValidateUniqueKeyUpdateMode("INVALID_MODE");
+            Assert.fail("Expected AnalysisException");
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("unique_key_update_mode"));
+            Assert.assertTrue(e.getMessage().contains("INVALID_MODE"));
+        }
+    }
+
+    @Test
+    public void testUniqueKeyUpdateModeInJobProperties() {
+        // Test that uniqueKeyUpdateMode is properly stored in jobProperties
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob();
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FLEXIBLE_COLUMNS");
+        Deencapsulation.setField(job, "jobProperties", jobProperties);
+        Deencapsulation.setField(job, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS);
+
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS, job.getUniqueKeyUpdateMode());
+    }
+
+    @Test
+    public void testBackwardCompatibilityPartialColumnsToUniqueKeyUpdateMode() throws Exception {
+        // Test backward compatibility: partial_columns=true should map to UPDATE_FIXED_COLUMNS
+        // This tests the logic in gsonPostProcess without calling the full method
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob();
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true");
+        // Note: UNIQUE_KEY_UPDATE_MODE is NOT set - testing backward compatibility
+        Deencapsulation.setField(job, "jobProperties", jobProperties);
+        Deencapsulation.setField(job, "uniqueKeyUpdateMode", TUniqueKeyUpdateMode.UPSERT);
+
+        // Simulate the backward compatibility logic from gsonPostProcess
+        TUniqueKeyUpdateMode uniqueKeyUpdateMode = Deencapsulation.getField(job, "uniqueKeyUpdateMode");
+        boolean isPartialUpdate = false;
+
+        // Process PARTIAL_COLUMNS when UNIQUE_KEY_UPDATE_MODE is not set
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+            String partialColumnsValue = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS);
+            isPartialUpdate = Boolean.parseBoolean(partialColumnsValue);
+            if (isPartialUpdate) {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+        }
+
+        // Verify the backward compatibility logic
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS, uniqueKeyUpdateMode);
+        Assert.assertTrue(isPartialUpdate);
+    }
+
+    @Test
+    public void testUniqueKeyUpdateModeTakesPrecedenceOverPartialColumns() throws Exception {
+        // Test that unique_key_update_mode takes precedence over partial_columns
+        // This tests the logic in gsonPostProcess without calling the full method
+        KafkaRoutineLoadJob job = new KafkaRoutineLoadJob();
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE, "UPDATE_FLEXIBLE_COLUMNS");
+        jobProperties.put(CreateRoutineLoadInfo.PARTIAL_COLUMNS, "true");
+        Deencapsulation.setField(job, "jobProperties", jobProperties);
+
+        // Simulate the precedence logic from gsonPostProcess
+        TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
+        boolean isPartialUpdate = false;
+
+        // Process UNIQUE_KEY_UPDATE_MODE first (takes precedence)
+        if (jobProperties.containsKey(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE)) {
+            String modeValue = jobProperties.get(CreateRoutineLoadInfo.UNIQUE_KEY_UPDATE_MODE);
+            TUniqueKeyUpdateMode mode = CreateRoutineLoadInfo.parseUniqueKeyUpdateMode(modeValue);
+            if (mode != null) {
+                uniqueKeyUpdateMode = mode;
+                isPartialUpdate = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
+            }
+        }
+
+        // Process PARTIAL_COLUMNS only if UNIQUE_KEY_UPDATE_MODE results in UPSERT
+        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPSERT) {
+            String partialColumnsValue = jobProperties.get(CreateRoutineLoadInfo.PARTIAL_COLUMNS);
+            isPartialUpdate = Boolean.parseBoolean(partialColumnsValue);
+            if (isPartialUpdate) {
+                uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
+            }
+        }
+
+        // unique_key_update_mode should take precedence
+        Assert.assertEquals(TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS, uniqueKeyUpdateMode);
+        // isPartialUpdate should be false for UPDATE_FLEXIBLE_COLUMNS
+        Assert.assertFalse(isPartialUpdate);
+    }
+
 }
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
new file mode 100644
index 00000000000000..0d4f162d308119
--- /dev/null
+++ b/regression-test/data/load_p0/routine_load/test_routine_load_flexible_partial_update.out
@@ -0,0 +1,82 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_initial1 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+4 david 70 23
+5 eve 60 24
+
+-- !select_after_flex_update1 --
+1 alice 150 20
+2 bob 90 30
+3 chuck 95 22
+4 david 70 23
+5 eve 60 24
+6 frank \N \N
+
+-- !select_initial2 --
+1 10 20 30 40
+2 100 200 300 400
+3 1000 2000 3000 4000
+
+-- !select_after_flex_update2 --
+1 11 20 30 40
+2 100 222 333 400
+3 1000 2000 3000 4000
+4 \N 9876 4444 1234
+
+-- !select_initial7 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_after_flex_where --
+1 alice 100 20
+2 bob 95 21
+3 chuck 80 22
+4 diana 70 \N
+
+-- !select_initial11 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_after_fixed_update --
+1 alice 150 20
+2 bob 95 21
+3 charlie 80 22
+4 \N 85 \N
+
+-- !select_initial12 --
+1 alice 100 20
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_after_alter_flex --
+1 alice 200 20
+2 bob 90 35
+3 charlie 80 22
+4 diana \N \N
+
+-- !select_initial18 --
+1 alice 100 20
+2 bob 90 21
+
+-- !select_after_alter_flex_where --
+1 alice 100 20
+2 bob 95 21
+3 charlie 80 \N
+
+-- !select_initial20 --
+1 alice 100 20
+2 bob 90 21
+
+-- !select_after_alter_upsert --
+1 \N 200 \N
+2 bob 90 21
+3 charlie 80 22
+
+-- !select_initial21 --
+1 alice 100 20
+2 bob 90 21
+
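The expected output above encodes the core flexible-update semantics: each JSON message only touches the columns it names. In !select_after_flex_update1, id=6 arrives as {"id": 6, "name": "frank"} and materializes as `6 frank \N \N` because the untouched nullable columns fall back to NULL, while ids 1-3 keep their old values in every column their update omitted. In !select_after_flex_where, id=1 is unchanged because the job's WHERE id > 1 filter drops that message before it is applied.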
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index 9a5e27d26805be..49711089ab850b 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -107,6 +107,18 @@ class RoutineLoadTestUtils {
     }
 
     static int waitForTaskFinish(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+        return waitForTaskFinishInternal(sqlRunner, job, tableName, expectedMinRows, maxAttempts, false)
+    }
+
+    /**
+     * Wait for routine load task to finish for MOW (Merge-on-Write) unique key tables.
+     * Uses skip_delete_bitmap=true to properly count rows during partial update operations.
+     */
+    static int waitForTaskFinishMoW(Closure sqlRunner, String job, String tableName, int expectedMinRows = 0, int maxAttempts = 60) {
+        return waitForTaskFinishInternal(sqlRunner, job, tableName, expectedMinRows, maxAttempts, true)
+    }
+
+    private static int waitForTaskFinishInternal(Closure sqlRunner, String job, String tableName, int expectedMinRows, int maxAttempts, boolean isMoW) {
         def count = 0
         while (true) {
             def res = sqlRunner.call("show routine load for ${job}")
@@ -114,7 +126,18 @@ class RoutineLoadTestUtils {
             def statistic = res[0][14].toString()
             logger.info("Routine load state: ${routineLoadState}")
             logger.info("Routine load statistic: ${statistic}")
-            def rowCount = sqlRunner.call("select count(*) from ${tableName}")
+            def rowCount
+            if (isMoW) {
+                // For MOW tables, use skip_delete_bitmap to properly count rows
+                sqlRunner.call("set skip_delete_bitmap=true")
+                sqlRunner.call("set skip_delete_sign=true")
+                sqlRunner.call("sync")
+                rowCount = sqlRunner.call("select count(*) from ${tableName}")
+                sqlRunner.call("set skip_delete_bitmap=false")
+                sqlRunner.call("set skip_delete_sign=false")
+            } else {
+                rowCount = sqlRunner.call("select count(*) from ${tableName}")
+            }
             if (routineLoadState == "RUNNING" && rowCount[0][0] > expectedMinRows) {
                 break
             }
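The arithmetic behind waitForTaskFinishMoW's expectedMinRows argument follows directly from this helper: with skip_delete_bitmap=true and skip_delete_sign=true, count(*) on a MoW table sees every row version rather than only the merged view, so after a load the count reaches initial_rows + delivered_messages. The loop exits on count > expectedMinRows, which is why the suite below always passes total - 1 (for example, Test 1 waits with 8 for 5 initial rows + 4 messages = 9 visible versions).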
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
new file mode 100644
index 00000000000000..e3f2747e108af8
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_flexible_partial_update.groovy
@@ -0,0 +1,1377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+
+suite("test_routine_load_flexible_partial_update", "nonConcurrent") {
+
+    if (RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        def runSql = { String q -> sql q }
+        def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+        def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+
+        // Test 1: Basic flexible partial update
+        def kafkaJsonTopic1 = "test_routine_load_flexible_partial_update_basic"
+        def tableName1 = "test_routine_load_flex_update_basic"
+        def job1 = "test_flex_partial_update_job_basic"
+
+        sql """ DROP TABLE IF EXISTS ${tableName1} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName1} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test flexible partial update'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // verify skip bitmap column is enabled
+        def show_res = sql "show create table ${tableName1}"
+        assertTrue(show_res.toString().contains('"enable_unique_key_skip_bitmap_column" = "true"'))
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName1} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21),
+            (3, 'charlie', 80, 22),
+            (4, 'david', 70, 23),
+            (5, 'eve', 60, 24)
+        """
+
+        qt_select_initial1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+
+        try {
+            // create routine load with flexible partial update
+            sql """
+                CREATE ROUTINE LOAD ${job1} ON ${tableName1}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic1}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send JSON data with different columns per row
+            // Row 1: update only score for id=1
+            // Row 2: update only age for id=2
+            // Row 3: update both name and score for id=3
+            // Row 4: insert new row with only id and name
+            def data = [
+                '{"id": 1, "score": 150}',
+                '{"id": 2, "age": 30}',
+                '{"id": 3, "name": "chuck", "score": 95}',
+                '{"id": 6, "name": "frank"}'
+            ]
+
+            data.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic1, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // wait for routine load task to finish
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 5 + 4 = 9
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job1, tableName1, 8)
+
+            // verify flexible partial update results
+            qt_select_after_flex_update1 "SELECT id, name, score, age FROM ${tableName1} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job1}"
+        }
+
+        // Test 2: Flexible partial update with default values
+        def kafkaJsonTopic2 = "test_routine_load_flexible_partial_update_default"
+        def tableName2 = "test_routine_load_flex_update_default"
+        def job2 = "test_flex_partial_update_job_default"
+
+        sql """ DROP TABLE IF EXISTS ${tableName2} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName2} (
+                `id` int NOT NULL,
+                `v1` bigint NULL,
+                `v2` bigint NULL DEFAULT "9876",
+                `v3` bigint NOT NULL,
+                `v4` bigint NOT NULL DEFAULT "1234"
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            COMMENT 'test flexible partial update with default values'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName2} VALUES
+            (1, 10, 20, 30, 40),
+            (2, 100, 200, 300, 400),
+            (3, 1000, 2000, 3000, 4000)
+        """
+
+        qt_select_initial2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${job2} ON ${tableName2}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic2}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // send JSON data with different columns per row
+            def data2 = [
+                '{"id": 1, "v1": 11}',
+                '{"id": 2, "v2": 222, "v3": 333}',
+                '{"id": 4, "v3": 4444}'
+            ]
+
+            data2.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic2, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job2, tableName2, 5)
+
+            qt_select_after_flex_update2 "SELECT id, v1, v2, v3, v4 FROM ${tableName2} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job2}"
+        }
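Test 2's expectations (!select_after_flex_update2 in the .out file above) show how column defaults interact with flexible updates: the new key id=4 supplies only v3=4444, so v1 (nullable, no default) becomes NULL, v2 takes its declared default 9876, and v4 takes its NOT NULL default 1234, while the existing keys id=1 and id=2 keep their stored values in every column their JSON omitted.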
"replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD ${job4} ON ${tableName4} + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json", + "jsonpaths" = '[\"\$.id\", \"\$.name\", \"\$.score\"]', + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "test_topic", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "Flexible partial update does not support jsonpaths" + } + + // Test 5: Error case - fuzzy_parse not supported + def kafkaJsonTopic5 = "test_routine_load_flexible_partial_update_fuzzy_error" + def tableName5 = "test_routine_load_flex_update_fuzzy_error" + def job5 = "test_flex_partial_update_job_fuzzy_error" + + sql """ DROP TABLE IF EXISTS ${tableName5} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName5} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD ${job5} ON ${tableName5} + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json", + "fuzzy_parse" = "true", + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic5}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "Flexible partial update does not support fuzzy_parse" + } + + // Test 6: Error case - COLUMNS clause not supported + def kafkaJsonTopic6 = "test_routine_load_flexible_partial_update_columns_error" + def tableName6 = "test_routine_load_flex_update_columns_error" + def job6 = "test_flex_partial_update_job_columns_error" + + sql """ DROP TABLE IF EXISTS ${tableName6} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName6} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD ${job6} ON ${tableName6} + COLUMNS (id, name, score) + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json", + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic6}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "Flexible partial update does not support COLUMNS specification" + } + + // Test 7: Success case - WHERE clause works with flexible partial update + def kafkaJsonTopic7 = "test_routine_load_flexible_partial_update_where" + def tableName7 = "test_routine_load_flex_update_where" + def job7 = "test_flex_partial_update_job_where" + + sql """ DROP TABLE IF EXISTS ${tableName7} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName7} ( + `id` 
int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL, + `age` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + // insert initial data + sql """ + INSERT INTO ${tableName7} VALUES + (1, 'alice', 100, 20), + (2, 'bob', 90, 21), + (3, 'charlie', 80, 22) + """ + + qt_select_initial7 "SELECT id, name, score, age FROM ${tableName7} ORDER BY id" + + try { + // create routine load with WHERE clause and flexible partial update + sql """ + CREATE ROUTINE LOAD ${job7} ON ${tableName7} + WHERE id > 1 + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json", + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic7}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // send JSON data - WHERE clause filters id > 1, so id=1 row should NOT be processed + def data7 = [ + '{"id": 1, "score": 999}', + '{"id": 2, "score": 95}', + '{"id": 3, "name": "chuck"}', + '{"id": 4, "name": "diana", "score": 70}' + ] + + data7.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaJsonTopic7, null, line) + producer.send(record).get() + } + producer.flush() + + // With skip_delete_bitmap=true and WHERE id > 1: + // - id=1: 1 version (not updated, filtered by WHERE) + // - id=2: 2 versions (original + partial update) + // - id=3: 2 versions (original + partial update) + // - id=4: 1 version (new row) + // Total: 6 rows, so expectedMinRows = 5 (waits for count > 5) + RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job7, tableName7, 5) + + // verify: id=1 should NOT be updated (filtered by WHERE), id=2,3,4 should be updated + qt_select_after_flex_where "SELECT id, name, score, age FROM ${tableName7} ORDER BY id" + } catch (Exception e) { + logger.error("Error during test: " + e.getMessage()) + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job7}" + } + + // Test 8: Error case - table without skip_bitmap column + def kafkaJsonTopic8 = "test_routine_load_flexible_partial_update_no_skip_bitmap" + def tableName8 = "test_routine_load_flex_update_no_skip_bitmap" + def job8 = "test_flex_partial_update_job_no_skip_bitmap" + + sql """ DROP TABLE IF EXISTS ${tableName8} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName8} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "enable_unique_key_skip_bitmap_column" = "false" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD ${job8} ON ${tableName8} + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json", + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic8}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "Flexible partial update can only support table with skip bitmap hidden column" + } + + // Test 9: Error case - table with variant column + def kafkaJsonTopic9 = 
"test_routine_load_flexible_partial_update_variant" + def tableName9 = "test_routine_load_flex_update_variant" + def job9 = "test_flex_partial_update_job_variant" + + sql """ DROP TABLE IF EXISTS ${tableName9} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName9} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `data` variant NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD ${job9} ON ${tableName9} + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json", + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic9}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "Flexible partial update can only support table without variant columns" + } + + // Test 10: Error case - invalid unique_key_update_mode value + def kafkaJsonTopic10 = "test_routine_load_flexible_partial_update_invalid_mode" + def tableName10 = "test_routine_load_flex_update_invalid_mode" + def job10 = "test_flex_partial_update_job_invalid_mode" + + sql """ DROP TABLE IF EXISTS ${tableName10} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName10} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + test { + sql """ + CREATE ROUTINE LOAD ${job10} ON ${tableName10} + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json", + "unique_key_update_mode" = "INVALID_MODE" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic10}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + exception "unique_key_update_mode should be one of" + } + + // Test 11: UPDATE_FIXED_COLUMNS mode (backward compatibility) + def kafkaJsonTopic11 = "test_routine_load_fixed_columns_mode" + def tableName11 = "test_routine_load_fixed_columns_mode" + def job11 = "test_fixed_columns_mode_job" + + sql """ DROP TABLE IF EXISTS ${tableName11} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName11} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL, + `age` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + // insert initial data + sql """ + INSERT INTO ${tableName11} VALUES + (1, 'alice', 100, 20), + (2, 'bob', 90, 21), + (3, 'charlie', 80, 22) + """ + + qt_select_initial11 "SELECT id, name, score, age FROM ${tableName11} ORDER BY id" + + try { + // create routine load with UPDATE_FIXED_COLUMNS mode + sql """ + CREATE ROUTINE LOAD ${job11} ON ${tableName11} + COLUMNS (id, score) + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json", + "unique_key_update_mode" = "UPDATE_FIXED_COLUMNS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic11}", + 
"property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + def data11 = [ + '{"id": 1, "score": 150}', + '{"id": 2, "score": 95}', + '{"id": 4, "score": 85}' + ] + + data11.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaJsonTopic11, null, line) + producer.send(record).get() + } + producer.flush() + + // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6 + RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job11, tableName11, 5) + + qt_select_after_fixed_update "SELECT id, name, score, age FROM ${tableName11} ORDER BY id" + } catch (Exception e) { + logger.error("Error during test: " + e.getMessage()) + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job11}" + } + + // Test 12: ALTER ROUTINE LOAD to change unique_key_update_mode + def kafkaJsonTopic12 = "test_routine_load_alter_flex_mode" + def tableName12 = "test_routine_load_alter_flex_mode" + def job12 = "test_alter_flex_mode_job" + + sql """ DROP TABLE IF EXISTS ${tableName12} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName12} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL, + `age` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + // insert initial data + sql """ + INSERT INTO ${tableName12} VALUES + (1, 'alice', 100, 20), + (2, 'bob', 90, 21), + (3, 'charlie', 80, 22) + """ + + qt_select_initial12 "SELECT id, name, score, age FROM ${tableName12} ORDER BY id" + + try { + // create routine load with UPSERT mode (default) + sql """ + CREATE ROUTINE LOAD ${job12} ON ${tableName12} + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic12}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // pause the job before altering + sql "PAUSE ROUTINE LOAD FOR ${job12}" + + // alter to UPDATE_FLEXIBLE_COLUMNS mode + sql """ + ALTER ROUTINE LOAD FOR ${job12} + PROPERTIES + ( + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ); + """ + + // verify the property was changed + def res = sql "SHOW ROUTINE LOAD FOR ${job12}" + def jobProperties = res[0][11].toString() + logger.info("Altered routine load job properties: ${jobProperties}") + assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS")) + + // resume the job + sql "RESUME ROUTINE LOAD FOR ${job12}" + + // send JSON data with different columns per row + def data12 = [ + '{"id": 1, "score": 200}', + '{"id": 2, "age": 35}', + '{"id": 4, "name": "diana"}' + ] + + data12.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaJsonTopic12, null, line) + producer.send(record).get() + } + producer.flush() + + // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6 + RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job12, tableName12, 5) + + // verify flexible partial update results after alter + qt_select_after_alter_flex "SELECT id, name, score, age FROM ${tableName12} ORDER BY id" + } catch (Exception e) { + logger.error("Error during test: " + e.getMessage()) + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job12}" + } + + // Test 13: ALTER ROUTINE LOAD - error when trying to 
+
+            // send JSON data with different columns per row
+            def data12 = [
+                '{"id": 1, "score": 200}',
+                '{"id": 2, "age": 35}',
+                '{"id": 4, "name": "diana"}'
+            ]
+
+            data12.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic12, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 3 + 3 = 6
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job12, tableName12, 5)
+
+            // verify flexible partial update results after alter
+            qt_select_after_alter_flex "SELECT id, name, score, age FROM ${tableName12} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job12}"
+        }
+
+        // Test 13: ALTER ROUTINE LOAD - error when trying to change to flex mode with invalid settings
+        def kafkaJsonTopic13 = "test_routine_load_alter_flex_error"
+        def tableName13 = "test_routine_load_alter_flex_error"
+        def job13 = "test_alter_flex_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName13} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName13} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "enable_unique_key_skip_bitmap_column" = "false"
+            );
+        """
+
+        try {
+            // create routine load with UPSERT mode (default)
+            sql """
+                CREATE ROUTINE LOAD ${job13} ON ${tableName13}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic13}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // pause the job before altering
+            sql "PAUSE ROUTINE LOAD FOR ${job13}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because table doesn't have skip_bitmap
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job13}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update can only support table with skip bitmap hidden column"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job13}"
+        }
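+
+        // Tests 14-17: flexible partial update derives the updated column set from each
+        // JSON object's keys, so options that fix the column mapping up front
+        // (CSV format, fuzzy_parse, jsonpaths, an explicit COLUMNS list) are rejected.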
+
+        // Test 14: ALTER to flex mode fails when using CSV format
+        def kafkaJsonTopic14 = "test_routine_load_alter_flex_csv_error"
+        def tableName14 = "test_routine_load_alter_flex_csv_error"
+        def job14 = "test_alter_flex_csv_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName14} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName14} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with CSV format
+            sql """
+                CREATE ROUTINE LOAD ${job14} ON ${tableName14}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "csv"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic14}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job14}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because CSV format
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job14}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update only supports JSON format"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job14}"
+        }
+
+        // Test 15: ALTER to flex mode fails when using fuzzy_parse
+        def kafkaJsonTopic15 = "test_routine_load_alter_flex_fuzzy_error"
+        def tableName15 = "test_routine_load_alter_flex_fuzzy_error"
+        def job15 = "test_alter_flex_fuzzy_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName15} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName15} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with fuzzy_parse enabled
+            sql """
+                CREATE ROUTINE LOAD ${job15} ON ${tableName15}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "fuzzy_parse" = "true"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic15}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job15}"
+
+            // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because fuzzy_parse
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job15}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update does not support fuzzy_parse"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job15}"
+        }
+
+        // Test 16: ALTER to flex mode fails with jsonpaths
+        def kafkaJsonTopic16 = "test_routine_load_alter_flex_jsonpaths_error"
+        def tableName16 = "test_routine_load_alter_flex_jsonpaths_error"
+        def job16 = "test_alter_flex_jsonpaths_error_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName16} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName16} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        try {
+            // create routine load with jsonpaths (UPSERT mode)
+            sql """
+                CREATE ROUTINE LOAD ${job16} ON ${tableName16}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "jsonpaths" = '[\"\$.id\", \"\$.name\", \"\$.score\"]'
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic16}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job16}"
+
+            // alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because jsonpaths is set
+            test {
+                sql """
+                    ALTER ROUTINE LOAD FOR ${job16}
+                    PROPERTIES
+                    (
+                        "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                    );
+                """
+                exception "Flexible partial update does not support jsonpaths"
+            }
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job16}"
+        }
"true", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + try { + // create routine load with COLUMNS clause + sql """ + CREATE ROUTINE LOAD ${job17} ON ${tableName17} + COLUMNS (id, name, score) + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic17}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + sql "PAUSE ROUTINE LOAD FOR ${job17}" + + // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because COLUMNS clause + test { + sql """ + ALTER ROUTINE LOAD FOR ${job17} + PROPERTIES + ( + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ); + """ + exception "Flexible partial update does not support COLUMNS specification" + } + } catch (Exception e) { + logger.error("Error during test: " + e.getMessage()) + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job17}" + } + + // Test 18: ALTER to flex mode succeeds with WHERE clause + def kafkaJsonTopic18 = "test_routine_load_alter_flex_where" + def tableName18 = "test_routine_load_alter_flex_where" + def job18 = "test_alter_flex_where_job" + + sql """ DROP TABLE IF EXISTS ${tableName18} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName18} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL, + `age` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + // insert initial data + sql """ + INSERT INTO ${tableName18} VALUES + (1, 'alice', 100, 20), + (2, 'bob', 90, 21) + """ + + qt_select_initial18 "SELECT id, name, score, age FROM ${tableName18} ORDER BY id" + + try { + // create routine load with WHERE clause (UPSERT mode) + sql """ + CREATE ROUTINE LOAD ${job18} ON ${tableName18} + WHERE id > 1 + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic18}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + sql "PAUSE ROUTINE LOAD FOR ${job18}" + + // alter to UPDATE_FLEXIBLE_COLUMNS mode - should succeed + sql """ + ALTER ROUTINE LOAD FOR ${job18} + PROPERTIES + ( + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ); + """ + + // verify the property was changed + def res = sql "SHOW ROUTINE LOAD FOR ${job18}" + def jobProperties = res[0][11].toString() + logger.info("Altered routine load job properties: ${jobProperties}") + assertTrue(jobProperties.contains("UPDATE_FLEXIBLE_COLUMNS")) + + sql "RESUME ROUTINE LOAD FOR ${job18}" + + // send JSON data - WHERE clause filters id > 1 + def data18 = [ + '{"id": 1, "score": 999}', + '{"id": 2, "score": 95}', + '{"id": 3, "name": "charlie", "score": 80}' + ] + + data18.each { line -> + logger.info("Sending to Kafka: ${line}") + def record = new ProducerRecord<>(kafkaJsonTopic18, null, line) + producer.send(record).get() + } + producer.flush() + + // With skip_delete_bitmap=true, count = initial + kafka_messages = 2 + 3 = 5 + RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job18, tableName18, 4) + + // verify: id=1 should NOT be updated (filtered by WHERE), id=2,3 should be updated + qt_select_after_alter_flex_where 
"SELECT id, name, score, age FROM ${tableName18} ORDER BY id" + } catch (Exception e) { + logger.error("Error during test: " + e.getMessage()) + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job18}" + } + + // Test 19: ALTER to flex mode fails on non-MoW table + def kafkaJsonTopic19 = "test_routine_load_alter_flex_non_mow_error" + def tableName19 = "test_routine_load_alter_flex_non_mow_error" + def job19 = "test_alter_flex_non_mow_error_job" + + sql """ DROP TABLE IF EXISTS ${tableName19} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName19} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "false" + ); + """ + + try { + // create routine load + sql """ + CREATE ROUTINE LOAD ${job19} ON ${tableName19} + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic19}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + sql "PAUSE ROUTINE LOAD FOR ${job19}" + + // try to alter to UPDATE_FLEXIBLE_COLUMNS mode - should fail because non-MoW + test { + sql """ + ALTER ROUTINE LOAD FOR ${job19} + PROPERTIES + ( + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ); + """ + exception "Flexible partial update is only supported in unique table MoW" + } + } catch (Exception e) { + logger.error("Error during test: " + e.getMessage()) + throw e + } finally { + sql "STOP ROUTINE LOAD FOR ${job19}" + } + + // Test 20: ALTER from flex mode to UPSERT mode (success case) + def kafkaJsonTopic20 = "test_routine_load_alter_flex_to_upsert" + def tableName20 = "test_routine_load_alter_flex_to_upsert" + def job20 = "test_alter_flex_to_upsert_job" + + sql """ DROP TABLE IF EXISTS ${tableName20} force;""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName20} ( + `id` int NOT NULL, + `name` varchar(65533) NULL, + `score` int NULL, + `age` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "disable_auto_compaction" = "true", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "enable_unique_key_skip_bitmap_column" = "true" + ); + """ + + // insert initial data + sql """ + INSERT INTO ${tableName20} VALUES + (1, 'alice', 100, 20), + (2, 'bob', 90, 21) + """ + + qt_select_initial20 "SELECT id, name, score, age FROM ${tableName20} ORDER BY id" + + try { + // create routine load with flex mode + sql """ + CREATE ROUTINE LOAD ${job20} ON ${tableName20} + PROPERTIES + ( + "max_batch_interval" = "10", + "format" = "json", + "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaJsonTopic20}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + sql "PAUSE ROUTINE LOAD FOR ${job20}" + + // alter to UPSERT mode + sql """ + ALTER ROUTINE LOAD FOR ${job20} + PROPERTIES + ( + "unique_key_update_mode" = "UPSERT" + ); + """ + + // verify the property was changed + def res = sql "SHOW ROUTINE LOAD FOR ${job20}" + def jobProperties = res[0][11].toString() + logger.info("Altered routine load job properties: ${jobProperties}") + assertTrue(jobProperties.contains("UPSERT")) + + sql "RESUME ROUTINE LOAD FOR ${job20}" + + // send JSON data - with UPSERT 
+
+            // send JSON data - with UPSERT mode, missing columns should be NULL
+            def data20 = [
+                '{"id": 1, "score": 200}',
+                '{"id": 3, "name": "charlie", "score": 80, "age": 22}'
+            ]
+
+            data20.each { line ->
+                logger.info("Sending to Kafka: ${line}")
+                def record = new ProducerRecord<>(kafkaJsonTopic20, null, line)
+                producer.send(record).get()
+            }
+            producer.flush()
+
+            // With skip_delete_bitmap=true, count = initial + kafka_messages = 2 + 2 = 4
+            RoutineLoadTestUtils.waitForTaskFinishMoW(runSql, job20, tableName20, 3)
+
+            // with UPSERT, id=1 should have NULL for name and age (full row replacement)
+            qt_select_after_alter_upsert "SELECT id, name, score, age FROM ${tableName20} ORDER BY id"
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job20}"
+        }
+
+        // Test 21: ALTER from flex mode to UPDATE_FIXED_COLUMNS mode (success case)
+        def kafkaJsonTopic21 = "test_routine_load_alter_flex_to_fixed"
+        def tableName21 = "test_routine_load_alter_flex_to_fixed"
+        def job21 = "test_alter_flex_to_fixed_job"
+
+        sql """ DROP TABLE IF EXISTS ${tableName21} force;"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName21} (
+                `id` int NOT NULL,
+                `name` varchar(65533) NULL,
+                `score` int NULL,
+                `age` int NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 3
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "disable_auto_compaction" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "light_schema_change" = "true",
+                "enable_unique_key_skip_bitmap_column" = "true"
+            );
+        """
+
+        // insert initial data
+        sql """
+            INSERT INTO ${tableName21} VALUES
+            (1, 'alice', 100, 20),
+            (2, 'bob', 90, 21)
+        """
+
+        qt_select_initial21 "SELECT id, name, score, age FROM ${tableName21} ORDER BY id"
+
+        try {
+            // create routine load with flex mode
+            sql """
+                CREATE ROUTINE LOAD ${job21} ON ${tableName21}
+                PROPERTIES
+                (
+                    "max_batch_interval" = "10",
+                    "format" = "json",
+                    "unique_key_update_mode" = "UPDATE_FLEXIBLE_COLUMNS"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${kafka_broker}",
+                    "kafka_topic" = "${kafkaJsonTopic21}",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            sql "PAUSE ROUTINE LOAD FOR ${job21}"
+
+            // alter to UPDATE_FIXED_COLUMNS mode; a COLUMNS clause would also need to be
+            // added via ALTER to define the fixed column set
+            // Note: this changes the job from flexible to fixed partial update
+            sql """
+                ALTER ROUTINE LOAD FOR ${job21}
+                PROPERTIES
+                (
+                    "unique_key_update_mode" = "UPDATE_FIXED_COLUMNS"
+                );
+            """
+
+            // verify the property was changed
+            def res = sql "SHOW ROUTINE LOAD FOR ${job21}"
+            def jobProperties = res[0][11].toString()
+            logger.info("Altered routine load job properties: ${jobProperties}")
+            assertTrue(jobProperties.contains("UPDATE_FIXED_COLUMNS"))
+
+        } catch (Exception e) {
+            logger.error("Error during test: " + e.getMessage())
+            throw e
+        } finally {
+            sql "STOP ROUTINE LOAD FOR ${job21}"
+        }
+    }
+}
"OFFSET_BEGINNING" ); """ - exception "partial_update_new_key_behavior can only be set when partial_columns is true" + exception "partial_update_new_key_behavior can only be set when partial update is enabled" } } }