Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.FsPathAndFileName;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionInfo;
Expand Down Expand Up @@ -653,6 +654,15 @@ public static PbAddColumn toPbAddColumn(TableChange.AddColumn addColumn) {
if (addColumn.getComment() != null) {
pbAddColumn.setComment(addColumn.getComment());
}
if (addColumn.getAggFunction().isPresent()) {
AggFunction aggFunction = addColumn.getAggFunction().get();
pbAddColumn.setAggFunctionType(aggFunction.getType().toString());
aggFunction
.getParameters()
.forEach(
(key, value) ->
pbAddColumn.addAggFunctionParam().setKey(key).setValue(value));
}

return pbAddColumn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,62 @@ void testAlterTableColumn() throws Exception {
.hasMessageContaining("Column nested_row already exists");
}

@Test
void testAlterAggregationTableColumnWithAggFunction() throws Exception {
TablePath tablePath = TablePath.of("test_db", "alter_aggregation_table_column");
Map<String, String> properties = new HashMap<>();
properties.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("value", DataTypes.BIGINT(), AggFunctions.SUM())
.primaryKey("id")
.build())
.distributedBy(3, "id")
.properties(properties)
.build();
admin.createTable(tablePath, tableDescriptor, false).get();

admin.alterTable(
tablePath,
Collections.singletonList(
TableChange.addColumn(
"new_value",
DataTypes.BIGINT(),
"new aggregate column",
TableChange.ColumnPosition.last(),
AggFunctions.SUM())),
false)
.get();

SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
assertThat(schemaInfo.getSchema().getAggFunction("new_value")).hasValue(AggFunctions.SUM());
}

@Test
void testAlterNonAggregationTableColumnWithAggFunction() throws Exception {
TablePath tablePath = TablePath.of("test_db", "alter_non_aggregation_table_column");
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();

assertThatThrownBy(
() ->
admin.alterTable(
tablePath,
Collections.singletonList(
TableChange.addColumn(
"new_value",
DataTypes.BIGINT(),
"new aggregate column",
TableChange.ColumnPosition.last(),
AggFunctions.SUM())),
false)
.get())
.hasMessageContaining(
"Aggregation function is only supported for aggregation merge engine table");
}

