1818#include " exec/sink/viceberg_delete_sink.h"
1919
2020#include < fmt/format.h>
21+ #include < rapidjson/stringbuffer.h>
22+ #include < rapidjson/writer.h>
23+ #include < zlib.h>
2124
2225#include " common/logging.h"
2326#include " core/block/column_with_type_and_name.h"
3033#include " core/data_type/data_type_number.h"
3134#include " core/data_type/data_type_string.h"
3235#include " core/data_type/data_type_struct.h"
36+ #include " exec/common/endian.h"
3337#include " exprs/vexpr.h"
38+ #include " format/table/iceberg_delete_file_reader_helper.h"
3439#include " format/transformer/vfile_format_transformer.h"
40+ #include " io/file_factory.h"
3541#include " runtime/runtime_state.h"
42+ #include " util/slice.h"
3643#include " util/string_util.h"
3744#include " util/uid_util.h"
3845
3946namespace doris {
4047
48+ namespace {
49+
50+ class RewriteBitmapVisitor final : public IcebergPositionDeleteVisitor {
51+ public:
52+ RewriteBitmapVisitor (const std::string& referenced_data_file_path,
53+ roaring::Roaring64Map* rows_to_delete)
54+ : _referenced_data_file_path(referenced_data_file_path),
55+ _rows_to_delete (rows_to_delete) {}
56+
57+ Status visit (const std::string& file_path, int64_t pos) override {
58+ if (_rows_to_delete == nullptr ) {
59+ return Status::InvalidArgument (" rows_to_delete is null" );
60+ }
61+ if (file_path == _referenced_data_file_path) {
62+ _rows_to_delete->add (static_cast <uint64_t >(pos));
63+ }
64+ return Status::OK ();
65+ }
66+
67+ private:
68+ const std::string& _referenced_data_file_path;
69+ roaring::Roaring64Map* _rows_to_delete;
70+ };
71+
72+ Status load_rewritable_delete_rows (RuntimeState* state, RuntimeProfile* profile,
73+ const std::string& referenced_data_file_path,
74+ const std::vector<TIcebergDeleteFileDesc>& delete_files,
75+ const std::map<std::string, std::string>& hadoop_conf,
76+ TFileType::type file_type,
77+ const std::vector<TNetworkAddress>& broker_addresses,
78+ roaring::Roaring64Map* rows_to_delete) {
79+ if (rows_to_delete == nullptr ) {
80+ return Status::InvalidArgument (" rows_to_delete is null" );
81+ }
82+ if (state == nullptr || profile == nullptr || delete_files.empty ()) {
83+ return Status::OK ();
84+ }
85+
86+ TFileScanRangeParams params =
87+ build_iceberg_delete_scan_range_params (hadoop_conf, file_type, broker_addresses);
88+ IcebergDeleteFileIOContext delete_file_io_ctx (state);
89+ IcebergDeleteFileReaderOptions options;
90+ options.state = state;
91+ options.profile = profile;
92+ options.scan_params = ¶ms;
93+ options.io_ctx = &delete_file_io_ctx.io_ctx ;
94+ options.batch_size = 102400 ;
95+
96+ for (const auto & delete_file : delete_files) {
97+ if (is_iceberg_deletion_vector (delete_file)) {
98+ RETURN_IF_ERROR (read_iceberg_deletion_vector (delete_file, options, rows_to_delete));
99+ continue ;
100+ }
101+ RewriteBitmapVisitor visitor (referenced_data_file_path, rows_to_delete);
102+ RETURN_IF_ERROR (read_iceberg_position_delete_file (delete_file, options, &visitor));
103+ }
104+ return Status::OK ();
105+ }
106+
107+ } // namespace
108+
41109VIcebergDeleteSink::VIcebergDeleteSink (const TDataSink& t_sink,
42110 const VExprContextSPtrs& output_exprs,
43111 std::shared_ptr<Dependency> dep,
@@ -95,6 +163,24 @@ Status VIcebergDeleteSink::init_properties(ObjectPool* pool) {
95163 _partition_data_json = delete_sink.partition_data_json ;
96164 }
97165
166+ if (delete_sink.__isset .format_version ) {
167+ _format_version = delete_sink.format_version ;
168+ }
169+
170+ // for merge old deletion vector and old position delete to a new deletion vector.
171+ if (_format_version >= 3 && delete_sink.__isset .rewritable_delete_file_sets ) {
172+ for (const auto & delete_file_set : delete_sink.rewritable_delete_file_sets ) {
173+ if (!delete_file_set.__isset .referenced_data_file_path
174+ || !delete_file_set.__isset .delete_files
175+ || delete_file_set.referenced_data_file_path .empty ()
176+ || delete_file_set.delete_files .empty ()) {
177+ continue ;
178+ }
179+ _rewritable_delete_files.emplace (delete_file_set.referenced_data_file_path ,
180+ delete_file_set.delete_files );
181+ }
182+ }
183+
98184 return Status::OK ();
99185}
100186
@@ -111,10 +197,13 @@ Status VIcebergDeleteSink::open(RuntimeState* state, RuntimeProfile* profile) {
111197
112198 SCOPED_TIMER (_open_timer);
113199
114- RETURN_IF_ERROR (_init_position_delete_output_exprs ());
200+ if (_format_version < 3 ) {
201+ RETURN_IF_ERROR (_init_position_delete_output_exprs ());
202+ }
115203
116- LOG (INFO) << fmt::format (" VIcebergDeleteSink opened: delete_type={}, output_path={}" ,
117- to_string (_delete_type), _output_path);
204+ LOG (INFO) << fmt::format (
205+ " VIcebergDeleteSink opened: delete_type={}, output_path={}, format_version={}" ,
206+ to_string (_delete_type), _output_path, _format_version);
118207
119208 return Status::OK ();
120209}
@@ -153,7 +242,11 @@ Status VIcebergDeleteSink::close(Status close_status) {
153242
154243 if (_delete_type == TFileContent::POSITION_DELETES && !_file_deletions.empty ()) {
155244 SCOPED_TIMER (_write_delete_files_timer);
156- RETURN_IF_ERROR (_write_position_delete_files (_file_deletions));
245+ if (_format_version >= 3 ) {
246+ RETURN_IF_ERROR (_write_deletion_vector_files (_file_deletions));
247+ } else {
248+ RETURN_IF_ERROR (_write_position_delete_files (_file_deletions));
249+ }
157250 }
158251
159252 // Update counters
@@ -475,6 +568,161 @@ std::string VIcebergDeleteSink::_get_file_extension() const {
475568 return fmt::format (" {}{}" , compress_name, file_format_name);
476569}
477570
571+
572+ Status VIcebergDeleteSink::_write_deletion_vector_files (
573+ const std::map<std::string, IcebergFileDeletion>& file_deletions) {
574+ std::vector<DeletionVectorBlob> blobs;
575+ for (const auto & [data_file_path, deletion] : file_deletions) {
576+ if (deletion.rows_to_delete .isEmpty ()) {
577+ continue ;
578+ }
579+
580+ roaring::Roaring64Map merged_rows = deletion.rows_to_delete ;
581+ auto previous_delete_it = _rewritable_delete_files.find (data_file_path);
582+ if (previous_delete_it != _rewritable_delete_files.end ()) {
583+ roaring::Roaring64Map previous_rows;
584+ RETURN_IF_ERROR (load_rewritable_delete_rows (
585+ _state, _state->runtime_profile (), data_file_path, previous_delete_it->second ,
586+ _hadoop_conf, _file_type, _broker_addresses, &previous_rows));
587+ merged_rows |= previous_rows;
588+ }
589+
590+ size_t bitmap_size = merged_rows.getSizeInBytes ();
591+ DeletionVectorBlob blob;
592+ blob.referenced_data_file = data_file_path;
593+ blob.partition_spec_id = deletion.partition_spec_id ;
594+ blob.partition_data_json = deletion.partition_data_json ;
595+ blob.row_count = static_cast <int64_t >(merged_rows.cardinality ());
596+ blob.content_size_in_bytes = static_cast <int64_t >(4 + 4 + bitmap_size + 4 );
597+ blob.blob_data .resize (static_cast <size_t >(blob.content_size_in_bytes ));
598+ merged_rows.write (blob.blob_data .data () + 8 );
599+
600+ uint32_t total_length = static_cast <uint32_t >(4 + bitmap_size);
601+ BigEndian::Store32 (blob.blob_data .data (), total_length);
602+
603+ constexpr char DV_MAGIC[] = {' \xD1 ' , ' \xD3 ' , ' \x39 ' , ' \x64 ' };
604+ memcpy (blob.blob_data .data () + 4 , DV_MAGIC, 4 );
605+
606+ uint32_t crc = static_cast <uint32_t >(
607+ ::crc32 (0 , reinterpret_cast <const Bytef*>(blob.blob_data.data() + 4),
608+ 4 + (uInt)bitmap_size));
609+ BigEndian::Store32 (blob.blob_data .data () + 8 + bitmap_size, crc);
610+ blobs.emplace_back (std::move (blob));
611+ }
612+
613+ if (blobs.empty ()) {
614+ return Status::OK ();
615+ }
616+
617+ std::string puffin_path = _generate_puffin_file_path ();
618+ int64_t puffin_file_size = 0 ;
619+ RETURN_IF_ERROR (_write_puffin_file (puffin_path, &blobs, &puffin_file_size));
620+
621+ for (const auto & blob : blobs) {
622+ TIcebergCommitData commit_data;
623+ commit_data.__set_file_path (puffin_path);
624+ commit_data.__set_row_count (blob.row_count );
625+ commit_data.__set_file_size (puffin_file_size);
626+ commit_data.__set_file_content (TFileContent::DELETION_VECTOR);
627+ commit_data.__set_content_offset (blob.content_offset );
628+ commit_data.__set_content_size_in_bytes (blob.content_size_in_bytes );
629+ commit_data.__set_referenced_data_file_path (blob.referenced_data_file );
630+ if (blob.partition_spec_id != 0 || !blob.partition_data_json .empty ()) {
631+ commit_data.__set_partition_spec_id (blob.partition_spec_id );
632+ commit_data.__set_partition_data_json (blob.partition_data_json );
633+ }
634+
635+ _commit_data_list.push_back (commit_data);
636+ _delete_file_count++;
637+ }
638+ return Status::OK ();
639+ }
640+
641+ Status VIcebergDeleteSink::_write_puffin_file (const std::string& puffin_path,
642+ std::vector<DeletionVectorBlob>* blobs,
643+ int64_t * out_file_size) {
644+ DCHECK (blobs != nullptr );
645+ DCHECK (!blobs->empty ());
646+
647+ io::FSPropertiesRef fs_properties (_file_type);
648+ fs_properties.properties = &_hadoop_conf;
649+ if (!_broker_addresses.empty ()) {
650+ fs_properties.broker_addresses = &_broker_addresses;
651+ }
652+ io::FileDescription file_description = {.path = puffin_path, .fs_name {}};
653+ auto fs = DORIS_TRY (FileFactory::create_fs (fs_properties, file_description));
654+ io::FileWriterOptions file_writer_options = {.used_by_s3_committer = false };
655+ io::FileWriterPtr file_writer;
656+ RETURN_IF_ERROR (fs->create_file (file_description.path , &file_writer, &file_writer_options));
657+
658+ constexpr char PUFFIN_MAGIC[] = {' \x50 ' , ' \x46 ' , ' \x41 ' , ' \x31 ' };
659+ RETURN_IF_ERROR (file_writer->append (Slice (reinterpret_cast <const uint8_t *>(PUFFIN_MAGIC), 4 )));
660+ int64_t current_offset = 4 ;
661+ for (auto & blob : *blobs) {
662+ blob.content_offset = current_offset;
663+ RETURN_IF_ERROR (file_writer->append (Slice (
664+ reinterpret_cast <const uint8_t *>(blob.blob_data .data ()), blob.blob_data .size ())));
665+ current_offset += static_cast <int64_t >(blob.blob_data .size ());
666+ }
667+ RETURN_IF_ERROR (file_writer->append (Slice (reinterpret_cast <const uint8_t *>(PUFFIN_MAGIC), 4 )));
668+
669+ std::string footer_json = _build_puffin_footer_json (*blobs);
670+ RETURN_IF_ERROR (file_writer->append (
671+ Slice (reinterpret_cast <const uint8_t *>(footer_json.data ()), footer_json.size ())));
672+
673+ char footer_size_buf[4 ];
674+ LittleEndian::Store32 (footer_size_buf, static_cast <uint32_t >(footer_json.size ()));
675+ RETURN_IF_ERROR (file_writer->append (
676+ Slice (reinterpret_cast <const uint8_t *>(footer_size_buf), sizeof (footer_size_buf))));
677+
678+ char flags[4 ] = {0 , 0 , 0 , 0 };
679+ RETURN_IF_ERROR (file_writer->append (
680+ Slice (reinterpret_cast <const uint8_t *>(flags), sizeof (flags))));
681+ RETURN_IF_ERROR (file_writer->append (Slice (reinterpret_cast <const uint8_t *>(PUFFIN_MAGIC), 4 )));
682+ RETURN_IF_ERROR (file_writer->close ());
683+
684+ *out_file_size = current_offset + 4 + static_cast <int64_t >(footer_json.size ()) + 4 + 4 + 4 ;
685+ return Status::OK ();
686+ }
687+
688+ std::string VIcebergDeleteSink::_build_puffin_footer_json (
689+ const std::vector<DeletionVectorBlob>& blobs) {
690+ rapidjson::StringBuffer buffer;
691+ rapidjson::Writer<rapidjson::StringBuffer> writer (buffer);
692+ writer.StartObject ();
693+ writer.Key (" blobs" );
694+ writer.StartArray ();
695+ for (const auto & blob : blobs) {
696+ writer.StartObject ();
697+ writer.Key (" type" );
698+ writer.String (" deletion-vector-v1" );
699+ writer.Key (" fields" );
700+ writer.StartArray ();
701+ writer.EndArray ();
702+ writer.Key (" snapshot-id" );
703+ writer.Int64 (-1 );
704+ writer.Key (" sequence-number" );
705+ writer.Int64 (-1 );
706+ writer.Key (" offset" );
707+ writer.Int64 (blob.content_offset );
708+ writer.Key (" length" );
709+ writer.Int64 (blob.content_size_in_bytes );
710+ writer.Key (" properties" );
711+ writer.StartObject ();
712+ writer.Key (" referenced-data-file" );
713+ writer.String (blob.referenced_data_file .c_str (),
714+ static_cast <rapidjson::SizeType>(blob.referenced_data_file .size ()));
715+ std::string cardinality = std::to_string (blob.row_count );
716+ writer.Key (" cardinality" );
717+ writer.String (cardinality.c_str (), static_cast <rapidjson::SizeType>(cardinality.size ()));
718+ writer.EndObject ();
719+ writer.EndObject ();
720+ }
721+ writer.EndArray ();
722+ writer.EndObject ();
723+ return {buffer.GetString (), buffer.GetSize ()};
724+ }
725+
478726std::string VIcebergDeleteSink::_generate_delete_file_path (
479727 const std::string& referenced_data_file) {
480728 // Generate unique delete file name using UUID
@@ -498,4 +746,14 @@ std::string VIcebergDeleteSink::_generate_delete_file_path(
498746 return fmt::format (" {}{}" , base_path, file_name);
499747}
500748
749+ std::string VIcebergDeleteSink::_generate_puffin_file_path () {
750+ std::string uuid = generate_uuid_string ();
751+ std::string file_name = fmt::format (" delete_dv_{}.puffin" , uuid);
752+ std::string base_path = _output_path.empty () ? _table_location : _output_path;
753+ if (!base_path.empty () && base_path.back () != ' /' ) {
754+ base_path += ' /' ;
755+ }
756+ return fmt::format (" {}{}" , base_path, file_name);
757+ }
758+
501759} // namespace doris
0 commit comments