Skip to content

Commit 5b8b125

Browse files
committed
fix fe iceberg checkstyle
1 parent c31d2ec commit 5b8b125

File tree

54 files changed

+3158
-177
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+3158
-177
lines changed

be/src/exec/scan/file_scanner.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,6 +1205,16 @@ Status FileScanner::_init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
12051205
RETURN_IF_ERROR(_create_row_id_column_iterator());
12061206
iceberg_reader->set_row_id_column_iterator(_row_id_column_iterator_pair);
12071207
}
1208+
if (_row_lineage_columns != nullptr) {
1209+
auto row_lineage_columns = std::make_shared<RowGroupReader::RowLineageColumns>();
1210+
row_lineage_columns->row_id_column_idx = _row_lineage_columns->row_id_column_idx;
1211+
row_lineage_columns->last_updated_sequence_number_column_idx =
1212+
_row_lineage_columns->last_updated_sequence_number_column_idx;
1213+
row_lineage_columns->first_row_id = range.table_format_params.iceberg_params.first_row_id;
1214+
row_lineage_columns->last_updated_sequence_number =
1215+
range.table_format_params.iceberg_params.last_updated_sequence_number;
1216+
iceberg_reader->set_row_lineage_columns(row_lineage_columns);
1217+
}
12081218
iceberg_reader->set_push_down_agg_type(_get_push_down_agg_type());
12091219

12101220
init_status = iceberg_reader->init_reader(
@@ -1761,6 +1771,19 @@ Status FileScanner::_init_expr_ctxes() {
17611771
_row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
17621772
continue;
17631773
}
1774+
if (it->second->col_name().starts_with("_row_id")) {
1775+
if (_row_lineage_columns == nullptr) {
1776+
_row_lineage_columns = std::make_shared<RowGroupReader::RowLineageColumns>();
1777+
}
1778+
_row_lineage_columns->row_id_column_idx = _default_val_row_desc->get_column_id(slot_id);
1779+
}
1780+
if (it->second->col_name().starts_with("_last_updated_sequence_number")) {
1781+
if (_row_lineage_columns == nullptr) {
1782+
_row_lineage_columns = std::make_shared<RowGroupReader::RowLineageColumns>();
1783+
}
1784+
_row_lineage_columns->last_updated_sequence_number_column_idx =
1785+
_default_val_row_desc->get_column_id(slot_id);
1786+
}
17641787

17651788
LOG(INFO) << "[DEBUG _init_expr_ctxes] slot: '" << it->second->col_name()
17661789
<< "' is_file_slot=" << slot_info.is_file_slot << " is_partition="

be/src/exec/scan/file_scanner.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ class FileScanner : public Scanner {
231231

232232
std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> _row_id_column_iterator_pair = {nullptr,
233233
-1};
234+
std::shared_ptr<RowGroupReader::RowLineageColumns> _row_lineage_columns = nullptr;
234235
int64_t _last_bytes_read_from_local = 0;
235236
int64_t _last_bytes_read_from_remote = 0;
236237

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "exec/sink/viceberg_delete_rewrite_loader.h"
19+
20+
#include "common/status.h"
21+
#include "format/table/iceberg_delete_file_reader_helper.h"
22+
#include "io/hdfs_builder.h"
23+
#include "runtime/runtime_state.h"
24+
25+
namespace doris {
26+
27+
namespace {
28+
class RewriteBitmapVisitor final : public IcebergPositionDeleteVisitor {
29+
public:
30+
RewriteBitmapVisitor(const std::string& referenced_data_file_path,
31+
roaring::Roaring64Map* rows_to_delete)
32+
: _referenced_data_file_path(referenced_data_file_path),
33+
_rows_to_delete(rows_to_delete) {}
34+
35+
Status visit(const std::string& file_path, int64_t pos) override {
36+
if (_rows_to_delete == nullptr) {
37+
return Status::InvalidArgument("rows_to_delete is null");
38+
}
39+
if (file_path == _referenced_data_file_path) {
40+
_rows_to_delete->add(static_cast<uint64_t>(pos));
41+
}
42+
return Status::OK();
43+
}
44+
45+
private:
46+
const std::string& _referenced_data_file_path;
47+
roaring::Roaring64Map* _rows_to_delete;
48+
};
49+
} // namespace
50+
51+
Status VIcebergDeleteRewriteLoader::load(RuntimeState* state, RuntimeProfile* profile,
52+
const std::string& referenced_data_file_path,
53+
const std::vector<TIcebergDeleteFileDesc>& delete_files,
54+
const std::map<std::string, std::string>& hadoop_conf,
55+
TFileType::type file_type,
56+
const std::vector<TNetworkAddress>& broker_addresses,
57+
roaring::Roaring64Map* rows_to_delete) {
58+
if (rows_to_delete == nullptr) {
59+
return Status::InvalidArgument("rows_to_delete is null");
60+
}
61+
if (state == nullptr || profile == nullptr || delete_files.empty()) {
62+
return Status::OK();
63+
}
64+
65+
TFileScanRangeParams params =
66+
build_iceberg_delete_scan_range_params(hadoop_conf, file_type, broker_addresses);
67+
IcebergDeleteFileIOContext delete_file_io_ctx(state);
68+
IcebergDeleteFileReaderOptions options;
69+
options.state = state;
70+
options.profile = profile;
71+
options.scan_params = &params;
72+
options.io_ctx = &delete_file_io_ctx.io_ctx;
73+
options.batch_size = 102400;
74+
75+
for (const auto& delete_file : delete_files) {
76+
if (is_iceberg_deletion_vector(delete_file)) {
77+
RETURN_IF_ERROR(read_iceberg_deletion_vector(delete_file, options, rows_to_delete));
78+
continue;
79+
}
80+
RewriteBitmapVisitor visitor(referenced_data_file_path, rows_to_delete);
81+
RETURN_IF_ERROR(read_iceberg_position_delete_file(delete_file, options, &visitor));
82+
}
83+
return Status::OK();
84+
}
85+
86+
} // namespace doris
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <gen_cpp/PlanNodes_types.h>
21+
22+
#include <map>
23+
#include <string>
24+
#include <vector>
25+
26+
#include "common/status.h"
27+
#include "roaring/roaring64map.hh"
28+
29+
namespace doris {
30+
31+
class RuntimeState;
32+
class RuntimeProfile;
33+
34+
class VIcebergDeleteRewriteLoader {
35+
public:
36+
static Status load(RuntimeState* state, RuntimeProfile* profile,
37+
const std::string& referenced_data_file_path,
38+
const std::vector<TIcebergDeleteFileDesc>& delete_files,
39+
const std::map<std::string, std::string>& hadoop_conf,
40+
TFileType::type file_type,
41+
const std::vector<TNetworkAddress>& broker_addresses,
42+
roaring::Roaring64Map* rows_to_delete);
43+
};
44+
45+
} // namespace doris

0 commit comments

Comments
 (0)