-
Notifications
You must be signed in to change notification settings - Fork 2.4k
[Enhancement] Add StarOSAgent PACK shard group support and ColocateRangeUtils for range colocate #71221
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[Enhancement] Add StarOSAgent PACK shard group support and ColocateRangeUtils for range colocate #71221
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| // Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // https://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package com.starrocks.catalog; | ||
|
|
||
| import com.starrocks.common.Range; | ||
| import com.starrocks.type.Type; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
|
|
||
| /** | ||
| * Utility methods for range distribution colocate operations. | ||
| */ | ||
| public class ColocateRangeUtils { | ||
|
|
||
| /** | ||
| * Expands a colocate range (on colocate column prefix) to a full sort key range | ||
| * by appending sentinel variant values for the remaining sort key columns. | ||
| * | ||
| * <p>The sentinel choice depends on bound inclusiveness to preserve equivalence | ||
| * between the prefix range and the expanded full-key range: | ||
| * <ul> | ||
| * <li>Inclusive lower bound: append MIN (include from the first prefix-matching tuple)</li> | ||
| * <li>Exclusive lower bound: append MAX (skip past all prefix-matching tuples)</li> | ||
| * <li>Exclusive upper bound: append MIN (exclude from the first prefix-matching tuple)</li> | ||
| * <li>Inclusive upper bound: append MAX (include up to the last prefix-matching tuple)</li> | ||
| * </ul> | ||
| * | ||
| * <p>For example, with sort key (k1, k2, k3), colocate columns (k1): | ||
| * <ul> | ||
| * <li>[100, 200) -> [(100, MIN, MIN), (200, MIN, MIN))</li> | ||
| * <li>(100, 200] -> ((100, MAX, MAX), (200, MAX, MAX)]</li> | ||
| * </ul> | ||
| * | ||
| * <p>For ALL range (initial state), returns ALL directly without expansion. | ||
| * | ||
| * @param colocateRange the colocate range to expand | ||
| * @param sortKeyColumns the full sort key columns | ||
| * @param colocateColumnCount the number of colocate columns (prefix of sort key) | ||
| * @return the expanded range covering the full sort key | ||
| */ | ||
| public static Range<Tuple> expandToFullSortKey(Range<Tuple> colocateRange, | ||
| List<Column> sortKeyColumns, | ||
| int colocateColumnCount) { | ||
| if (colocateRange.isAll()) { | ||
| return Range.all(); | ||
| } | ||
| int remainingColumns = sortKeyColumns.size() - colocateColumnCount; | ||
| // Inclusive lower → extend with MIN; exclusive lower → extend with MAX | ||
| Tuple lowerBound = colocateRange.isMinimum() ? null | ||
| : extendTuple(colocateRange.getLowerBound(), sortKeyColumns, | ||
| colocateColumnCount, remainingColumns, | ||
| colocateRange.isLowerBoundIncluded()); | ||
| // Exclusive upper → extend with MIN; inclusive upper → extend with MAX | ||
| Tuple upperBound = colocateRange.isMaximum() ? null | ||
| : extendTuple(colocateRange.getUpperBound(), sortKeyColumns, | ||
| colocateColumnCount, remainingColumns, | ||
| !colocateRange.isUpperBoundIncluded()); | ||
| return Range.of(lowerBound, upperBound, | ||
| colocateRange.isLowerBoundIncluded(), | ||
| colocateRange.isUpperBoundIncluded()); | ||
| } | ||
|
|
||
| /** | ||
| * Extends a tuple with MIN or MAX sentinel values for the remaining sort key columns. | ||
| * | ||
| * @param useMin true to append MIN sentinels, false to append MAX sentinels | ||
| */ | ||
| private static Tuple extendTuple(Tuple tuple, List<Column> sortKeyColumns, | ||
| int colocateColumnCount, int remainingColumns, | ||
| boolean useMin) { | ||
| List<Variant> values = new ArrayList<>(tuple.getValues()); | ||
| for (int i = 0; i < remainingColumns; i++) { | ||
| Type type = sortKeyColumns.get(colocateColumnCount + i).getType(); | ||
| values.add(useMin ? Variant.minVariant(type) : Variant.maxVariant(type)); | ||
| } | ||
| return new Tuple(values); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -493,12 +493,17 @@ private long getWorkerIdByNodeIdInternal(long nodeId) { | |
| } | ||
|
|
||
| public long createShardGroup(long dbId, long tableId, long partitionId, long indexId) throws DdlException { | ||
| return createShardGroup(dbId, tableId, partitionId, indexId, PlacementPolicy.SPREAD); | ||
| } | ||
|
|
||
| public long createShardGroup(long dbId, long tableId, long partitionId, long indexId, | ||
| PlacementPolicy placementPolicy) throws DdlException { | ||
| prepare(); | ||
| List<ShardGroupInfo> shardGroupInfos = null; | ||
| try { | ||
| List<CreateShardGroupInfo> createShardGroupInfos = new ArrayList<>(); | ||
| createShardGroupInfos.add(CreateShardGroupInfo.newBuilder() | ||
| .setPolicy(PlacementPolicy.SPREAD) | ||
| .setPolicy(placementPolicy) | ||
| .putLabels("dbId", String.valueOf(dbId)) | ||
|
Comment on lines
+499
to
507
|
||
| .putLabels("tableId", String.valueOf(tableId)) | ||
| .putLabels("partitionId", String.valueOf(partitionId)) | ||
|
|
@@ -568,6 +573,15 @@ public List<Long> createShards(int numShards, FilePathInfo pathInfo, FileCacheIn | |
| @Nullable List<Long> matchShardIds, @NotNull Map<String, String> properties, | ||
| ComputeResource computeResource) | ||
| throws DdlException { | ||
| return createShards(numShards, pathInfo, cacheInfo, List.of(groupId), matchShardIds, properties, | ||
| computeResource); | ||
| } | ||
|
|
||
| public List<Long> createShards(int numShards, FilePathInfo pathInfo, FileCacheInfo cacheInfo, | ||
| List<Long> groupIds, @Nullable List<Long> matchShardIds, | ||
| @NotNull Map<String, String> properties, | ||
| ComputeResource computeResource) | ||
| throws DdlException { | ||
| if (matchShardIds != null) { | ||
| Preconditions.checkState(numShards == matchShardIds.size()); | ||
| } | ||
|
|
@@ -579,11 +593,13 @@ public List<Long> createShards(int numShards, FilePathInfo pathInfo, FileCacheIn | |
|
|
||
| CreateShardInfo.Builder builder = CreateShardInfo.newBuilder(); | ||
| builder.setReplicaCount(1) | ||
| .addGroupIds(groupId) | ||
| .setPathInfo(pathInfo) | ||
| .setCacheInfo(cacheInfo) | ||
| .putAllShardProperties(properties) | ||
| .setScheduleToWorkerGroup(workerGroupId); | ||
| for (long groupId : groupIds) { | ||
| builder.addGroupIds(groupId); | ||
| } | ||
|
Comment on lines
580
to
+602
|
||
|
|
||
| for (int i = 0; i < numShards; ++i) { | ||
| builder.setShardId(GlobalStateMgr.getCurrentState().getNextId()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,171 @@ | ||||||||||||||
| // Copyright 2021-present StarRocks, Inc. All rights reserved. | ||||||||||||||
| // | ||||||||||||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||
| // you may not use this file except in compliance with the License. | ||||||||||||||
| // You may obtain a copy of the License at | ||||||||||||||
| // | ||||||||||||||
| // https://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||
| // | ||||||||||||||
| // Unless required by applicable law or agreed to in writing, software | ||||||||||||||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||
| // See the License for the specific language governing permissions and | ||||||||||||||
| // limitations under the License. | ||||||||||||||
|
|
||||||||||||||
| package com.starrocks.catalog; | ||||||||||||||
|
|
||||||||||||||
| import com.starrocks.common.Range; | ||||||||||||||
| import com.starrocks.type.IntegerType; | ||||||||||||||
| import com.starrocks.type.VarcharType; | ||||||||||||||
| import org.junit.jupiter.api.Assertions; | ||||||||||||||
| import org.junit.jupiter.api.Test; | ||||||||||||||
|
|
||||||||||||||
| import java.util.Arrays; | ||||||||||||||
| import java.util.List; | ||||||||||||||
|
|
||||||||||||||
| public class ColocateRangeUtilsTest { | ||||||||||||||
|
|
||||||||||||||
| private static final List<Column> SORT_KEY_COLUMNS = Arrays.asList( | ||||||||||||||
| new Column("k1", IntegerType.INT), | ||||||||||||||
| new Column("k2", VarcharType.VARCHAR), | ||||||||||||||
| new Column("k3", IntegerType.BIGINT)); | ||||||||||||||
|
|
||||||||||||||
| private static Tuple makeTuple(int value) { | ||||||||||||||
| return new Tuple(Arrays.asList(Variant.of(IntegerType.INT, String.valueOf(value)))); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Test | ||||||||||||||
| public void testExpandAllRange() { | ||||||||||||||
| Range<Tuple> result = ColocateRangeUtils.expandToFullSortKey( | ||||||||||||||
| Range.all(), SORT_KEY_COLUMNS, 1); | ||||||||||||||
| Assertions.assertTrue(result.isAll()); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Test | ||||||||||||||
| public void testExpandAllRangeWithAllColocateColumns() { | ||||||||||||||
| Range<Tuple> result = ColocateRangeUtils.expandToFullSortKey( | ||||||||||||||
| Range.all(), SORT_KEY_COLUMNS, 3); | ||||||||||||||
| Assertions.assertTrue(result.isAll()); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // [100, 200) -> [(100, MIN, MIN), (200, MIN, MIN)) | ||||||||||||||
| @Test | ||||||||||||||
| public void testExpandInclusiveLowerExclusiveUpper() { | ||||||||||||||
| Range<Tuple> colocateRange = Range.gelt(makeTuple(100), makeTuple(200)); | ||||||||||||||
| Range<Tuple> result = ColocateRangeUtils.expandToFullSortKey( | ||||||||||||||
| colocateRange, SORT_KEY_COLUMNS, 1); | ||||||||||||||
|
|
||||||||||||||
| Assertions.assertTrue(result.isLowerBoundIncluded()); | ||||||||||||||
| Assertions.assertFalse(result.isUpperBoundIncluded()); | ||||||||||||||
|
|
||||||||||||||
| // Lower: (100, MIN, MIN) | ||||||||||||||
| Tuple lower = result.getLowerBound(); | ||||||||||||||
| Assertions.assertEquals(3, lower.getValues().size()); | ||||||||||||||
| Assertions.assertEquals("100", lower.getValues().get(0).getStringValue()); | ||||||||||||||
| Assertions.assertTrue(lower.getValues().get(1) instanceof MinVariant); | ||||||||||||||
| Assertions.assertTrue(lower.getValues().get(2) instanceof MinVariant); | ||||||||||||||
|
|
||||||||||||||
| // Upper: (200, MIN, MIN) | ||||||||||||||
| Tuple upper = result.getUpperBound(); | ||||||||||||||
| Assertions.assertEquals(3, upper.getValues().size()); | ||||||||||||||
| Assertions.assertEquals("200", upper.getValues().get(0).getStringValue()); | ||||||||||||||
| Assertions.assertTrue(upper.getValues().get(1) instanceof MinVariant); | ||||||||||||||
| Assertions.assertTrue(upper.getValues().get(2) instanceof MinVariant); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // (100, 200] -> ((100, MAX, MAX), (200, MAX, MAX)] | ||||||||||||||
| @Test | ||||||||||||||
| public void testExpandExclusiveLowerInclusiveUpper() { | ||||||||||||||
| Range<Tuple> colocateRange = Range.gtle(makeTuple(100), makeTuple(200)); | ||||||||||||||
| Range<Tuple> result = ColocateRangeUtils.expandToFullSortKey( | ||||||||||||||
| colocateRange, SORT_KEY_COLUMNS, 1); | ||||||||||||||
|
|
||||||||||||||
| Assertions.assertFalse(result.isLowerBoundIncluded()); | ||||||||||||||
| Assertions.assertTrue(result.isUpperBoundIncluded()); | ||||||||||||||
|
|
||||||||||||||
| // Lower: (100, MAX, MAX) | ||||||||||||||
| Tuple lower = result.getLowerBound(); | ||||||||||||||
| Assertions.assertEquals(3, lower.getValues().size()); | ||||||||||||||
| Assertions.assertEquals("100", lower.getValues().get(0).getStringValue()); | ||||||||||||||
| Assertions.assertTrue(lower.getValues().get(1) instanceof MaxVariant); | ||||||||||||||
| Assertions.assertTrue(lower.getValues().get(2) instanceof MaxVariant); | ||||||||||||||
|
|
||||||||||||||
| // Upper: (200, MAX, MAX) | ||||||||||||||
| Tuple upper = result.getUpperBound(); | ||||||||||||||
| Assertions.assertEquals(3, upper.getValues().size()); | ||||||||||||||
| Assertions.assertEquals("200", upper.getValues().get(0).getStringValue()); | ||||||||||||||
| Assertions.assertTrue(upper.getValues().get(1) instanceof MaxVariant); | ||||||||||||||
| Assertions.assertTrue(upper.getValues().get(2) instanceof MaxVariant); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // (-inf, 200) -> (-inf, (200, MIN, MIN)) | ||||||||||||||
| @Test | ||||||||||||||
| public void testExpandLowerUnboundedExclusiveUpper() { | ||||||||||||||
| Range<Tuple> colocateRange = Range.lt(makeTuple(200)); | ||||||||||||||
| Range<Tuple> result = ColocateRangeUtils.expandToFullSortKey( | ||||||||||||||
| colocateRange, SORT_KEY_COLUMNS, 1); | ||||||||||||||
|
|
||||||||||||||
| Assertions.assertTrue(result.isMinimum()); | ||||||||||||||
| Assertions.assertFalse(result.isUpperBoundIncluded()); | ||||||||||||||
|
|
||||||||||||||
| Tuple upper = result.getUpperBound(); | ||||||||||||||
| Assertions.assertEquals(3, upper.getValues().size()); | ||||||||||||||
| Assertions.assertTrue(upper.getValues().get(1) instanceof MinVariant); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // (-inf, 200] -> (-inf, (200, MAX, MAX)] | ||||||||||||||
| @Test | ||||||||||||||
| public void testExpandLowerUnboundedInclusiveUpper() { | ||||||||||||||
| Range<Tuple> colocateRange = Range.le(makeTuple(200)); | ||||||||||||||
| Range<Tuple> result = ColocateRangeUtils.expandToFullSortKey( | ||||||||||||||
| colocateRange, SORT_KEY_COLUMNS, 1); | ||||||||||||||
|
|
||||||||||||||
| Assertions.assertTrue(result.isMinimum()); | ||||||||||||||
| Assertions.assertTrue(result.isUpperBoundIncluded()); | ||||||||||||||
|
|
||||||||||||||
| Tuple upper = result.getUpperBound(); | ||||||||||||||
| Assertions.assertEquals(3, upper.getValues().size()); | ||||||||||||||
| Assertions.assertTrue(upper.getValues().get(1) instanceof MaxVariant); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // [100, +inf) -> [(100, MIN, MIN), +inf) | ||||||||||||||
| @Test | ||||||||||||||
| public void testExpandInclusiveLowerUpperUnbounded() { | ||||||||||||||
| Range<Tuple> colocateRange = Range.ge(makeTuple(100)); | ||||||||||||||
| Range<Tuple> result = ColocateRangeUtils.expandToFullSortKey( | ||||||||||||||
| colocateRange, SORT_KEY_COLUMNS, 1); | ||||||||||||||
|
|
||||||||||||||
| Assertions.assertTrue(result.isMaximum()); | ||||||||||||||
| Assertions.assertTrue(result.isLowerBoundIncluded()); | ||||||||||||||
|
|
||||||||||||||
| Tuple lower = result.getLowerBound(); | ||||||||||||||
| Assertions.assertEquals(3, lower.getValues().size()); | ||||||||||||||
| Assertions.assertTrue(lower.getValues().get(1) instanceof MinVariant); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // (100, +inf) -> ((100, MAX, MAX), +inf) | ||||||||||||||
| @Test | ||||||||||||||
| public void testExpandExclusiveLowerUpperUnbounded() { | ||||||||||||||
| Range<Tuple> colocateRange = Range.gt(makeTuple(100)); | ||||||||||||||
| Range<Tuple> result = ColocateRangeUtils.expandToFullSortKey( | ||||||||||||||
| colocateRange, SORT_KEY_COLUMNS, 1); | ||||||||||||||
|
|
||||||||||||||
| Assertions.assertTrue(result.isMaximum()); | ||||||||||||||
| Assertions.assertFalse(result.isLowerBoundIncluded()); | ||||||||||||||
|
|
||||||||||||||
| Tuple lower = result.getLowerBound(); | ||||||||||||||
| Assertions.assertEquals(3, lower.getValues().size()); | ||||||||||||||
| Assertions.assertTrue(lower.getValues().get(1) instanceof MaxVariant); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Test | ||||||||||||||
| public void testExpandNoRemainingColumns() { | ||||||||||||||
| Range<Tuple> colocateRange = Range.gelt(makeTuple(100), makeTuple(200)); | ||||||||||||||
| Range<Tuple> result = ColocateRangeUtils.expandToFullSortKey( | ||||||||||||||
| colocateRange, SORT_KEY_COLUMNS, 3); | ||||||||||||||
|
|
||||||||||||||
| // No expansion, tuple size unchanged | ||||||||||||||
|
Comment on lines
+165
to
+167
|
||||||||||||||
| colocateRange, SORT_KEY_COLUMNS, 3); | |
| // No expansion, tuple size unchanged | |
| colocateRange, SORT_KEY_COLUMNS.subList(0, 1), 1); | |
| // No expansion, tuple size unchanged for a single-column sort key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`expandToFullSortKey` does not validate `colocateColumnCount` relative to `sortKeyColumns.size()`. If `colocateColumnCount` is negative or greater than the sort-key length, `remainingColumns` becomes invalid and the method silently returns an incorrectly-shaped range. Add argument checks (`0 <= colocateColumnCount <= sortKeyColumns.size()`) and consider validating that the provided bound tuple lengths match `colocateColumnCount` when the corresponding side is bounded.