Skip to content

Commit 6fdbe9f

Browse files
liaoxin01 authored and Your Name committed
[opt](rowset) Aggregate non-MOW segment key bounds to reduce rowset meta size (#62604)
For non-MOW (duplicate / aggregate key) tables, per-segment key bounds are not consumed on the read path — only the rowset-level [min, max] is used by the reader and ordered compaction. In cloud mode, persisting bounds for every segment can blow past FDB's value size limit on commit_rowset. Introduce an `enable_aggregate_non_mow_key_bounds` BE config (default off). When enabled, non-MOW rowsets collapse per-segment bounds into a single [overall_min, overall_max] entry at write time, and compaction preserves this behavior. MOW rowsets always retain per-segment bounds — their `lookup_row_key` path relies on them for delete bitmap computation, and now returns an InternalError at runtime (replacing the previous DCHECK_EQ) when the rowset is flagged as aggregated or its bounds count does not match its segment count. A new optional `segments_key_bounds_aggregated` flag is added to both RowsetMetaPB and RowsetMetaCloudPB so consumers can distinguish aggregated from per-segment layouts. Proto round-trip, pb_convert, snapshot restore, and index builder all preserve both this flag and the existing `segments_key_bounds_truncated` flag. Correctness notes: (1) `first_key/last_key` callers (`block_reader`, ordered compaction) already bail out on overlapping rowsets, so for non-overlapping rowsets the aggregated [min, max] equals seg[0].min / seg[last].max exactly. (2) `merge_rowset_meta` (MOW partial-update publish) DCHECKs that both sides are non-aggregated.
1 parent 74a9944 commit 6fdbe9f

17 files changed

Lines changed: 478 additions & 20 deletions

be/src/cloud/cloud_snapshot_mgr.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,14 @@ Status CloudSnapshotMgr::_create_rowset_meta(
276276
for (const auto& key_bound : source_meta_pb.segments_key_bounds()) {
277277
*new_rowset_meta_pb->add_segments_key_bounds() = key_bound;
278278
}
279+
if (source_meta_pb.has_segments_key_bounds_truncated()) {
280+
new_rowset_meta_pb->set_segments_key_bounds_truncated(
281+
source_meta_pb.segments_key_bounds_truncated());
282+
}
283+
if (source_meta_pb.has_segments_key_bounds_aggregated()) {
284+
new_rowset_meta_pb->set_segments_key_bounds_aggregated(
285+
source_meta_pb.segments_key_bounds_aggregated());
286+
}
279287
if (source_meta_pb.has_delete_predicate()) {
280288
DeletePredicatePB* new_delete_condition = new_rowset_meta_pb->mutable_delete_predicate();
281289
*new_delete_condition = source_meta_pb.delete_predicate();

be/src/cloud/pb_convert.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
7979
}
8080
out->set_txn_expiration(in.txn_expiration());
8181
out->set_segments_overlap_pb(in.segments_overlap_pb());
82-
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
82+
if (in.has_segments_key_bounds_truncated()) {
83+
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
84+
}
85+
if (in.has_segments_key_bounds_aggregated()) {
86+
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
87+
}
8388
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
8489
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
8590
out->set_index_id(in.index_id());
@@ -157,7 +162,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
157162
}
158163
out->set_txn_expiration(in.txn_expiration());
159164
out->set_segments_overlap_pb(in.segments_overlap_pb());
160-
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
165+
if (in.has_segments_key_bounds_truncated()) {
166+
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
167+
}
168+
if (in.has_segments_key_bounds_aggregated()) {
169+
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
170+
}
161171
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
162172
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
163173
out->set_index_id(in.index_id());
@@ -247,7 +257,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
247257
}
248258
out->set_txn_expiration(in.txn_expiration());
249259
out->set_segments_overlap_pb(in.segments_overlap_pb());
250-
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
260+
if (in.has_segments_key_bounds_truncated()) {
261+
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
262+
}
263+
if (in.has_segments_key_bounds_aggregated()) {
264+
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
265+
}
251266
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
252267
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
253268
out->set_index_id(in.index_id());
@@ -325,7 +340,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
325340
}
326341
out->set_txn_expiration(in.txn_expiration());
327342
out->set_segments_overlap_pb(in.segments_overlap_pb());
328-
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
343+
if (in.has_segments_key_bounds_truncated()) {
344+
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
345+
}
346+
if (in.has_segments_key_bounds_aggregated()) {
347+
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
348+
}
329349
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
330350
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
331351
out->set_index_id(in.index_id());

