diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 7e9a1cdf1e9659..c50a03cb845a7c 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -310,9 +310,8 @@ Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletCo std::unique_ptr writer; RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer)); RETURN_IF_ERROR(writer->init()); - _column_writers.push_back(std::move(writer)); - - _olap_data_convertor->add_column_data_convertor(column); + _column_writers[cid] = std::move(writer); + _olap_data_convertor->add_column_data_convertor_at(column, cid); return Status::OK(); }; @@ -322,8 +321,8 @@ Status VerticalSegmentWriter::init() { _opts.compression_type = _tablet_schema->compression_type(); } _olap_data_convertor = std::make_unique(); - _olap_data_convertor->reserve(_tablet_schema->num_columns()); - _column_writers.reserve(_tablet_schema->columns().size()); + _olap_data_convertor->resize(_tablet_schema->num_columns()); + _column_writers.resize(_tablet_schema->num_columns()); // we don't need the short key index for unique key merge on write table. if (_is_mow()) { size_t seq_col_length = 0; @@ -535,6 +534,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da vectorized::IOlapColumnDataAccessor* seq_column = nullptr; uint32_t segment_start_pos = 0; for (auto cid : including_cids) { + RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns( &full_block, data.row_pos, data.num_rows, std::vector {cid})); // here we get segment column row num before append data. @@ -639,6 +639,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da // convert missing columns and send to column writer const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids; for (auto cid : missing_cids) { + RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns( &full_block, data.row_pos, data.num_rows, std::vector {cid})); auto [status, column] = _olap_data_convertor->convert_column_data(cid); @@ -697,7 +698,9 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( // create full block and fill with sort key columns full_block = _tablet_schema->create_block(); - uint32_t segment_start_pos = cast_set(_column_writers.front()->get_next_rowid()); + // Use _num_rows_written instead of creating column writer 0, since all column writers + // should have the same row count, which equals _num_rows_written. + uint32_t segment_start_pos = cast_set(_num_rows_written); DCHECK(_tablet_schema->has_skip_bitmap_col()); auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx(); @@ -714,6 +717,17 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( const std::vector& specified_rowsets = _mow_context->rowset_ptrs; std::vector> segment_caches(specified_rowsets.size()); + // Ensure all primary key column writers and sequence column writer are created before + // aggregate_for_flexible_partial_update, because it internally calls convert_pk_columns + // and convert_seq_column which need the convertors in _olap_data_convertor + for (uint32_t cid = 0; cid < _tablet_schema->num_key_columns(); ++cid) { + RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); + } + if (schema_has_sequence_col) { + uint32_t cid = _tablet_schema->sequence_col_idx(); + RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); + } + // 1. aggregate duplicate rows in block RETURN_IF_ERROR(_block_aggregator.aggregate_for_flexible_partial_update( const_cast(data.block), data.num_rows, specified_rowsets, @@ -788,6 +802,10 @@ Status VerticalSegmentWriter::_append_block_with_flexible_partial_content( // 8. encode and write all non-primary key columns(including sequence column if exists) for (auto cid = _tablet_schema->num_key_columns(); cid < _tablet_schema->num_columns(); cid++) { + if (cid != _tablet_schema->sequence_col_idx()) { + RETURN_IF_ERROR(_create_column_writer(cast_set(cid), + _tablet_schema->column(cid), _tablet_schema)); + } RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( full_block.get_by_position(cid), data.row_pos, data.num_rows, cast_set(cid))); @@ -953,10 +971,6 @@ Status VerticalSegmentWriter::write_batch() { !_opts.rowset_ctx->is_transient_rowset_writer) { bool is_flexible_partial_update = _opts.rowset_ctx->partial_update_info->is_flexible_partial_update(); - for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) { - RETURN_IF_ERROR( - _create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema)); - } vectorized::Block full_block; for (auto& data : _batched_blocks) { if (is_flexible_partial_update) { diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp index cd04c3a4ab7fb0..89e7c8750ed92a 100644 --- a/be/src/vec/olap/olap_data_convertor.cpp +++ b/be/src/vec/olap/olap_data_convertor.cpp @@ -76,6 +76,10 @@ void OlapBlockDataConvertor::add_column_data_convertor(const TabletColumn& colum _convertors.emplace_back(create_olap_column_data_convertor(column)); } +void OlapBlockDataConvertor::add_column_data_convertor_at(const TabletColumn& column, size_t cid) { + _convertors[cid] = create_olap_column_data_convertor(column); +} + OlapBlockDataConvertor::OlapColumnDataConvertorBaseUPtr OlapBlockDataConvertor::create_map_convertor(const TabletColumn& column) { const auto& key_column = column.get_sub_column(0); @@ -282,7 +286,9 @@ Status OlapBlockDataConvertor::set_source_content_with_specifid_columns( void OlapBlockDataConvertor::clear_source_content() { for (auto& convertor : _convertors) { - convertor->clear_source_column(); + if (convertor != nullptr) { + convertor->clear_source_column(); + } } } diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h index 7768dbcf0bd70a..6c98af658ea03e 100644 --- a/be/src/vec/olap/olap_data_convertor.h +++ b/be/src/vec/olap/olap_data_convertor.h @@ -93,9 +93,11 @@ class OlapBlockDataConvertor { void clear_source_content(size_t cid); std::pair convert_column_data(size_t cid); void add_column_data_convertor(const TabletColumn& column); + void add_column_data_convertor_at(const TabletColumn& column, size_t cid); bool empty() const { return _convertors.empty(); } void reserve(size_t size) { _convertors.reserve(size); } + void resize(size_t size) { _convertors.resize(size); } void reset() { _convertors.clear(); } private: