Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,8 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo
std::unique_ptr<ColumnWriter> writer;
RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
RETURN_IF_ERROR(writer->init());
_column_writers.push_back(std::move(writer));

_olap_data_convertor->add_column_data_convertor(column);
_column_writers[cid] = std::move(writer);
_olap_data_convertor->add_column_data_convertor_at(column, cid);
return Status::OK();
};

Expand All @@ -322,8 +321,8 @@ Status VerticalSegmentWriter::init() {
_opts.compression_type = _tablet_schema->compression_type();
}
_olap_data_convertor = std::make_unique<vectorized::OlapBlockDataConvertor>();
_olap_data_convertor->reserve(_tablet_schema->num_columns());
_column_writers.reserve(_tablet_schema->columns().size());
_olap_data_convertor->resize(_tablet_schema->num_columns());
_column_writers.resize(_tablet_schema->num_columns());
// we don't need the short key index for unique key merge on write table.
if (_is_mow()) {
size_t seq_col_length = 0;
Expand Down Expand Up @@ -535,6 +534,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
uint32_t segment_start_pos = 0;
for (auto cid : including_cids) {
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
&full_block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid}));
// here we get segment column row num before append data.
Expand Down Expand Up @@ -639,6 +639,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
// convert missing columns and send to column writer
const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
for (auto cid : missing_cids) {
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
&full_block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid}));
auto [status, column] = _olap_data_convertor->convert_column_data(cid);
Expand Down Expand Up @@ -697,7 +698,9 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
// create full block and fill with sort key columns
full_block = _tablet_schema->create_block();

uint32_t segment_start_pos = cast_set<uint32_t>(_column_writers.front()->get_next_rowid());
// Use _num_rows_written instead of creating column writer 0, since all column writers
// should have the same row count, which equals _num_rows_written.
uint32_t segment_start_pos = cast_set<uint32_t>(_num_rows_written);

DCHECK(_tablet_schema->has_skip_bitmap_col());
auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx();
Expand All @@ -714,6 +717,17 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs;
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());

// Ensure all primary key column writers and sequence column writer are created before
// aggregate_for_flexible_partial_update, because it internally calls convert_pk_columns
// and convert_seq_column which need the convertors in _olap_data_convertor
for (uint32_t cid = 0; cid < _tablet_schema->num_key_columns(); ++cid) {
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
}
if (schema_has_sequence_col) {
uint32_t cid = _tablet_schema->sequence_col_idx();
RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
}

// 1. aggregate duplicate rows in block
RETURN_IF_ERROR(_block_aggregator.aggregate_for_flexible_partial_update(
const_cast<vectorized::Block*>(data.block), data.num_rows, specified_rowsets,
Expand Down Expand Up @@ -788,6 +802,10 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(

// 8. encode and write all non-primary key columns(including sequence column if exists)
for (auto cid = _tablet_schema->num_key_columns(); cid < _tablet_schema->num_columns(); cid++) {
if (cid != _tablet_schema->sequence_col_idx()) {
RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid),
_tablet_schema->column(cid), _tablet_schema));
}
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
full_block.get_by_position(cid), data.row_pos, data.num_rows,
cast_set<uint32_t>(cid)));
Expand Down Expand Up @@ -953,10 +971,6 @@ Status VerticalSegmentWriter::write_batch() {
!_opts.rowset_ctx->is_transient_rowset_writer) {
bool is_flexible_partial_update =
_opts.rowset_ctx->partial_update_info->is_flexible_partial_update();
for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
RETURN_IF_ERROR(
_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
}
vectorized::Block full_block;
for (auto& data : _batched_blocks) {
if (is_flexible_partial_update) {
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/olap/olap_data_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ void OlapBlockDataConvertor::add_column_data_convertor(const TabletColumn& colum
_convertors.emplace_back(create_olap_column_data_convertor(column));
}

void OlapBlockDataConvertor::add_column_data_convertor_at(const TabletColumn& column, size_t cid) {
_convertors[cid] = create_olap_column_data_convertor(column);
}

OlapBlockDataConvertor::OlapColumnDataConvertorBaseUPtr
OlapBlockDataConvertor::create_map_convertor(const TabletColumn& column) {
const auto& key_column = column.get_sub_column(0);
Expand Down Expand Up @@ -282,7 +286,9 @@ Status OlapBlockDataConvertor::set_source_content_with_specifid_columns(

void OlapBlockDataConvertor::clear_source_content() {
for (auto& convertor : _convertors) {
convertor->clear_source_column();
if (convertor != nullptr) {
convertor->clear_source_column();
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/olap/olap_data_convertor.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ class OlapBlockDataConvertor {
void clear_source_content(size_t cid);
std::pair<Status, IOlapColumnDataAccessor*> convert_column_data(size_t cid);
void add_column_data_convertor(const TabletColumn& column);
void add_column_data_convertor_at(const TabletColumn& column, size_t cid);

bool empty() const { return _convertors.empty(); }
void reserve(size_t size) { _convertors.reserve(size); }
void resize(size_t size) { _convertors.resize(size); }
void reset() { _convertors.clear(); }

private:
Expand Down
Loading