be/src/common/config.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1618,6 +1618,10 @@ DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false");
16181618
DEFINE_mInt32(segments_key_bounds_truncation_threshold, "-1");
16191619
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
16201620
DEFINE_mBool(random_segments_key_bounds_truncation, "false");
1621+
1622+
// If true, non-MOW rowsets store a single aggregated [rowset_min, rowset_max]
1623+
// key-bounds entry instead of per-segment bounds, to reduce meta size on cloud FDB.
1624+
// Off by default: aggregation discards per-segment bounds, so it must be an explicit opt-in.
DEFINE_mBool(enable_aggregate_non_mow_key_bounds, "false");
16211625
// p0, daily, rqg, external
16221626
DEFINE_String(fuzzy_test_type, "");
16231627

be/src/common/config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1709,6 +1709,10 @@ DECLARE_mInt32(segments_key_bounds_truncation_threshold);
17091709
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
17101710
DECLARE_mBool(random_segments_key_bounds_truncation);
17111711

1712+
// If true, non-MOW rowsets store a single aggregated [rowset_min, rowset_max]
1713+
// key-bounds entry instead of per-segment bounds, to reduce meta size on cloud FDB.
1714+
DECLARE_mBool(enable_aggregate_non_mow_key_bounds);
1715+
17121716
DECLARE_mBool(enable_auto_clone_on_compaction_missing_version);
17131717

17141718
DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version);

be/src/olap/base_tablet.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,18 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
465465
std::vector<KeyBoundsPB> segments_key_bounds;
466466
rs->rowset_meta()->get_segments_key_bounds(&segments_key_bounds);
467467
int num_segments = cast_set<int>(rs->num_segments());
468-
DCHECK_EQ(segments_key_bounds.size(), num_segments);
468+
// MOW lookup requires per-segment bounds. Aggregation must be disabled
469+
// for MOW writers, but enforce at runtime too — indexing segments_key_bounds[j]
470+
// below would be out-of-bounds otherwise.
471+
if (UNLIKELY(rs->rowset_meta()->is_segments_key_bounds_aggregated() ||
472+
static_cast<int>(segments_key_bounds.size()) != num_segments)) {
473+
return Status::InternalError(
474+
"MOW lookup got rowset with inconsistent segments_key_bounds, rowset_id={}, "
475+
"aggregated={}, bounds_size={}, num_segments={}",
476+
rs->rowset_id().to_string(),
477+
rs->rowset_meta()->is_segments_key_bounds_aggregated(),
478+
segments_key_bounds.size(), num_segments);
479+
}
469480
std::vector<uint32_t> picked_segments;
470481
for (int j = num_segments - 1; j >= 0; j--) {
471482
if (key_is_not_in_segment(key_without_seq, segments_key_bounds[j],

be/src/olap/compaction.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,13 +363,15 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
363363
// link data to new rowset
364364
auto seg_id = 0;
365365
bool segments_key_bounds_truncated {false};
366+
bool any_input_aggregated {false};
366367
std::vector<KeyBoundsPB> segment_key_bounds;
367368
std::vector<uint32_t> num_segment_rows;
368369
for (auto rowset : _input_rowsets) {
369370
RETURN_IF_ERROR(rowset->link_files_to(tablet()->tablet_path(),
370371
_output_rs_writer->rowset_id(), seg_id));
371372
seg_id += rowset->num_segments();
372373
segments_key_bounds_truncated |= rowset->is_segments_key_bounds_truncated();
374+
any_input_aggregated |= rowset->rowset_meta()->is_segments_key_bounds_aggregated();
373375
std::vector<KeyBoundsPB> key_bounds;
374376
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds));
375377
segment_key_bounds.insert(segment_key_bounds.end(), key_bounds.begin(), key_bounds.end());
@@ -389,7 +391,13 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
389391
rowset_meta->set_segments_overlap(NONOVERLAPPING);
390392
rowset_meta->set_rowset_state(VISIBLE);
391393
rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated);
392-
rowset_meta->set_segments_key_bounds(segment_key_bounds);
394+
// If any input was already aggregated we have no way to recover per-segment
395+
// bounds, so force aggregation on the output to keep the layout consistent
396+
// with `num_segments` / the aggregated flag, even if the config is off now.
397+
bool aggregate_key_bounds =
398+
any_input_aggregated || (config::enable_aggregate_non_mow_key_bounds &&
399+
!_tablet->enable_unique_key_merge_on_write());
400+
rowset_meta->set_segments_key_bounds(segment_key_bounds, aggregate_key_bounds);
393401
rowset_meta->set_num_segment_rows(num_segment_rows);
394402

395403
_output_rowset = _output_rs_writer->manual_build(rowset_meta);

be/src/olap/rowset/beta_rowset_writer.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,10 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
9898
spec_rowset_meta.is_segments_key_bounds_truncated());
9999
std::vector<KeyBoundsPB> segments_key_bounds;
100100
spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
101-
rowset_meta.set_segments_key_bounds(segments_key_bounds);
101+
// Preserve source layout: if source was aggregated (size 1), re-aggregating
102+
// the single entry is a no-op that also keeps the flag consistent.
103+
rowset_meta.set_segments_key_bounds(segments_key_bounds,
104+
spec_rowset_meta.is_segments_key_bounds_aggregated());
102105
std::vector<uint32_t> num_segment_rows;
103106
spec_rowset_meta.get_num_segment_rows(&num_segment_rows);
104107
rowset_meta.set_num_segment_rows(num_segment_rows);
@@ -1021,7 +1024,9 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
10211024
_total_index_size);
10221025
rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
10231026
rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
1024-
rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds);
1027+
bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds &&
1028+
!_context.enable_unique_key_merge_on_write;
1029+
rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds, aggregate_key_bounds);
10251030
// TODO write zonemap to meta
10261031
rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
10271032
rowset_meta->set_creation_time(time(nullptr));

