Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/cloud/cloud_snapshot_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,14 @@ Status CloudSnapshotMgr::_create_rowset_meta(
for (const auto& key_bound : source_meta_pb.segments_key_bounds()) {
*new_rowset_meta_pb->add_segments_key_bounds() = key_bound;
}
if (source_meta_pb.has_segments_key_bounds_truncated()) {
new_rowset_meta_pb->set_segments_key_bounds_truncated(
source_meta_pb.segments_key_bounds_truncated());
}
if (source_meta_pb.has_segments_key_bounds_aggregated()) {
new_rowset_meta_pb->set_segments_key_bounds_aggregated(
source_meta_pb.segments_key_bounds_aggregated());
}
if (source_meta_pb.has_delete_predicate()) {
DeletePredicatePB* new_delete_condition = new_rowset_meta_pb->mutable_delete_predicate();
*new_delete_condition = source_meta_pb.delete_predicate();
Expand Down
28 changes: 24 additions & 4 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, const RowsetMetaPB& in)
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
if (in.has_segments_key_bounds_truncated()) {
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
}
if (in.has_segments_key_bounds_aggregated()) {
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
}
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
Expand Down Expand Up @@ -160,7 +165,12 @@ void doris_rowset_meta_to_cloud(RowsetMetaCloudPB* out, RowsetMetaPB&& in) {
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
if (in.has_segments_key_bounds_truncated()) {
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
}
if (in.has_segments_key_bounds_aggregated()) {
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
}
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
Expand Down Expand Up @@ -253,7 +263,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, const RowsetMetaCloudPB& in)
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
if (in.has_segments_key_bounds_truncated()) {
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
}
if (in.has_segments_key_bounds_aggregated()) {
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
}
out->mutable_num_segment_rows()->CopyFrom(in.num_segment_rows());
out->mutable_segments_file_size()->CopyFrom(in.segments_file_size());
out->set_index_id(in.index_id());
Expand Down Expand Up @@ -334,7 +349,12 @@ void cloud_rowset_meta_to_doris(RowsetMetaPB* out, RowsetMetaCloudPB&& in) {
}
out->set_txn_expiration(in.txn_expiration());
out->set_segments_overlap_pb(in.segments_overlap_pb());
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
if (in.has_segments_key_bounds_truncated()) {
out->set_segments_key_bounds_truncated(in.segments_key_bounds_truncated());
}
if (in.has_segments_key_bounds_aggregated()) {
out->set_segments_key_bounds_aggregated(in.segments_key_bounds_aggregated());
}
out->mutable_num_segment_rows()->Swap(in.mutable_num_segment_rows());
out->mutable_segments_file_size()->Swap(in.mutable_segments_file_size());
out->set_index_id(in.index_id());
Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1715,6 +1715,10 @@ DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false");
DEFINE_mInt32(segments_key_bounds_truncation_threshold, "36");
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
DEFINE_mBool(random_segments_key_bounds_truncation, "false");

// If true, non-MOW rowsets store a single aggregated [rowset_min, rowset_max]
// key-bounds entry instead of per-segment bounds, to reduce meta size on cloud FDB.
DEFINE_mBool(enable_aggregate_non_mow_key_bounds, "true");
// p0, daily, rqg, external
DEFINE_String(fuzzy_test_type, "");

Expand Down
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,10 @@ DECLARE_mInt32(segments_key_bounds_truncation_threshold);
// ATTENTION: for test only, use random segments key bounds truncation threshold every time
DECLARE_mBool(random_segments_key_bounds_truncation);

// If true, non-MOW rowsets store a single aggregated [rowset_min, rowset_max]
// key-bounds entry instead of per-segment bounds, to reduce meta size on cloud FDB.
DECLARE_mBool(enable_aggregate_non_mow_key_bounds);

DECLARE_mBool(enable_auto_clone_on_compaction_missing_version);

DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version);
Expand Down
10 changes: 9 additions & 1 deletion be/src/storage/compaction/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,15 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
// link data to new rowset
auto seg_id = 0;
bool segments_key_bounds_truncated {false};
bool any_input_aggregated {false};
std::vector<KeyBoundsPB> segment_key_bounds;
std::vector<uint32_t> num_segment_rows;
for (auto rowset : _input_rowsets) {
RETURN_IF_ERROR(rowset->link_files_to(tablet()->tablet_path(),
_output_rs_writer->rowset_id(), seg_id));
seg_id += rowset->num_segments();
segments_key_bounds_truncated |= rowset->is_segments_key_bounds_truncated();
any_input_aggregated |= rowset->rowset_meta()->is_segments_key_bounds_aggregated();
std::vector<KeyBoundsPB> key_bounds;
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&key_bounds));
segment_key_bounds.insert(segment_key_bounds.end(), key_bounds.begin(), key_bounds.end());
Expand All @@ -445,7 +447,13 @@ Status CompactionMixin::do_compact_ordered_rowsets() {
rowset_meta->set_segments_overlap(NONOVERLAPPING);
rowset_meta->set_rowset_state(VISIBLE);
rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated);
rowset_meta->set_segments_key_bounds(segment_key_bounds);
// If any input was already aggregated we have no way to recover per-segment
// bounds, so force aggregation on the output to keep the layout consistent
// with `num_segments` / the aggregated flag, even if the config is off now.
bool aggregate_key_bounds =
any_input_aggregated || (config::enable_aggregate_non_mow_key_bounds &&
!_tablet->enable_unique_key_merge_on_write());
rowset_meta->set_segments_key_bounds(segment_key_bounds, aggregate_key_bounds);
rowset_meta->set_num_segment_rows(num_segment_rows);

