diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 04a73ca4c..a68424d96 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -94,6 +94,7 @@ set(ICEBERG_SOURCES transform.cc transform_function.cc type.cc + update/delete_files.cc update/expire_snapshots.cc update/fast_append.cc update/merging_snapshot_update.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 15fd5d79d..0d3142eb7 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -116,6 +116,7 @@ iceberg_sources = files( 'transform.cc', 'transform_function.cc', 'type.cc', + 'update/delete_files.cc', 'update/expire_snapshots.cc', 'update/fast_append.cc', 'update/merging_snapshot_update.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 1255871c3..fe7df2719 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -31,6 +31,7 @@ #include "iceberg/table_properties.h" #include "iceberg/table_scan.h" #include "iceberg/transaction.h" +#include "iceberg/update/delete_files.h" #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" #include "iceberg/update/set_snapshot.h" @@ -217,6 +218,12 @@ Result> Table::NewFastAppend() { return FastAppend::Make(name().name, std::move(ctx)); } +Result> Table::NewDeleteFiles() { + ICEBERG_ASSIGN_OR_RAISE( + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return DeleteFiles::Make(name().name, std::move(ctx)); +} + Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 8d8849f37..246470b3a 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -176,6 +176,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new FastAppend to append data files and commit the changes. virtual Result> NewFastAppend(); + /// \brief Create a new DeleteFiles to delete data files and commit the changes. + virtual Result> NewDeleteFiles(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. virtual Result> NewSnapshotManager(); diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index e18b63d5c..7a8774c9b 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -215,6 +215,7 @@ if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(table_update_test USE_BUNDLE SOURCES + delete_files_test.cc expire_snapshots_test.cc fast_append_test.cc manifest_filter_manager_test.cc diff --git a/src/iceberg/test/delete_files_test.cc b/src/iceberg/test/delete_files_test.cc new file mode 100644 index 000000000..7c547ac49 --- /dev/null +++ b/src/iceberg/test/delete_files_test.cc @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/delete_files.h" + +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/literal.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/update/fast_append.h" + +namespace iceberg { + +class DeleteFilesTest : public MinimalUpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + MinimalUpdateTestBase::SetUp(); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L); + file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); + } + + std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x) { + auto file = std::make_shared(); + file->content = DataFile::Content::kData; + file->file_path = table_location_ + path; + file->file_format = FileFormatType::kParquet; + file->partition = PartitionValues(std::vector{Literal::Long(partition_x)}); + file->file_size_in_bytes = 1024; + file->record_count = 100; + file->partition_spec_id = spec_->spec_id(); + return file; + } + + void SetLongBounds(const std::shared_ptr& file, int32_t field_id, + int64_t lower, int64_t upper) { + ASSERT_NE(file, nullptr); + ICEBERG_UNWRAP_OR_FAIL(auto lower_bound, Literal::Long(lower).Serialize()); + ICEBERG_UNWRAP_OR_FAIL(auto upper_bound, Literal::Long(upper).Serialize()); + file->value_counts[field_id] = file->record_count; + file->null_value_counts[field_id] = 0; + file->lower_bounds[field_id] = lower_bound; + file->upper_bounds[field_id] = upper_bound; + } + + void CommitFiles(const std::vector>& files) { + ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend()); + for (const auto& file : files) { + append->AppendFile(file); + } + ASSERT_THAT(append->Commit(), IsOk()); + ASSERT_THAT(table_->Refresh(), IsOk()); + } + + void CommitInitialFiles() { CommitFiles({file_a_, file_b_}); } + + void ExpectOneFileDeleted() { + ASSERT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kDelete); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedRecords), "100"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedFileSize), "1024"); + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; + + static constexpr int32_t kYFieldId = 2; +}; + +TEST_F(DeleteFilesTest, DeleteFileByPath) { + CommitInitialFiles(); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(file_a_->file_path); + + EXPECT_THAT(delete_files->Commit(), IsOk()); + ExpectOneFileDeleted(); +} + +TEST_F(DeleteFilesTest, DeleteFileByDataFile) { + CommitInitialFiles(); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(file_a_); + + EXPECT_THAT(delete_files->Commit(), IsOk()); + ExpectOneFileDeleted(); +} + +TEST_F(DeleteFilesTest, DeleteFromRowFilterCaseInsensitive) { + CommitInitialFiles(); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->CaseSensitive(false).DeleteFromRowFilter( + Expressions::Equal("X", Literal::Long(1L))); + + EXPECT_THAT(delete_files->Commit(), IsOk()); + ExpectOneFileDeleted(); +} + +TEST_F(DeleteFilesTest, EmptyDeleteCommit) { + CommitInitialFiles(); + ICEBERG_UNWRAP_OR_FAIL(auto previous_snapshot, table_->current_snapshot()); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + + EXPECT_THAT(delete_files->Commit(), IsOk()); + + ASSERT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + ASSERT_TRUE(snapshot->parent_snapshot_id.has_value()); + EXPECT_EQ(snapshot->parent_snapshot_id.value(), previous_snapshot->snapshot_id); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kDelete); + EXPECT_EQ(snapshot->summary.count(SnapshotSummaryFields::kDeletedDataFiles), 0U); + EXPECT_EQ(snapshot->summary.count(SnapshotSummaryFields::kDeletedRecords), 0U); + EXPECT_EQ(snapshot->summary.count(SnapshotSummaryFields::kRemovedFileSize), 0U); +} + +TEST_F(DeleteFilesTest, DeleteFromRowFilter) { + CommitInitialFiles(); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFromRowFilter(Expressions::Equal("x", Literal::Long(1L))); + + EXPECT_THAT(delete_files->Commit(), IsOk()); + ExpectOneFileDeleted(); +} + +TEST_F(DeleteFilesTest, DeleteFromRowFilterRejectsPartialMatchFile) { + auto partial_match_file = MakeDataFile("/data/partial_match.parquet", + /*partition_x=*/1L); + SetLongBounds(partial_match_file, kYFieldId, /*lower=*/0L, /*upper=*/10L); + CommitFiles({partial_match_file}); + ICEBERG_UNWRAP_OR_FAIL(auto previous_snapshot, table_->current_snapshot()); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFromRowFilter(Expressions::Equal("y", Literal::Long(5L))); + + auto status = delete_files->Commit(); + EXPECT_THAT(status, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(status, + HasErrorMessage("Cannot delete file where some, but not all, rows match " + "filter")); + EXPECT_THAT(status, HasErrorMessage(partial_match_file->file_path)); + + ASSERT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->snapshot_id, previous_snapshot->snapshot_id); +} + +TEST_F(DeleteFilesTest, ValidateFilesExistRejectsMissingPath) { + CommitInitialFiles(); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(table_location_ + "/data/missing.parquet") + .ValidateFilesExist(); + + auto status = delete_files->Commit(); + EXPECT_THAT(status, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(status, HasErrorMessage("Missing required files to delete")); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 049b0f49d..d6acbf29c 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -32,6 +32,7 @@ #include "iceberg/table_requirement.h" #include "iceberg/table_requirements.h" #include "iceberg/table_update.h" +#include "iceberg/update/delete_files.h" #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" #include "iceberg/update/pending_update.h" @@ -478,6 +479,13 @@ Result> Transaction::NewFastAppend() { return fast_append; } +Result> Transaction::NewDeleteFiles() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr delete_files, + DeleteFiles::Make(ctx_->table->name().name, ctx_)); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(delete_files)); + return delete_files; +} + Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, UpdateStatistics::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 60fe935f3..6ddbb7a32 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -106,6 +106,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewFastAppend(); + /// \brief Create a new DeleteFiles to delete data files and commit the changes. + Result> NewDeleteFiles(); + /// \brief Create a new SnapshotManager to manage snapshots. Result> NewSnapshotManager(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 745c63acb..289606775 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -221,6 +221,7 @@ class Transaction; class TransactionContext; /// \brief Update family. +class DeleteFiles; class ExpireSnapshots; class FastAppend; class PendingUpdate; diff --git a/src/iceberg/update/delete_files.cc b/src/iceberg/update/delete_files.cc new file mode 100644 index 000000000..9759e3eb9 --- /dev/null +++ b/src/iceberg/update/delete_files.cc @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/delete_files.h" + +#include +#include +#include + +#include "iceberg/snapshot.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> DeleteFiles::Make( + std::string table_name, std::shared_ptr ctx) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create DeleteFiles without a context"); + return std::unique_ptr( + new DeleteFiles(std::move(table_name), std::move(ctx))); +} + +DeleteFiles::DeleteFiles(std::string table_name, std::shared_ptr ctx) + : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {} + +DeleteFiles& DeleteFiles::DeleteFile(std::string_view path) { + ICEBERG_BUILDER_CHECK(!path.empty(), "Cannot delete an empty file path"); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteByPath(path)); + return *this; +} + +DeleteFiles& DeleteFiles::DeleteFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); + return *this; +} + +DeleteFiles& DeleteFiles::DeleteFromRowFilter(std::shared_ptr expr) { + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteByRowFilter(std::move(expr))); + return *this; +} + +DeleteFiles& DeleteFiles::CaseSensitive(bool case_sensitive) { + MergingSnapshotUpdate::CaseSensitive(case_sensitive); + return *this; +} + +DeleteFiles& DeleteFiles::ValidateFilesExist() { + validate_files_to_delete_exist_ = true; + return *this; +} + +std::string DeleteFiles::operation() { return DataOperation::kDelete; } + +Status DeleteFiles::Validate(const TableMetadata&, const std::shared_ptr&) { + if (validate_files_to_delete_exist_) { + FailMissingDeletePaths(); + } + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/delete_files.h b/src/iceberg/update/delete_files.h new file mode 100644 index 000000000..1be08e35b --- /dev/null +++ b/src/iceberg/update/delete_files.h @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/delete_files.h + +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/merging_snapshot_update.h" + +namespace iceberg { + +/// \brief API for deleting data files from a table. +/// +/// This accumulates data-file deletions, produces a new snapshot, and commits that +/// snapshot as current. File paths are matched exactly against table metadata values; +/// equivalent but differently-normalized URIs are not considered matches. +class ICEBERG_EXPORT DeleteFiles : public MergingSnapshotUpdate { + public: + static Result> Make( + std::string table_name, std::shared_ptr ctx); + + /// \brief Delete a data-file path from the table. + DeleteFiles& DeleteFile(std::string_view path); + + /// \brief Delete a data file tracked by object identity and path. + DeleteFiles& DeleteFile(const std::shared_ptr& file); + + /// \brief Delete files whose rows all match the given expression. + DeleteFiles& DeleteFromRowFilter(std::shared_ptr expr); + + /// \brief Set case sensitivity for expression binding. + DeleteFiles& CaseSensitive(bool case_sensitive); + + /// \brief Validate that explicitly requested deleted files still exist. + DeleteFiles& ValidateFilesExist(); + + std::string operation() override; + + protected: + Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) override; + + private: + DeleteFiles(std::string table_name, std::shared_ptr ctx); + + bool validate_files_to_delete_exist_ = false; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 6405f603f..d8409bbf7 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -17,6 +17,7 @@ install_headers( [ + 'delete_files.h', 'expire_snapshots.h', 'fast_append.h', 'merging_snapshot_update.h',