be/src/olap/rowset/rowset.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,10 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
302302
return _rowset_meta->is_segments_key_bounds_truncated();
303303
}
304304

305+
bool is_segments_key_bounds_aggregated() const {
306+
return _rowset_meta->is_segments_key_bounds_aggregated();
307+
}
308+
305309
bool check_rowset_segment();
306310

307311
[[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }

be/src/olap/rowset/rowset_meta.cpp

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -291,11 +291,31 @@ int64_t RowsetMeta::segment_file_size(int seg_id) const {
291291
: -1;
292292
}
293293

294-
void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) {
295-
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
296-
KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
297-
*new_key_bounds = key_bounds;
294+
void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds,
295+
bool aggregate_into_single) {
296+
_rowset_meta_pb.clear_segments_key_bounds();
297+
bool did_aggregate = aggregate_into_single && !segments_key_bounds.empty();
298+
if (did_aggregate) {
299+
const std::string* overall_min = &segments_key_bounds.front().min_key();
300+
const std::string* overall_max = &segments_key_bounds.front().max_key();
301+
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
302+
if (key_bounds.min_key() < *overall_min) {
303+
overall_min = &key_bounds.min_key();
304+
}
305+
if (key_bounds.max_key() > *overall_max) {
306+
overall_max = &key_bounds.max_key();
307+
}
308+
}
309+
KeyBoundsPB* aggregated = _rowset_meta_pb.add_segments_key_bounds();
310+
aggregated->set_min_key(*overall_min);
311+
aggregated->set_max_key(*overall_max);
312+
} else {
313+
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
314+
KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
315+
*new_key_bounds = key_bounds;
316+
}
298317
}
318+
set_segments_key_bounds_aggregated(did_aggregate);
299319

300320
int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold;
301321
if (config::random_segments_key_bounds_truncation) {
@@ -328,6 +348,11 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
328348
set_total_disk_size(data_disk_size() + index_disk_size());
329349
set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
330350
other.is_segments_key_bounds_truncated());
351+
// merge_rowset_meta is used in the MOW partial-update publish path, which relies
352+
// on per-segment bounds. Aggregation should never be enabled for MOW rowsets,
353+
// so we do not expect either side to be aggregated here.
354+
DCHECK(!is_segments_key_bounds_aggregated() && !other.is_segments_key_bounds_aggregated())
355+
<< "merge_rowset_meta encountered aggregated key bounds";
331356
if (_rowset_meta_pb.num_segment_rows_size() > 0) {
332357
if (other.num_segments() > 0) {
333358
if (other._rowset_meta_pb.num_segment_rows_size() > 0) {

be/src/olap/rowset/rowset_meta.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,17 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
355355
_rowset_meta_pb.set_segments_key_bounds_truncated(truncated);
356356
}
357357

358+
// When true, `segments_key_bounds` holds a single aggregated
359+
// [rowset_min, rowset_max] entry instead of per-segment bounds.
360+
bool is_segments_key_bounds_aggregated() const {
361+
return _rowset_meta_pb.has_segments_key_bounds_aggregated() &&
362+
_rowset_meta_pb.segments_key_bounds_aggregated();
363+
}
364+
365+
void set_segments_key_bounds_aggregated(bool aggregated) {
366+
_rowset_meta_pb.set_segments_key_bounds_aggregated(aggregated);
367+
}
368+
358369
bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
359370
// for compatibility, old version has not segment key bounds
360371
if (_rowset_meta_pb.segments_key_bounds_size() == 0) {
@@ -372,7 +383,10 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
372383
return true;
373384
}
374385

375-
void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds);
386+
// If `aggregate_into_single` is true, collapse per-segment bounds into a single
387+
// [rowset_min, rowset_max] entry and mark this rowset as aggregated.
388+
void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds,
389+
bool aggregate_into_single = false);
376390

377391
void add_segment_key_bounds(KeyBoundsPB segments_key_bounds) {
378392
*_rowset_meta_pb.add_segments_key_bounds() = std::move(segments_key_bounds);

0 commit comments

Comments
 (0)