@Test
void testCreateInvalidDatabaseAndTable() throws Exception {
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.Nullable;

import java.util.Objects;
import java.util.Optional;

/** {@link TableChange} represents the modification of the Fluss Table. */
public interface TableChange {
Expand All @@ -42,7 +43,21 @@ static AddColumn addColumn(
DataType dataType,
@Nullable String comment,
ColumnPosition position) {
return new AddColumn(columnName, dataType, comment, position);
return new AddColumn(columnName, dataType, comment, position, null);
}

/**
* A table change to add the column with specified position and aggregation function.
*
* @return a TableChange represents the modification.
*/
static AddColumn addColumn(
String columnName,
DataType dataType,
@Nullable String comment,
ColumnPosition position,
@Nullable AggFunction aggFunction) {
return new AddColumn(columnName, dataType, comment, position, aggFunction);
}

/**
Expand Down Expand Up @@ -230,15 +245,21 @@ class AddColumn implements SchemaChange {
private final String name;
private final DataType dataType;
private final @Nullable String comment;
private final @Nullable AggFunction aggFunction;

private final ColumnPosition position;

private AddColumn(
String name, DataType dataType, @Nullable String comment, ColumnPosition position) {
String name,
DataType dataType,
@Nullable String comment,
ColumnPosition position,
@Nullable AggFunction aggFunction) {
this.name = name;
this.dataType = dataType;
this.comment = comment;
this.position = position;
this.aggFunction = aggFunction;
}

public String getName() {
Expand All @@ -258,6 +279,10 @@ public ColumnPosition getPosition() {
return position;
}

public Optional<AggFunction> getAggFunction() {
return Optional.ofNullable(aggFunction);
}

@Override
public String toString() {
return "AddColumn{"
Expand All @@ -269,6 +294,8 @@ public String toString() {
+ ", comment='"
+ comment
+ '\''
+ ", aggFunction="
+ aggFunction
+ ", position="
+ position
+ '}';
Expand Down
2 changes: 2 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,8 @@ message PbAddColumn {
required bytes data_type_json = 2;
optional string comment = 3;
required int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
optional string agg_function_type = 5;
repeated PbKeyValue agg_function_params = 6;
}

message PbDropColumn {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.concurrent.Callable;

import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties;
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableSchema;

/** A manager for metadata. */
public class MetadataManager {
Expand Down Expand Up @@ -440,6 +441,7 @@ public void alterTableSchema(
if (!schemaChanges.isEmpty()) {
Schema newSchema =
SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges);
validateAlterTableSchema(table, newSchema);
LakeCatalog.Context lakeCatalogContext =
new CoordinatorService.DefaultLakeCatalogContext(
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
}

// Delegate the actual addition to the builder
builder.column(addColumn.getName(), addColumn.getDataType());
if (addColumn.getAggFunction().isPresent()) {
builder.column(
addColumn.getName(), addColumn.getDataType(), addColumn.getAggFunction().get());
} else {
builder.column(addColumn.getName(), addColumn.getDataType());
}

// Fixed: Use null check for the String comment
String comment = addColumn.getComment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.config.cluster.ColumnPositionType;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.AggFunctionType;
import org.apache.fluss.metadata.AggFunctions;
import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionSpec;
Expand Down Expand Up @@ -355,17 +359,40 @@ public static List<TableChange> toAddColumns(List<PbAddColumn> addColumns) {
return addColumns.stream()
.filter(Objects::nonNull)
.map(
pbAddColumn ->
TableChange.addColumn(
pbAddColumn.getColumnName(),
JsonSerdeUtils.readValue(
pbAddColumn.getDataTypeJson(),
DataTypeJsonSerde.INSTANCE),
pbAddColumn.hasComment() ? pbAddColumn.getComment() : null,
toColumnPosition(pbAddColumn.getColumnPositionType())))
pbAddColumn -> {
AggFunction aggFunction = toAggFunction(pbAddColumn);
return TableChange.addColumn(
pbAddColumn.getColumnName(),
JsonSerdeUtils.readValue(
pbAddColumn.getDataTypeJson(),
DataTypeJsonSerde.INSTANCE),
pbAddColumn.hasComment() ? pbAddColumn.getComment() : null,
toColumnPosition(pbAddColumn.getColumnPositionType()),
aggFunction);
})
.collect(Collectors.toList());
}

private static AggFunction toAggFunction(PbAddColumn pbAddColumn) {
if (!pbAddColumn.hasAggFunctionType()) {
return null;
}

AggFunctionType type = AggFunctionType.fromString(pbAddColumn.getAggFunctionType());
if (type == null) {
throw new InvalidConfigException(
String.format(
"Unknown aggregation function type: %s",
pbAddColumn.getAggFunctionType()));
}

Map<String, String> parameters = new HashMap<>();
for (PbKeyValue parameter : pbAddColumn.getAggFunctionParamsList()) {
parameters.put(parameter.getKey(), parameter.getValue());
}
return AggFunctions.of(type, parameters);
}

public static List<TableChange.SchemaChange> toDropColumns(List<PbDropColumn> dropColumns) {
return dropColumns.stream()
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.server.utils;

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.config.ConfigOption;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
Expand Down Expand Up @@ -129,6 +130,19 @@ public static void validateTableDescriptor(
checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat);
}

/** Validates the schema after altering table columns. */
@Internal
public static void validateAlterTableSchema(TableInfo table, Schema newSchema) {
if (table.getTableConfig()
.getMergeEngineType()
.map(MergeEngineType.AGGREGATION::equals)
.orElse(false)) {
validateAggregationFunctionParameters(newSchema);
} else {
validateNoAggregationFunctions(newSchema);
}
}

private static void checkTableLakeFormatMatchesCluster(
Configuration tableConf, @Nullable DataLakeFormat clusterDataLakeFormat) {
if (clusterDataLakeFormat == null) {
Expand Down Expand Up @@ -323,6 +337,9 @@ private static void checkArrowCompression(Configuration tableConf) {
private static void checkMergeEngine(
Configuration tableConf, boolean hasPrimaryKey, Schema schema) {
MergeEngineType mergeEngine = tableConf.get(ConfigOptions.TABLE_MERGE_ENGINE);
if (mergeEngine != MergeEngineType.AGGREGATION) {
validateNoAggregationFunctions(schema);
}
Comment thread
wzx140 marked this conversation as resolved.
if (mergeEngine != null) {
if (!hasPrimaryKey) {
throw new InvalidConfigException(
Expand Down Expand Up @@ -377,6 +394,20 @@ private static void checkMergeEngine(
}
}

/** Validates that the schema doesn't contain any aggregation functions. */
private static void validateNoAggregationFunctions(Schema schema) {
for (Schema.Column column : schema.getColumns()) {
Optional<AggFunction> aggFunction = column.getAggFunction();
if (aggFunction.isPresent()) {
throw new InvalidConfigException(
String.format(
"Aggregation function is only supported for aggregation merge engine table, "
+ "but column '%s' has aggregation function '%s'.",
column.getName(), aggFunction.get()));
}
}
}

/**
* Validates aggregation function parameters in the schema.
*
Expand Down
Loading