_output_rowset = _output_rs_writer->manual_build(rowset_meta);
Expand Down
9 changes: 7 additions & 2 deletions be/src/storage/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
spec_rowset_meta.is_segments_key_bounds_truncated());
std::vector<KeyBoundsPB> segments_key_bounds;
spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
rowset_meta.set_segments_key_bounds(segments_key_bounds);
// Preserve source layout: if source was aggregated (size 1), re-aggregating
// the single entry is a no-op that also keeps the flag consistent.
rowset_meta.set_segments_key_bounds(segments_key_bounds,
spec_rowset_meta.is_segments_key_bounds_aggregated());
std::vector<uint32_t> num_segment_rows;
spec_rowset_meta.get_num_segment_rows(&num_segment_rows);
rowset_meta.set_num_segment_rows(num_segment_rows);
Expand Down Expand Up @@ -1019,7 +1022,9 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch
_total_index_size);
rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds);
bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds &&
!_context.enable_unique_key_merge_on_write;
rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds, aggregate_key_bounds);
// TODO write zonemap to meta
rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
rowset_meta->set_creation_time(time(nullptr));
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
return _rowset_meta->is_segments_key_bounds_truncated();
}

// Reports whether this rowset's key bounds are flagged as aggregated.
// Pure forwarder: the answer comes from the underlying rowset meta's
// is_segments_key_bounds_aggregated().
bool is_segments_key_bounds_aggregated() const {
return _rowset_meta->is_segments_key_bounds_aggregated();
}

bool check_rowset_segment();

[[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
Expand Down
33 changes: 29 additions & 4 deletions be/src/storage/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,31 @@ int64_t RowsetMeta::segment_file_size(int seg_id) const {
: -1;
}

void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) {
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
*new_key_bounds = key_bounds;
void RowsetMeta::set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds,
bool aggregate_into_single) {
_rowset_meta_pb.clear_segments_key_bounds();
bool did_aggregate = aggregate_into_single && !segments_key_bounds.empty();
if (did_aggregate) {
const std::string* overall_min = &segments_key_bounds.front().min_key();
const std::string* overall_max = &segments_key_bounds.front().max_key();
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
if (key_bounds.min_key() < *overall_min) {
overall_min = &key_bounds.min_key();
}
if (key_bounds.max_key() > *overall_max) {
overall_max = &key_bounds.max_key();
}
}
KeyBoundsPB* aggregated = _rowset_meta_pb.add_segments_key_bounds();
aggregated->set_min_key(*overall_min);
aggregated->set_max_key(*overall_max);
} else {
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
KeyBoundsPB* new_key_bounds = _rowset_meta_pb.add_segments_key_bounds();
*new_key_bounds = key_bounds;
}
}
set_segments_key_bounds_aggregated(did_aggregate);

