From b9e72bcfcf1a145db0ca5a5ff1b731c83d1ea3bd Mon Sep 17 00:00:00 2001 From: wzx140 Date: Mon, 8 Jun 2026 14:29:52 +0800 Subject: [PATCH 1/2] [client][server] Support adding aggregation columns Support carrying aggregation functions through alter table add column requests. Validate aggregation functions during schema alter and reject them on non-aggregation tables. --- .../client/utils/ClientRpcMessageUtils.java | 16 ++++++ .../fluss/client/admin/FlussAdminITCase.java | 56 +++++++++++++++++++ .../apache/fluss/metadata/TableChange.java | 31 +++++++++- fluss-rpc/src/main/proto/FlussApi.proto | 1 + .../server/coordinator/MetadataManager.java | 15 +++++ .../server/coordinator/SchemaUpdate.java | 7 ++- .../server/utils/ServerRpcMessageUtils.java | 36 +++++++++--- .../utils/TableDescriptorValidation.java | 19 ++++++- 8 files changed, 169 insertions(+), 12 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index 0bd67da17d..1051613987 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -34,9 +34,11 @@ import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.config.cluster.ColumnPositionType; import org.apache.fluss.config.cluster.ConfigEntry; +import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.FsPathAndFileName; import org.apache.fluss.fs.token.ObtainedSecurityToken; +import org.apache.fluss.metadata.AggFunction; import org.apache.fluss.metadata.DatabaseChange; import org.apache.fluss.metadata.DatabaseSummary; import org.apache.fluss.metadata.PartitionInfo; @@ -94,11 +96,13 @@ import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest; import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest; import org.apache.fluss.rpc.protocol.MergeMode; +import org.apache.fluss.utils.InstantiationUtils; import org.apache.fluss.utils.json.DataTypeJsonSerde; import org.apache.fluss.utils.json.JsonSerdeUtils; import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -653,10 +657,22 @@ public static PbAddColumn toPbAddColumn(TableChange.AddColumn addColumn) { if (addColumn.getComment() != null) { pbAddColumn.setComment(addColumn.getComment()); } + if (addColumn.getAggFunction().isPresent()) { + pbAddColumn.setSerializedAggFunction( + serializeAggFunction(addColumn.getAggFunction().get())); + } return pbAddColumn; } + private static byte[] serializeAggFunction(AggFunction aggFunction) { + try { + return InstantiationUtils.serializeObject(aggFunction); + } catch (IOException e) { + throw new FlussRuntimeException("Failed to serialize aggregation function.", e); + } + } + public static PbDropColumn toPbDropColumn(TableChange.DropColumn dropColumn) { return new PbDropColumn().setColumnName(dropColumn.getName()); } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 65cca8195a..a12a43cde4 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -568,6 +568,62 @@ void testAlterTableColumn() throws Exception { .hasMessageContaining("Column nested_row already exists"); } + @Test + void testAlterAggregationTableColumnWithAggFunction() throws Exception { + TablePath tablePath = TablePath.of("test_db", "alter_aggregation_table_column"); + Map properties = new HashMap<>(); + properties.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("value", DataTypes.BIGINT(), AggFunctions.SUM()) + .primaryKey("id") + .build()) + .distributedBy(3, "id") + .properties(properties) + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "new_value", + DataTypes.BIGINT(), + "new aggregate column", + TableChange.ColumnPosition.last(), + AggFunctions.SUM())), + false) + .get(); + + SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get(); + assertThat(schemaInfo.getSchema().getAggFunction("new_value")).hasValue(AggFunctions.SUM()); + } + + @Test + void testAlterNonAggregationTableColumnWithAggFunction() throws Exception { + TablePath tablePath = TablePath.of("test_db", "alter_non_aggregation_table_column"); + admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get(); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.addColumn( + "new_value", + DataTypes.BIGINT(), + "new aggregate column", + TableChange.ColumnPosition.last(), + AggFunctions.SUM())), + false) + .get()) + .hasMessageContaining( + "Aggregation function is only supported for aggregation merge engine table"); + } + @Test void testCreateInvalidDatabaseAndTable() throws Exception { assertThatThrownBy( diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java index bb027d6905..0edb5e31bc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java @@ -22,6 +22,7 @@ import javax.annotation.Nullable; import java.util.Objects; +import java.util.Optional; /** {@link TableChange} represents the modification of the Fluss Table. */ public interface TableChange { @@ -42,7 +43,21 @@ static AddColumn addColumn( DataType dataType, @Nullable String comment, ColumnPosition position) { - return new AddColumn(columnName, dataType, comment, position); + return new AddColumn(columnName, dataType, comment, position, null); + } + + /** + * A table change to add the column with specified position and aggregation function. + * + * @return a TableChange represents the modification. + */ + static AddColumn addColumn( + String columnName, + DataType dataType, + @Nullable String comment, + ColumnPosition position, + @Nullable AggFunction aggFunction) { + return new AddColumn(columnName, dataType, comment, position, aggFunction); } /** @@ -230,15 +245,21 @@ class AddColumn implements SchemaChange { private final String name; private final DataType dataType; private final @Nullable String comment; + private final @Nullable AggFunction aggFunction; private final ColumnPosition position; private AddColumn( - String name, DataType dataType, @Nullable String comment, ColumnPosition position) { + String name, + DataType dataType, + @Nullable String comment, + ColumnPosition position, + @Nullable AggFunction aggFunction) { this.name = name; this.dataType = dataType; this.comment = comment; this.position = position; + this.aggFunction = aggFunction; } public String getName() { @@ -258,6 +279,10 @@ public ColumnPosition getPosition() { return position; } + public Optional getAggFunction() { + return Optional.ofNullable(aggFunction); + } + @Override public String toString() { return "AddColumn{" @@ -269,6 +294,8 @@ public String toString() { + ", comment='" + comment + '\'' + + ", aggFunction=" + + aggFunction + ", position=" + position + '}'; diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index e8381215fc..77f21cd100 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -1154,6 +1154,7 @@ message PbAddColumn { required bytes data_type_json = 2; optional string comment = 3; required int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3 + optional bytes serialized_agg_function = 5; } message PbDropColumn { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 106d4ef66f..08b8ef42a9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -39,6 +39,7 @@ import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.DatabaseInfo; import org.apache.fluss.metadata.DatabaseSummary; +import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaInfo; @@ -75,7 +76,9 @@ import java.util.Set; import java.util.concurrent.Callable; +import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAggregationFunctionParameters; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties; +import static org.apache.fluss.server.utils.TableDescriptorValidation.validateNoAggregationFunctions; /** A manager for metadata. */ public class MetadataManager { @@ -440,6 +443,7 @@ public void alterTableSchema( if (!schemaChanges.isEmpty()) { Schema newSchema = SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges); + validateAlterTableSchema(table, newSchema); LakeCatalog.Context lakeCatalogContext = new CoordinatorService.DefaultLakeCatalogContext( false, @@ -473,6 +477,17 @@ public void alterTableSchema( } } + static void validateAlterTableSchema(TableInfo table, Schema newSchema) { + if (table.getTableConfig() + .getMergeEngineType() + .map(MergeEngineType.AGGREGATION::equals) + .orElse(false)) { + validateAggregationFunctionParameters(newSchema); + } else { + validateNoAggregationFunctions(newSchema); + } + } + private void syncSchemaChangesToLake( TablePath tablePath, TableInfo tableInfo, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index d1ad0c16c2..e85219a3b3 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -82,7 +82,12 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { } // Delegate the actual addition to the builder - builder.column(addColumn.getName(), addColumn.getDataType()); + if (addColumn.getAggFunction().isPresent()) { + builder.column( + addColumn.getName(), addColumn.getDataType(), addColumn.getAggFunction().get()); + } else { + builder.column(addColumn.getName(), addColumn.getDataType()); + } // Fixed: Use null check for the String comment String comment = addColumn.getComment(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f7a89828ab..1d5e5b043f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -27,9 +27,11 @@ import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.config.cluster.ColumnPositionType; import org.apache.fluss.config.cluster.ConfigEntry; +import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.token.ObtainedSecurityToken; import org.apache.fluss.lake.committer.LakeCommitResult; +import org.apache.fluss.metadata.AggFunction; import org.apache.fluss.metadata.DatabaseChange; import org.apache.fluss.metadata.DatabaseSummary; import org.apache.fluss.metadata.PartitionSpec; @@ -197,6 +199,7 @@ import org.apache.fluss.server.zk.data.PartitionRegistration; import org.apache.fluss.server.zk.data.lake.LakeTable; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; +import org.apache.fluss.utils.InstantiationUtils; import org.apache.fluss.utils.json.DataTypeJsonSerde; import org.apache.fluss.utils.json.JsonSerdeUtils; import org.apache.fluss.utils.json.TableBucketOffsets; @@ -206,6 +209,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -355,17 +359,33 @@ public static List toAddColumns(List addColumns) { return addColumns.stream() .filter(Objects::nonNull) .map( - pbAddColumn -> - TableChange.addColumn( - pbAddColumn.getColumnName(), - JsonSerdeUtils.readValue( - pbAddColumn.getDataTypeJson(), - DataTypeJsonSerde.INSTANCE), - pbAddColumn.hasComment() ? pbAddColumn.getComment() : null, - toColumnPosition(pbAddColumn.getColumnPositionType()))) + pbAddColumn -> { + AggFunction aggFunction = + pbAddColumn.hasSerializedAggFunction() + ? deserializeAggFunction( + pbAddColumn.getSerializedAggFunction()) + : null; + return TableChange.addColumn( + pbAddColumn.getColumnName(), + JsonSerdeUtils.readValue( + pbAddColumn.getDataTypeJson(), + DataTypeJsonSerde.INSTANCE), + pbAddColumn.hasComment() ? pbAddColumn.getComment() : null, + toColumnPosition(pbAddColumn.getColumnPositionType()), + aggFunction); + }) .collect(Collectors.toList()); } + private static AggFunction deserializeAggFunction(byte[] serializedAggFunction) { + try { + return InstantiationUtils.deserializeObject( + serializedAggFunction, AggFunction.class.getClassLoader()); + } catch (IOException | ClassNotFoundException e) { + throw new FlussRuntimeException("Failed to deserialize aggregation function.", e); + } + } + public static List toDropColumns(List dropColumns) { return dropColumns.stream() .filter(Objects::nonNull) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 949afd8c67..fd11419dbd 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -323,6 +323,9 @@ private static void checkArrowCompression(Configuration tableConf) { private static void checkMergeEngine( Configuration tableConf, boolean hasPrimaryKey, Schema schema) { MergeEngineType mergeEngine = tableConf.get(ConfigOptions.TABLE_MERGE_ENGINE); + if (mergeEngine != MergeEngineType.AGGREGATION) { + validateNoAggregationFunctions(schema); + } if (mergeEngine != null) { if (!hasPrimaryKey) { throw new InvalidConfigException( @@ -377,6 +380,20 @@ private static void checkMergeEngine( } } + /** Validates that the schema doesn't contain any aggregation functions. */ + public static void validateNoAggregationFunctions(Schema schema) { + for (Schema.Column column : schema.getColumns()) { + Optional aggFunction = column.getAggFunction(); + if (aggFunction.isPresent()) { + throw new InvalidConfigException( + String.format( + "Aggregation function is only supported for aggregation merge engine table, " + + "but column '%s' has aggregation function '%s'.", + column.getName(), aggFunction.get())); + } + } + } + /** * Validates aggregation function parameters in the schema. * @@ -388,7 +405,7 @@ private static void checkMergeEngine( * @throws InvalidConfigException if any aggregation function has invalid parameters or data * types */ - private static void validateAggregationFunctionParameters(Schema schema) { + public static void validateAggregationFunctionParameters(Schema schema) { // Get primary key columns for early exit List primaryKeys = schema.getPrimaryKeyColumnNames(); From 0875f8a84b18fbb84488e399a883c8902e1dba00 Mon Sep 17 00:00:00 2001 From: wzx140 Date: Fri, 12 Jun 2026 14:32:07 +0800 Subject: [PATCH 2/2] [rpc][server] Use structured agg function in add column RPC --- .../client/utils/ClientRpcMessageUtils.java | 20 ++++------- fluss-rpc/src/main/proto/FlussApi.proto | 3 +- .../server/coordinator/MetadataManager.java | 15 +------- .../server/utils/ServerRpcMessageUtils.java | 35 +++++++++++-------- .../utils/TableDescriptorValidation.java | 18 ++++++++-- 5 files changed, 47 insertions(+), 44 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index 1051613987..1273c12172 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -34,7 +34,6 @@ import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.config.cluster.ColumnPositionType; import org.apache.fluss.config.cluster.ConfigEntry; -import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.FsPathAndFileName; import org.apache.fluss.fs.token.ObtainedSecurityToken; @@ -96,13 +95,11 @@ import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest; import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest; import org.apache.fluss.rpc.protocol.MergeMode; -import org.apache.fluss.utils.InstantiationUtils; import org.apache.fluss.utils.json.DataTypeJsonSerde; import org.apache.fluss.utils.json.JsonSerdeUtils; import javax.annotation.Nullable; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -658,21 +655,18 @@ public static PbAddColumn toPbAddColumn(TableChange.AddColumn addColumn) { pbAddColumn.setComment(addColumn.getComment()); } if (addColumn.getAggFunction().isPresent()) { - pbAddColumn.setSerializedAggFunction( - serializeAggFunction(addColumn.getAggFunction().get())); + AggFunction aggFunction = addColumn.getAggFunction().get(); + pbAddColumn.setAggFunctionType(aggFunction.getType().toString()); + aggFunction + .getParameters() + .forEach( + (key, value) -> + pbAddColumn.addAggFunctionParam().setKey(key).setValue(value)); } return pbAddColumn; } - private static byte[] serializeAggFunction(AggFunction aggFunction) { - try { - return InstantiationUtils.serializeObject(aggFunction); - } catch (IOException e) { - throw new FlussRuntimeException("Failed to serialize aggregation function.", e); - } - } - public static PbDropColumn toPbDropColumn(TableChange.DropColumn dropColumn) { return new PbDropColumn().setColumnName(dropColumn.getName()); } diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 77f21cd100..356913ccc1 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -1154,7 +1154,8 @@ message PbAddColumn { required bytes data_type_json = 2; optional string comment = 3; required int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3 - optional bytes serialized_agg_function = 5; + optional string agg_function_type = 5; + repeated PbKeyValue agg_function_params = 6; } message PbDropColumn { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index 08b8ef42a9..65b2ee24bd 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -39,7 +39,6 @@ import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.DatabaseInfo; import org.apache.fluss.metadata.DatabaseSummary; -import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaInfo; @@ -76,9 +75,8 @@ import java.util.Set; import java.util.concurrent.Callable; -import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAggregationFunctionParameters; import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties; -import static org.apache.fluss.server.utils.TableDescriptorValidation.validateNoAggregationFunctions; +import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableSchema; /** A manager for metadata. */ public class MetadataManager { @@ -477,17 +475,6 @@ public void alterTableSchema( } } - static void validateAlterTableSchema(TableInfo table, Schema newSchema) { - if (table.getTableConfig() - .getMergeEngineType() - .map(MergeEngineType.AGGREGATION::equals) - .orElse(false)) { - validateAggregationFunctionParameters(newSchema); - } else { - validateNoAggregationFunctions(newSchema); - } - } - private void syncSchemaChangesToLake( TablePath tablePath, TableInfo tableInfo, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index 1d5e5b043f..4dfd2a62d1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -27,11 +27,13 @@ import org.apache.fluss.config.cluster.AlterConfigOpType; import org.apache.fluss.config.cluster.ColumnPositionType; import org.apache.fluss.config.cluster.ConfigEntry; -import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.token.ObtainedSecurityToken; import org.apache.fluss.lake.committer.LakeCommitResult; import org.apache.fluss.metadata.AggFunction; +import org.apache.fluss.metadata.AggFunctionType; +import org.apache.fluss.metadata.AggFunctions; import org.apache.fluss.metadata.DatabaseChange; import org.apache.fluss.metadata.DatabaseSummary; import org.apache.fluss.metadata.PartitionSpec; @@ -199,7 +201,6 @@ import org.apache.fluss.server.zk.data.PartitionRegistration; import org.apache.fluss.server.zk.data.lake.LakeTable; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; -import org.apache.fluss.utils.InstantiationUtils; import org.apache.fluss.utils.json.DataTypeJsonSerde; import org.apache.fluss.utils.json.JsonSerdeUtils; import org.apache.fluss.utils.json.TableBucketOffsets; @@ -209,7 +210,6 @@ import javax.annotation.Nullable; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -360,11 +360,7 @@ public static List toAddColumns(List addColumns) { .filter(Objects::nonNull) .map( pbAddColumn -> { - AggFunction aggFunction = - pbAddColumn.hasSerializedAggFunction() - ? deserializeAggFunction( - pbAddColumn.getSerializedAggFunction()) - : null; + AggFunction aggFunction = toAggFunction(pbAddColumn); return TableChange.addColumn( pbAddColumn.getColumnName(), JsonSerdeUtils.readValue( @@ -377,13 +373,24 @@ public static List toAddColumns(List addColumns) { .collect(Collectors.toList()); } - private static AggFunction deserializeAggFunction(byte[] serializedAggFunction) { - try { - return InstantiationUtils.deserializeObject( - serializedAggFunction, AggFunction.class.getClassLoader()); - } catch (IOException | ClassNotFoundException e) { - throw new FlussRuntimeException("Failed to deserialize aggregation function.", e); + private static AggFunction toAggFunction(PbAddColumn pbAddColumn) { + if (!pbAddColumn.hasAggFunctionType()) { + return null; } + + AggFunctionType type = AggFunctionType.fromString(pbAddColumn.getAggFunctionType()); + if (type == null) { + throw new InvalidConfigException( + String.format( + "Unknown aggregation function type: %s", + pbAddColumn.getAggFunctionType())); + } + + Map parameters = new HashMap<>(); + for (PbKeyValue parameter : pbAddColumn.getAggFunctionParamsList()) { + parameters.put(parameter.getKey(), parameter.getValue()); + } + return AggFunctions.of(type, parameters); } public static List toDropColumns(List dropColumns) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index fd11419dbd..6b60a94099 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -17,6 +17,7 @@ package org.apache.fluss.server.utils; +import org.apache.fluss.annotation.Internal; import org.apache.fluss.config.ConfigOption; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -129,6 +130,19 @@ public static void validateTableDescriptor( checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat); } + /** Validates the schema after altering table columns. */ + @Internal + public static void validateAlterTableSchema(TableInfo table, Schema newSchema) { + if (table.getTableConfig() + .getMergeEngineType() + .map(MergeEngineType.AGGREGATION::equals) + .orElse(false)) { + validateAggregationFunctionParameters(newSchema); + } else { + validateNoAggregationFunctions(newSchema); + } + } + private static void checkTableLakeFormatMatchesCluster( Configuration tableConf, @Nullable DataLakeFormat clusterDataLakeFormat) { if (clusterDataLakeFormat == null) { @@ -381,7 +395,7 @@ private static void checkMergeEngine( } /** Validates that the schema doesn't contain any aggregation functions. */ - public static void validateNoAggregationFunctions(Schema schema) { + private static void validateNoAggregationFunctions(Schema schema) { for (Schema.Column column : schema.getColumns()) { Optional aggFunction = column.getAggFunction(); if (aggFunction.isPresent()) { @@ -405,7 +419,7 @@ public static void validateNoAggregationFunctions(Schema schema) { * @throws InvalidConfigException if any aggregation function has invalid parameters or data * types */ - public static void validateAggregationFunctionParameters(Schema schema) { + private static void validateAggregationFunctionParameters(Schema schema) { // Get primary key columns for early exit List primaryKeys = schema.getPrimaryKeyColumnNames();