Skip to content

Commit 73404df

Browse files
authored
[opt](nereids) support order by clause in create table (#60684)
### What problem does this PR solve?

Problem Summary: Previously (see PR #24788), CLUSTER BY was used to define sort columns, but with limited syntax (ASC only, no control over sort order). This PR replaces it with ORDER BY, which is more intuitive and flexible. Users can now explicitly specify the sort direction and the nulls order for each column; the default remains ASC with NULLS FIRST. This PR also adds support for the ORDER BY clause in Iceberg tables, for example:

```
CREATE TABLE `test_table2` (
  `id` int NULL,
  `name` text NULL,
  `score` double NULL,
  `create_time` datetimev2(6) NULL
) ENGINE=ICEBERG_EXTERNAL_TABLE
ORDER BY (`id` ASC NULLS FIRST, `score` DESC NULLS LAST)
LOCATION 's3a://warehouse/wh/test_with_sr/test_table2'
PROPERTIES (
  "write-format" = "ORC",
  "doris.version" = "doris-0.0.0-2fa88d38b0",
  "write.parquet.compression-codec" = "zstd"
);
```
1 parent eccb882 commit 73404df

File tree

141 files changed

+650
-325
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

141 files changed

+650
-325
lines changed

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -644,9 +644,9 @@ public class Config extends ConfigBase {
644644
"Default timeout for insert load job, in seconds."})
645645
public static int insert_load_default_timeout_second = 14400; // 4 hour
646646

647-
@ConfField(mutable = true, masterOnly = true, description = {"对 mow 表随机设置 cluster keys,用于测试",
648-
"random set cluster keys for mow table for test"})
649-
public static boolean random_add_cluster_keys_for_mow = false;
647+
@ConfField(mutable = true, masterOnly = true, description = {"对 mow 表随机设置 order by keys,用于测试",
648+
"random set order by keys for mow table for test"})
649+
public static boolean random_add_order_by_keys_for_mow = false;
650650

651651
@ConfField(mutable = true, masterOnly = true, description = {
652652
"在 fuzzy 测试中随机选择部分表使用 V3 storage_format(ext_meta),用于增强覆盖",

fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ supportedCreateStatement
185185
: CREATE (EXTERNAL | TEMPORARY)? TABLE (IF NOT EXISTS)? name=multipartIdentifier
186186
((ctasCols=identifierList)? | (LEFT_PAREN columnDefs (COMMA indexDefs)? COMMA? RIGHT_PAREN))
187187
(ENGINE EQ engine=identifier)?
188-
((AGGREGATE | UNIQUE | DUPLICATE) KEY keys=identifierList
189-
(CLUSTER BY clusterKeys=identifierList)?)?
188+
((AGGREGATE | UNIQUE | DUPLICATE) KEY keys=identifierList)?
189+
(ORDER BY LEFT_PAREN sortItems+=sortItem (COMMA sortItems+=sortItem)* RIGHT_PAREN)?
190190
(COMMENT STRING_LITERAL)?
191191
(partition=partitionTable)?
192192
(DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM)

fe/fe-core/src/main/java/org/apache/doris/DorisFE.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,8 +593,8 @@ private static void fuzzyConfigs() {
593593
// Keep global fuzzy knobs that are not session-based.
594594
if (Config.fuzzy_test_type.equalsIgnoreCase("daily")
595595
|| Config.fuzzy_test_type.equalsIgnoreCase("rqg")) {
596-
Config.random_add_cluster_keys_for_mow = (LocalDate.now().getDayOfMonth() % 2 == 0);
597-
LOG.info("fuzzy set random_add_cluster_keys_for_mow={}", Config.random_add_cluster_keys_for_mow);
596+
Config.random_add_order_by_keys_for_mow = (LocalDate.now().getDayOfMonth() % 2 == 0);
597+
LOG.info("fuzzy set random_add_order_by_keys_for_mow={}", Config.random_add_order_by_keys_for_mow);
598598
}
599599

600600
Config.enable_txn_log_outside_lock = new Random().nextBoolean();

fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
public class KeysDesc {
2929
private KeysType type;
3030
private List<String> keysColumnNames;
31-
private List<String> clusterKeysColumnNames;
31+
private List<String> orderByKeysColumnNames;
3232

3333
public KeysDesc() {
3434
this.type = KeysType.AGG_KEYS;
@@ -40,9 +40,9 @@ public KeysDesc(KeysType type, List<String> keysColumnNames) {
4040
this.keysColumnNames = keysColumnNames;
4141
}
4242

43-
public KeysDesc(KeysType type, List<String> keysColumnNames, List<String> clusterKeyColumnNames) {
43+
public KeysDesc(KeysType type, List<String> keysColumnNames, List<String> orderByKeysColumnNames) {
4444
this(type, keysColumnNames);
45-
this.clusterKeysColumnNames = clusterKeyColumnNames;
45+
this.orderByKeysColumnNames = orderByKeysColumnNames;
4646
}
4747

4848
public KeysType getKeysType() {
@@ -53,8 +53,8 @@ public int keysColumnSize() {
5353
return keysColumnNames.size();
5454
}
5555

56-
public List<String> getClusterKeysColumnNames() {
57-
return clusterKeysColumnNames;
56+
public List<String> getOrderByKeysColumnNames() {
57+
return orderByKeysColumnNames;
5858
}
5959

6060
public boolean containsCol(String colName) {
@@ -107,39 +107,39 @@ public void analyze(List<ColumnDef> cols) throws AnalysisException {
107107
}
108108
}
109109

110-
if (clusterKeysColumnNames != null) {
111-
analyzeClusterKeys(cols);
110+
if (orderByKeysColumnNames != null) {
111+
analyzeOrderByKeys(cols);
112112
}
113113
}
114114

115-
private void analyzeClusterKeys(List<ColumnDef> cols) throws AnalysisException {
115+
private void analyzeOrderByKeys(List<ColumnDef> cols) throws AnalysisException {
116116
if (type != KeysType.UNIQUE_KEYS) {
117-
throw new AnalysisException("Cluster keys only support unique keys table");
117+
throw new AnalysisException("Order by keys only support unique keys table");
118118
}
119-
// check that cluster keys is not duplicated
120-
for (int i = 0; i < clusterKeysColumnNames.size(); i++) {
121-
String name = clusterKeysColumnNames.get(i);
119+
// check that order by keys is not duplicated
120+
for (int i = 0; i < orderByKeysColumnNames.size(); i++) {
121+
String name = orderByKeysColumnNames.get(i);
122122
for (int j = 0; j < i; j++) {
123-
if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
124-
throw new AnalysisException("Duplicate cluster key column[" + name + "].");
123+
if (orderByKeysColumnNames.get(j).equalsIgnoreCase(name)) {
124+
throw new AnalysisException("Duplicate order by key column[" + name + "].");
125125
}
126126
}
127127
}
128-
// check that cluster keys is not equal to primary keys
129-
int minKeySize = Math.min(keysColumnNames.size(), clusterKeysColumnNames.size());
128+
// check that order by keys is not equal to primary keys
129+
int minKeySize = Math.min(keysColumnNames.size(), orderByKeysColumnNames.size());
130130
boolean sameKey = true;
131131
for (int i = 0; i < minKeySize; i++) {
132-
if (!keysColumnNames.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
132+
if (!keysColumnNames.get(i).equalsIgnoreCase(orderByKeysColumnNames.get(i))) {
133133
sameKey = false;
134134
break;
135135
}
136136
}
137-
if (sameKey && !Config.random_add_cluster_keys_for_mow) {
137+
if (sameKey && !Config.random_add_order_by_keys_for_mow) {
138138
throw new AnalysisException("Unique keys and cluster keys should be different.");
139139
}
140140
// check that cluster key column exists
141-
for (int i = 0; i < clusterKeysColumnNames.size(); i++) {
142-
String name = clusterKeysColumnNames.get(i);
141+
for (int i = 0; i < orderByKeysColumnNames.size(); i++) {
142+
String name = orderByKeysColumnNames.get(i);
143143
for (int j = 0; j < cols.size(); j++) {
144144
if (cols.get(j).getName().equalsIgnoreCase(name)) {
145145
cols.get(j).setClusterKeyId(i);
@@ -164,10 +164,10 @@ public String toSql() {
164164
i++;
165165
}
166166
stringBuilder.append(")");
167-
if (clusterKeysColumnNames != null) {
168-
stringBuilder.append("\nCLUSTER BY (");
167+
if (orderByKeysColumnNames != null) {
168+
stringBuilder.append("\nORDER BY (");
169169
i = 0;
170-
for (String columnName : clusterKeysColumnNames) {
170+
for (String columnName : orderByKeysColumnNames) {
171171
if (i != 0) {
172172
stringBuilder.append(", ");
173173
}

fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4037,9 +4037,9 @@ public static void getCreateTableLikeStmt(CreateTableLikeInfo createTableLikeInf
40374037
}
40384038
}
40394039
sb.append(Joiner.on(", ").join(keysColumnNames)).append(")");
4040-
// show cluster keys
4040+
// show order keys
40414041
if (!clusterKeysColumnNamesToId.isEmpty()) {
4042-
sb.append("\n").append("CLUSTER BY (`");
4042+
sb.append("\n").append("ORDER BY (`");
40434043
sb.append(Joiner.on("`, `").join(clusterKeysColumnNamesToId.values())).append("`)");
40444044
}
40454045
}
@@ -4264,6 +4264,9 @@ public static void getCreateTableLikeStmt(CreateTableLikeInfo createTableLikeInf
42644264
} else if (table.getType() == TableType.ICEBERG_EXTERNAL_TABLE) {
42654265
addTableComment(table, sb);
42664266
IcebergExternalTable icebergExternalTable = (IcebergExternalTable) table;
4267+
if (icebergExternalTable.hasSortOrder()) {
4268+
sb.append("\n").append(icebergExternalTable.getSortOrderSql());
4269+
}
42674270
sb.append("\nLOCATION '").append(icebergExternalTable.location()).append("'");
42684271
sb.append("\nPROPERTIES (");
42694272
Iterator<Entry<String, String>> iterator = icebergExternalTable.properties().entrySet().iterator();
@@ -4446,9 +4449,9 @@ public static void getDdlStmt(Command command, String dbName, TableIf table, Lis
44464449
}
44474450
}
44484451
sb.append(Joiner.on(", ").join(keysColumnNames)).append(")");
4449-
// show cluster keys
4452+
// show order keys
44504453
if (!clusterKeysColumnNamesToId.isEmpty()) {
4451-
sb.append("\n").append("CLUSTER BY (`");
4454+
sb.append("\n").append("ORDER BY (`");
44524455
sb.append(Joiner.on("`, `").join(clusterKeysColumnNamesToId.values())).append("`)");
44534456
}
44544457
}
@@ -4679,6 +4682,9 @@ public static void getDdlStmt(Command command, String dbName, TableIf table, Lis
46794682
} else if (table.getType() == TableType.ICEBERG_EXTERNAL_TABLE) {
46804683
addTableComment(table, sb);
46814684
IcebergExternalTable icebergExternalTable = (IcebergExternalTable) table;
4685+
if (icebergExternalTable.hasSortOrder()) {
4686+
sb.append("\n").append(icebergExternalTable.getSortOrderSql());
4687+
}
46824688
sb.append("\nLOCATION '").append(icebergExternalTable.location()).append("'");
46834689
sb.append("\nPROPERTIES (");
46844690
Iterator<Entry<String, String>> iterator = icebergExternalTable.properties().entrySet().iterator();
@@ -5383,7 +5389,7 @@ public static short calcShortKeyColumnCount(List<Column> columns, Map<String, St
53835389
break;
53845390
}
53855391
}
5386-
if (sameKey && !Config.random_add_cluster_keys_for_mow) {
5392+
if (sameKey && !Config.random_add_order_by_keys_for_mow) {
53875393
throw new DdlException(shortKeyColumnCount + " short keys is a part of unique keys");
53885394
}
53895395
}

fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2549,8 +2549,8 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th
25492549
olapTable.setStorageDictPageSize(storageDictPageSize);
25502550

25512551
// check data sort properties
2552-
int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames()) ? keysDesc.keysColumnSize() :
2553-
keysDesc.getClusterKeysColumnNames().size();
2552+
int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getOrderByKeysColumnNames()) ? keysDesc.keysColumnSize() :
2553+
keysDesc.getOrderByKeysColumnNames().size();
25542554
DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
25552555
keyColumnSize, storageFormat);
25562556
olapTable.setDataSortInfo(dataSortInfo);
@@ -2563,7 +2563,7 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th
25632563
throw new DdlException(e.getMessage());
25642564
}
25652565
if (enableUniqueKeyMergeOnWrite && !enableLightSchemaChange && !CollectionUtils.isEmpty(
2566-
keysDesc.getClusterKeysColumnNames())) {
2566+
keysDesc.getOrderByKeysColumnNames())) {
25672567
throw new DdlException(
25682568
"Unique merge-on-write tables with cluster keys require light schema change to be enabled.");
25692569
}

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.doris.mtmv.MTMVRelatedTableIf;
3939
import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
4040
import org.apache.doris.mtmv.MTMVSnapshotIf;
41+
import org.apache.doris.nereids.trees.plans.commands.info.SortFieldInfo;
4142
import org.apache.doris.statistics.AnalysisInfo;
4243
import org.apache.doris.statistics.BaseAnalysisTask;
4344
import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -377,4 +378,38 @@ public boolean isPartitionedTable() {
377378
Table table = getIcebergTable();
378379
return table.spec().isPartitioned();
379380
}
381+
382+
/**
383+
* Get sort order SQL clause from iceberg table
384+
* @return SQL string representing ORDER BY clause, or empty string if no sort order
385+
*/
386+
public String getSortOrderSql() {
387+
Table table = getIcebergTable();
388+
org.apache.iceberg.SortOrder sortOrder = table.sortOrder();
389+
if (sortOrder == null || sortOrder.isUnsorted() || sortOrder.fields().isEmpty()) {
390+
return "";
391+
}
392+
393+
List<String> sortItems = new java.util.ArrayList<>();
394+
for (org.apache.iceberg.SortField sortField : sortOrder.fields()) {
395+
String columnName = table.schema().findColumnName(sortField.sourceId());
396+
if (columnName != null) {
397+
boolean isAscending = sortField.direction() != org.apache.iceberg.SortDirection.DESC;
398+
boolean isNullFirst = sortField.nullOrder() == org.apache.iceberg.NullOrder.NULLS_FIRST;
399+
SortFieldInfo sortFieldInfo = new SortFieldInfo(columnName, isAscending, isNullFirst);
400+
sortItems.add(sortFieldInfo.toSql());
401+
}
402+
}
403+
return "ORDER BY (" + String.join(", ", sortItems) + ")";
404+
}
405+
406+
/**
407+
* Check if table has sort order defined
408+
* @return true if table has sort order
409+
*/
410+
public boolean hasSortOrder() {
411+
Table table = getIcebergTable();
412+
org.apache.iceberg.SortOrder sortOrder = table.sortOrder();
413+
return sortOrder != null && !sortOrder.isUnsorted();
414+
}
380415
}

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.doris.nereids.trees.plans.commands.info.DropPartitionFieldOp;
4545
import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo;
4646
import org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionFieldOp;
47+
import org.apache.doris.nereids.trees.plans.commands.info.SortFieldInfo;
4748
import org.apache.doris.nereids.trees.plans.commands.info.TagOptions;
4849

4950
import com.google.common.annotations.VisibleForTesting;
@@ -355,7 +356,17 @@ public boolean performCreateTable(CreateTableInfo createTableInfo) throws UserEx
355356
properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
356357
PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(createTableInfo.getPartitionDesc(),
357358
schema);
358-
catalog.createTable(getTableIdentifier(dbName, tableName), schema, partitionSpec, properties);
359+
// Build and create table with optional sort order
360+
org.apache.iceberg.SortOrder sortOrder = buildSortOrder(createTableInfo.getSortOrderFields(), schema);
361+
if (sortOrder != null && !sortOrder.isUnsorted()) {
362+
catalog.buildTable(getTableIdentifier(dbName, tableName), schema)
363+
.withPartitionSpec(partitionSpec)
364+
.withProperties(properties)
365+
.withSortOrder(sortOrder)
366+
.create();
367+
} else {
368+
catalog.createTable(getTableIdentifier(dbName, tableName), schema, partitionSpec, properties);
369+
}
359370
return false;
360371
}
361372

@@ -1012,6 +1023,34 @@ private void performDropView(String remoteDbName, String remoteViewName) throws
10121023
ViewCatalog viewCatalog = (ViewCatalog) catalog;
10131024
viewCatalog.dropView(getTableIdentifier(remoteDbName, remoteViewName));
10141025
}
1026+
1027+
/**
1028+
* Build Iceberg SortOrder from SortFieldInfo list
1029+
*/
1030+
private org.apache.iceberg.SortOrder buildSortOrder(List<SortFieldInfo> sortFields, Schema schema) {
1031+
if (sortFields == null || sortFields.isEmpty()) {
1032+
return null;
1033+
}
1034+
1035+
org.apache.iceberg.SortOrder.Builder builder = org.apache.iceberg.SortOrder.builderFor(schema);
1036+
for (SortFieldInfo sortField : sortFields) {
1037+
String columnName = sortField.getColumnName();
1038+
if (sortField.isAscending()) {
1039+
if (sortField.isNullFirst()) {
1040+
builder.asc(columnName, org.apache.iceberg.NullOrder.NULLS_FIRST);
1041+
} else {
1042+
builder.asc(columnName, org.apache.iceberg.NullOrder.NULLS_LAST);
1043+
}
1044+
} else {
1045+
if (sortField.isNullFirst()) {
1046+
builder.desc(columnName, org.apache.iceberg.NullOrder.NULLS_FIRST);
1047+
} else {
1048+
builder.desc(columnName, org.apache.iceberg.NullOrder.NULLS_LAST);
1049+
}
1050+
}
1051+
}
1052+
return builder.build();
1053+
}
10151054
}
10161055

10171056

0 commit comments

Comments
 (0)