int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold;
if (config::random_segments_key_bounds_truncation) {
Expand Down Expand Up @@ -326,6 +346,11 @@ void RowsetMeta::merge_rowset_meta(const RowsetMeta& other) {
set_total_disk_size(data_disk_size() + index_disk_size());
set_segments_key_bounds_truncated(is_segments_key_bounds_truncated() ||
other.is_segments_key_bounds_truncated());
// merge_rowset_meta is used in the MOW partial-update publish path, which relies
// on per-segment bounds. Aggregation should never be enabled for MOW rowsets,
// so we do not expect either side to be aggregated here.
DCHECK(!is_segments_key_bounds_aggregated() && !other.is_segments_key_bounds_aggregated())
<< "merge_rowset_meta encountered aggregated key bounds";
if (_rowset_meta_pb.num_segment_rows_size() > 0) {
if (other.num_segments() > 0) {
if (other._rowset_meta_pb.num_segment_rows_size() > 0) {
Expand Down
16 changes: 15 additions & 1 deletion be/src/storage/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,17 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
_rowset_meta_pb.set_segments_key_bounds_truncated(truncated);
}

// Reports whether `segments_key_bounds` is marked as holding one aggregated
// [rowset_min, rowset_max] entry rather than one entry per segment.
// When the flag field is not present in the protobuf, this reports false.
bool is_segments_key_bounds_aggregated() const {
    if (!_rowset_meta_pb.has_segments_key_bounds_aggregated()) {
        return false;
    }
    return _rowset_meta_pb.segments_key_bounds_aggregated();
}

// Writes `aggregated` into the rowset meta protobuf's
// segments_key_bounds_aggregated field. This only updates the flag; it does
// not touch the stored key-bounds entries themselves.
void set_segments_key_bounds_aggregated(bool aggregated) {
_rowset_meta_pb.set_segments_key_bounds_aggregated(aggregated);
}

bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
// for compatibility: old versions have no segment key bounds
if (_rowset_meta_pb.segments_key_bounds_size() == 0) {
Expand All @@ -370,7 +381,10 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
return true;
}

void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds);
// If `aggregate_into_single` is true, collapse per-segment bounds into a single
// [rowset_min, rowset_max] entry and mark this rowset as aggregated.
void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds,
bool aggregate_into_single = false);

void add_segment_key_bounds(KeyBoundsPB segments_key_bounds) {
*_rowset_meta_pb.add_segments_key_bounds() = std::move(segments_key_bounds);
Expand Down
16 changes: 11 additions & 5 deletions be/src/storage/snapshot/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
// src be local rowset
RowsetId rowset_id = _engine.next_rowset_id();
guards.push_back(_engine.pending_local_rowsets().add(rowset_id));
RETURN_IF_ERROR_RESULT(_rename_rowset_id(visible_rowset, clone_dir, tablet_schema,
rowset_id, rowset_meta));
RETURN_IF_ERROR_RESULT(_rename_rowset_id(
visible_rowset, clone_dir, tablet_schema, rowset_id, rowset_meta,
new_tablet_meta_pb.enable_unique_key_merge_on_write()));
RowsetId src_rs_id;
if (visible_rowset.rowset_id() > 0) {
src_rs_id.init(visible_rowset.rowset_id());
Expand Down Expand Up @@ -268,8 +269,9 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
// src be local rowset
RowsetId rowset_id = _engine.next_rowset_id();
guards.push_back(_engine.pending_local_rowsets().add(rowset_id));
RETURN_IF_ERROR_RESULT(_rename_rowset_id(stale_rowset, clone_dir, tablet_schema,
rowset_id, rowset_meta));
RETURN_IF_ERROR_RESULT(_rename_rowset_id(
stale_rowset, clone_dir, tablet_schema, rowset_id, rowset_meta,
new_tablet_meta_pb.enable_unique_key_merge_on_write()));
RowsetId src_rs_id;
if (stale_rowset.rowset_id() > 0) {
src_rs_id.init(stale_rowset.rowset_id());
Expand Down Expand Up @@ -323,7 +325,8 @@ Result<std::vector<PendingRowsetGuard>> SnapshotManager::convert_rowset_ids(
Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
const std::string& new_tablet_path,
TabletSchemaSPtr tablet_schema, const RowsetId& rowset_id,
RowsetMetaPB* new_rs_meta_pb) {
RowsetMetaPB* new_rs_meta_pb,
bool enable_unique_key_merge_on_write) {
Status st = Status::OK();
RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
rowset_meta->init_from_pb(rs_meta_pb);
Expand All @@ -349,6 +352,9 @@ Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
context.newest_write_timestamp = org_rowset_meta->newest_write_timestamp();
// keep segments_overlap same as origin rowset
context.segments_overlap = rowset_meta->segments_overlap();
// propagate MOW flag so that non-MOW key-bounds aggregation is not applied
// when restoring a MOW tablet's rowset
context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write;

auto rs_writer = DORIS_TRY(RowsetFactory::create_rowset_writer(_engine, context, false));

Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/snapshot/snapshot_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class SnapshotManager {

Status _rename_rowset_id(const RowsetMetaPB& rs_meta_pb, const std::string& new_tablet_path,
TabletSchemaSPtr tablet_schema, const RowsetId& next_id,
RowsetMetaPB* new_rs_meta_pb);
RowsetMetaPB* new_rs_meta_pb, bool enable_unique_key_merge_on_write);

Status _rename_index_ids(TabletSchemaPB& schema_pb,
const TabletSchemaSPtr& tablet_schema) const;
Expand Down
13 changes: 12 additions & 1 deletion be/src/storage/tablet/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,18 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest
std::vector<KeyBoundsPB> segments_key_bounds;
rs->rowset_meta()->get_segments_key_bounds(&segments_key_bounds);
int num_segments = cast_set<int>(rs->num_segments());
DCHECK_EQ(segments_key_bounds.size(), num_segments);
// MOW lookup requires per-segment bounds. Aggregation must be disabled
// for MOW writers, but enforce at runtime too — indexing segments_key_bounds[j]
// below would be out-of-bounds otherwise.
if (UNLIKELY(rs->rowset_meta()->is_segments_key_bounds_aggregated() ||
static_cast<int>(segments_key_bounds.size()) != num_segments)) {
return Status::InternalError(
"MOW lookup got rowset with inconsistent segments_key_bounds, rowset_id={}, "
"aggregated={}, bounds_size={}, num_segments={}",
rs->rowset_id().to_string(),
rs->rowset_meta()->is_segments_key_bounds_aggregated(),
segments_key_bounds.size(), num_segments);
}
std::vector<uint32_t> picked_segments;
for (int j = num_segments - 1; j >= 0; j--) {
if (_key_is_not_in_segment(key_without_seq, segments_key_bounds[j],
Expand Down
5 changes: 4 additions & 1 deletion be/src/storage/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,10 @@ Status IndexBuilder::update_inverted_index_info() {
RETURN_IF_ERROR(input_rowset->get_segments_key_bounds(&key_bounds));
rowset_meta->set_segments_key_bounds_truncated(
input_rowset_meta->is_segments_key_bounds_truncated());
rowset_meta->set_segments_key_bounds(key_bounds);
// preserve aggregated layout via the setter so the aggregated flag is not
// clobbered by set_segments_key_bounds's default reset path.
rowset_meta->set_segments_key_bounds(
key_bounds, input_rowset_meta->is_segments_key_bounds_aggregated());
std::vector<uint32_t> num_segment_rows;
input_rowset_meta->get_num_segment_rows(&num_segment_rows);
rowset_meta->set_num_segment_rows(num_segment_rows);
Expand Down
Loading
Loading