diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ea76c641a..9a0dc68b7 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -106,6 +106,7 @@ set(ICEBERG_SOURCES update/merge_append.cc update/merging_snapshot_update.cc update/pending_update.cc + update/row_delta.cc update/set_snapshot.cc update/snapshot_manager.cc update/snapshot_update.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 7bd2e052c..ab514be87 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -131,6 +131,7 @@ iceberg_sources = files( 'update/merge_append.cc', 'update/merging_snapshot_update.cc', 'update/pending_update.cc', + 'update/row_delta.cc', 'update/set_snapshot.cc', 'update/snapshot_manager.cc', 'update/snapshot_update.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 817e5917c..afa626964 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -35,6 +35,7 @@ #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" #include "iceberg/update/merge_append.h" +#include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" #include "iceberg/update/update_location.h" @@ -231,6 +232,12 @@ Result> Table::NewDeleteFiles() { return DeleteFiles::Make(name().name, std::move(ctx)); } +Result> Table::NewRowDelta() { + ICEBERG_ASSIGN_OR_RAISE( + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return RowDelta::Make(name().name, std::move(ctx)); +} + Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); @@ -334,6 +341,14 @@ Result> StaticTable::NewMergeAppend() { return NotSupported("Cannot create a merge append for a static table"); } +Result> StaticTable::NewDeleteFiles() { + return NotSupported("Cannot create delete files for a static table"); +} + +Result> StaticTable::NewRowDelta() { + return NotSupported("Cannot create a row delta for a static table"); +} + Result> StaticTable::NewSnapshotManager() { return NotSupported("Cannot create a snapshot manager for a static table"); } diff --git a/src/iceberg/table.h b/src/iceberg/table.h index b71a1ddbc..c8f6ded08 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -182,6 +182,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new DeleteFiles to delete data files and commit the changes. virtual Result> NewDeleteFiles(); + /// \brief Create a new RowDelta to add rows and row-level deletes. + virtual Result> NewRowDelta(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. virtual Result> NewSnapshotManager(); @@ -251,6 +254,10 @@ class ICEBERG_EXPORT StaticTable : public Table { Result> NewMergeAppend() override; + Result> NewDeleteFiles() override; + + Result> NewRowDelta() override; + Result> NewSnapshotManager() override; private: diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index c681de8c9..e528b1333 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -230,6 +230,7 @@ if(ICEBERG_BUILD_BUNDLE) merge_append_test.cc merging_snapshot_update_test.cc name_mapping_update_test.cc + row_delta_test.cc snapshot_manager_test.cc transaction_test.cc update_location_test.cc diff --git a/src/iceberg/test/row_delta_test.cc b/src/iceberg/test/row_delta_test.cc new file mode 100644 index 000000000..5906e7e39 --- /dev/null +++ b/src/iceberg/test/row_delta_test.cc @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/row_delta.h" + +#include +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/update/delete_files.h" +#include "iceberg/update/fast_append.h" + +namespace iceberg { + +class RowDeltaTest : public MinimalUpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + void SetUp() override { + MinimalUpdateTestBase::SetUp(); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L); + file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); + } + + std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x) { + auto file = std::make_shared(); + file->content = DataFile::Content::kData; + file->file_path = table_location_ + path; + file->file_format = FileFormatType::kParquet; + file->partition = PartitionValues(std::vector{Literal::Long(partition_x)}); + file->file_size_in_bytes = 1024; + file->record_count = 100; + file->partition_spec_id = spec_->spec_id(); + return file; + } + + std::shared_ptr MakeDeleteFile(const std::string& path, int64_t partition_x) { + auto file = MakeDataFile(path, partition_x); + file->content = DataFile::Content::kPositionDeletes; + file->file_size_in_bytes = 256; + file->record_count = 7; + return file; + } + + std::shared_ptr MakeDeletionVector(const std::string& path, + const std::string& referenced_data_file, + int64_t partition_x, + int64_t content_offset = 0) { + auto file = MakeDeleteFile(path, partition_x); + file->file_format = FileFormatType::kPuffin; + file->referenced_data_file = referenced_data_file; + file->content_offset = content_offset; + file->content_size_in_bytes = 10; + return file; + } + + void AppendFileAToTable() { + ICEBERG_UNWRAP_OR_FAIL(auto fast_append, table_->NewFastAppend()); + fast_append->AppendFile(file_a_); + EXPECT_THAT(fast_append->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + void SetTableFormatVersion(int8_t format_version) { + table_->metadata()->format_version = format_version; + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; +}; + +TEST_F(RowDeltaTest, AddRowsCommitsAppendOperation) { + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddRows(file_a_); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kAppend)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedRecords), "100"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedFileSize), "1024"); +} + +TEST_F(RowDeltaTest, AddDeletesCommitsDeleteOperation) { + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedPosDeleteFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedPosDeletes), "7"); +} + +TEST_F(RowDeltaTest, RemoveRowsCommitsOverwriteOperation) { + AppendFileAToTable(); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->RemoveRows(file_a_); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kOverwrite)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedRecords), "100"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedFileSize), "1024"); +} + +TEST_F(RowDeltaTest, RemoveRowsAndAddDeletesCommitsDeleteOperation) { + AppendFileAToTable(); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->RemoveRows(file_a_); + row_delta->AddDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1"); +} + +TEST_F(RowDeltaTest, AddRowsAndRemoveDeletesCommitsAppendOperation) { + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + { + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(delete_file); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddRows(file_a_); + row_delta->RemoveDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kAppend)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedDeleteFiles), "1"); +} + +TEST_F(RowDeltaTest, AddDeletesAndRemoveDeletesCommitsDeleteOperation) { + auto old_delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + { + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(old_delete_file); + EXPECT_THAT(row_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + auto new_delete_file = MakeDeleteFile("/delete/file_b_pos_deletes.parquet", + /*partition_x=*/2L); + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->AddDeletes(new_delete_file); + row_delta->RemoveDeletes(old_delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedDeleteFiles), "1"); +} + +TEST_F(RowDeltaTest, ValidateNoConflictingDataFilesFailsForConcurrentAppend) { + AppendFileAToTable(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + + ICEBERG_UNWRAP_OR_FAIL(auto concurrent_append, table_->NewFastAppend()); + concurrent_append->AppendFile(file_b_); + EXPECT_THAT(concurrent_append->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + auto file_c = MakeDataFile("/data/file_c.parquet", /*partition_x=*/3L); + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + row_delta->ValidateNoConflictingDataFiles(); + row_delta->AddRows(file_c); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Found conflicting files")); + EXPECT_THAT(result, HasErrorMessage(file_b_->file_path)); +} + +TEST_F(RowDeltaTest, ValidateNoConflictingDeleteFilesFailsForConcurrentDelete) { + AppendFileAToTable(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + std::shared_ptr concurrent_delta; + ICEBERG_UNWRAP_OR_FAIL(concurrent_delta, table_->NewRowDelta()); + concurrent_delta->AddDeletes(delete_file); + EXPECT_THAT(concurrent_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + auto file_c = MakeDataFile("/data/file_c.parquet", /*partition_x=*/3L); + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + row_delta->ValidateNoConflictingDeleteFiles(); + row_delta->AddRows(file_c); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Found new conflicting delete files")); + EXPECT_THAT(result, HasErrorMessage(delete_file->file_path)); +} + +TEST_F(RowDeltaTest, ValidateDataFilesExistSkipsConcurrentDeleteByDefault) { + AppendFileAToTable(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(file_a_); + EXPECT_THAT(delete_files->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + delete_file->referenced_data_file = file_a_->file_path; + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + std::vector referenced_files{file_a_->file_path}; + row_delta->ValidateDataFilesExist(referenced_files); + row_delta->AddDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); +} + +TEST_F(RowDeltaTest, + ValidateDataFilesExistFailsForConcurrentDeleteWithValidateDeletedFiles) { + AppendFileAToTable(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + + ICEBERG_UNWRAP_OR_FAIL(auto delete_files, table_->NewDeleteFiles()); + delete_files->DeleteFile(file_a_); + EXPECT_THAT(delete_files->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + delete_file->referenced_data_file = file_a_->file_path; + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + std::vector referenced_files{file_a_->file_path}; + row_delta->ValidateDataFilesExist(referenced_files); + row_delta->ValidateDeletedFiles(); + row_delta->AddDeletes(delete_file); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot commit, missing data files")); + EXPECT_THAT(result, HasErrorMessage(file_a_->file_path)); +} + +TEST_F(RowDeltaTest, CannotRemoveReferencedDataFile) { + AppendFileAToTable(); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + std::vector referenced_files{file_a_->file_path}; + row_delta->ValidateDataFilesExist(referenced_files); + row_delta->RemoveRows(file_a_); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot delete data files")); + EXPECT_THAT(result, HasErrorMessage(file_a_->file_path)); +} + +TEST_F(RowDeltaTest, AddDeleteFileForRemovedDataFileCommitsDeleteOperation) { + AppendFileAToTable(); + + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + delete_file->referenced_data_file = file_a_->file_path; + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->RemoveRows(file_a_); + row_delta->AddDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->Operation(), std::make_optional(DataOperation::kDelete)); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), "1"); +} + +TEST_F(RowDeltaTest, ValidateDeletedFilesAllowsMissingRowsOnEmptyTable) { + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateDeletedFiles(); + row_delta->RemoveRows(file_a_); + + EXPECT_THAT(row_delta->Commit(), IsOk()); +} + +TEST_F(RowDeltaTest, ValidateDeletedFilesAllowsMissingDeletesOnEmptyTable) { + auto delete_file = MakeDeleteFile("/delete/file_a_pos_deletes.parquet", + /*partition_x=*/1L); + + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateDeletedFiles(); + row_delta->RemoveDeletes(delete_file); + + EXPECT_THAT(row_delta->Commit(), IsOk()); +} + +TEST_F(RowDeltaTest, AddDeletionVectorValidatesConcurrentDVs) { + AppendFileAToTable(); + ICEBERG_UNWRAP_OR_FAIL(auto starting_snapshot, table_->current_snapshot()); + SetTableFormatVersion(3); + + auto concurrent_dv = + MakeDeletionVector("/delete/concurrent-dv-a.puffin", file_a_->file_path, + /*partition_x=*/1L, /*content_offset=*/0); + std::shared_ptr concurrent_delta; + ICEBERG_UNWRAP_OR_FAIL(concurrent_delta, table_->NewRowDelta()); + concurrent_delta->AddDeletes(concurrent_dv); + EXPECT_THAT(concurrent_delta->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + SetTableFormatVersion(3); + + auto dv = MakeDeletionVector("/delete/dv-a.puffin", file_a_->file_path, + /*partition_x=*/1L, /*content_offset=*/10); + std::shared_ptr row_delta; + ICEBERG_UNWRAP_OR_FAIL(row_delta, table_->NewRowDelta()); + row_delta->ValidateFromSnapshot(starting_snapshot->snapshot_id); + row_delta->AddDeletes(dv); + + auto result = row_delta->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Found concurrently added DV")); + EXPECT_THAT(result, HasErrorMessage(file_a_->file_path)); +} + +} // namespace iceberg diff --git a/src/iceberg/test/table_test.cc b/src/iceberg/test/table_test.cc index 881c4fdd0..d7ebe4a0a 100644 --- a/src/iceberg/test/table_test.cc +++ b/src/iceberg/test/table_test.cc @@ -161,6 +161,8 @@ TEST(StaticTableTest, NewMutatingOperationsAreNotSupported) { EXPECT_THAT(table->NewUpdatePartitionStatistics(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewFastAppend(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewMergeAppend(), IsError(ErrorKind::kNotSupported)); + EXPECT_THAT(table->NewDeleteFiles(), IsError(ErrorKind::kNotSupported)); + EXPECT_THAT(table->NewRowDelta(), IsError(ErrorKind::kNotSupported)); EXPECT_THAT(table->NewSnapshotManager(), IsError(ErrorKind::kNotSupported)); } diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index ac1f08241..e911a61dc 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -37,6 +37,7 @@ #include "iceberg/update/fast_append.h" #include "iceberg/update/merge_append.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/row_delta.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" #include "iceberg/update/snapshot_update.h" @@ -505,6 +506,13 @@ Result> Transaction::NewDeleteFiles() { return delete_files; } +Result> Transaction::NewRowDelta() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr row_delta, + RowDelta::Make(ctx_->table->name().name, ctx_)); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(row_delta)); + return row_delta; +} + Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, UpdateStatistics::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 52a0605c6..34ca78bd7 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -112,6 +112,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewDeleteFiles(); + /// \brief Create a new RowDelta to add rows and row-level deletes. + Result> NewRowDelta(); + /// \brief Create a new SnapshotManager to manage snapshots. Result> NewSnapshotManager(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 0320f24ea..f29bc4a1a 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -243,6 +243,7 @@ class ExpireSnapshots; class FastAppend; class MergeAppend; class PendingUpdate; +class RowDelta; class SetSnapshot; class SnapshotManager; class SnapshotUpdate; diff --git a/src/iceberg/update/merging_snapshot_update.h b/src/iceberg/update/merging_snapshot_update.h index 879403222..fc3987ee1 100644 --- a/src/iceberg/update/merging_snapshot_update.h +++ b/src/iceberg/update/merging_snapshot_update.h @@ -288,6 +288,14 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { const std::shared_ptr& parent, std::shared_ptr io, bool case_sensitive = true); + /// \brief Return an error if a staged deletion vector conflicts with a deletion + /// vector added since starting_snapshot_id. + Status ValidateAddedDVs(const TableMetadata& metadata, + std::optional starting_snapshot_id, + std::shared_ptr conflict_filter, + const std::shared_ptr& parent, + std::shared_ptr io) const; + private: struct PendingDeleteFile { std::shared_ptr file; @@ -324,12 +332,6 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { Status AddDeleteFile(std::shared_ptr file, std::optional data_sequence_number); - Status ValidateAddedDVs(const TableMetadata& metadata, - std::optional starting_snapshot_id, - std::shared_ptr conflict_filter, - const std::shared_ptr& parent, - std::shared_ptr io) const; - Status ManagersReady() const; void SetSummaryProperty(const std::string& property, const std::string& value) override; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 9f950e8d0..4f594a06e 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -23,6 +23,7 @@ install_headers( 'merge_append.h', 'merging_snapshot_update.h', 'pending_update.h', + 'row_delta.h', 'set_snapshot.h', 'snapshot_manager.h', 'snapshot_update.h', diff --git a/src/iceberg/update/row_delta.cc b/src/iceberg/update/row_delta.cc new file mode 100644 index 000000000..dd3f50c58 --- /dev/null +++ b/src/iceberg/update/row_delta.cc @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/row_delta.h" + +#include +#include +#include +#include +#include + +#include "iceberg/expression/expressions.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/formatter_internal.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result> RowDelta::Make( + std::string table_name, std::shared_ptr ctx) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create RowDelta without a context"); + return std::unique_ptr(new RowDelta(std::move(table_name), std::move(ctx))); +} + +RowDelta::RowDelta(std::string table_name, std::shared_ptr ctx) + : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)), + conflict_detection_filter_(Expressions::AlwaysTrue()) {} + +RowDelta& RowDelta::AddRows(const std::shared_ptr& inserts) { + ICEBERG_BUILDER_RETURN_IF_ERROR(AddDataFile(inserts)); + return *this; +} + +RowDelta& RowDelta::AddDeletes(const std::shared_ptr& deletes) { + ICEBERG_BUILDER_RETURN_IF_ERROR(AddDeleteFile(deletes)); + return *this; +} + +RowDelta& RowDelta::RemoveRows(const std::shared_ptr& file) { + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); + removed_data_files_.insert(file); + return *this; +} + +RowDelta& RowDelta::RemoveDeletes(const std::shared_ptr& deletes) { + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDeleteFile(deletes)); + return *this; +} + +RowDelta& RowDelta::ValidateFromSnapshot(int64_t snapshot_id) { + starting_snapshot_id_ = snapshot_id; + return *this; +} + +RowDelta& RowDelta::CaseSensitive(bool case_sensitive) { + MergingSnapshotUpdate::CaseSensitive(case_sensitive); + return *this; +} + +RowDelta& RowDelta::ValidateDataFilesExist( + std::span referenced_files) { + for (const auto& file : referenced_files) { + referenced_data_files_.insert(file); + } + return *this; +} + +RowDelta& RowDelta::ValidateDeletedFiles() { + validate_deletes_ = true; + return *this; +} + +RowDelta& RowDelta::ConflictDetectionFilter(std::shared_ptr filter) { + ICEBERG_BUILDER_CHECK(filter != nullptr, "Conflict detection filter cannot be null"); + conflict_detection_filter_ = std::move(filter); + return *this; +} + +RowDelta& RowDelta::ValidateNoConflictingDataFiles() { + validate_new_data_files_ = true; + return *this; +} + +RowDelta& RowDelta::ValidateNoConflictingDeleteFiles() { + validate_new_delete_files_ = true; + return *this; +} + +std::string RowDelta::operation() { + if (AddsDataFiles() && !AddsDeleteFiles() && !DeletesDataFiles()) { + return DataOperation::kAppend; + } + + if (AddsDeleteFiles() && !AddsDataFiles()) { + return DataOperation::kDelete; + } + + return DataOperation::kOverwrite; +} + +Status RowDelta::Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) { + if (snapshot == nullptr) { + return {}; + } + + if (validate_deletes_) { + FailMissingDeletePaths(); + } + + if (starting_snapshot_id_.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(bool is_ancestor, SnapshotUtil::IsAncestorOf( + current_metadata, snapshot->snapshot_id, + starting_snapshot_id_.value())); + ICEBERG_CHECK(is_ancestor, "Snapshot {} is not an ancestor of {}", + starting_snapshot_id_.value(), snapshot->snapshot_id); + } + + auto io = ctx_->table->io(); + if (!referenced_data_files_.empty()) { + ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateDataFilesExist( + current_metadata, starting_snapshot_id_, referenced_data_files_, + /*skip_deletes=*/!validate_deletes_, conflict_detection_filter_, snapshot, io, + IsCaseSensitive())); + } + + if (validate_new_data_files_) { + ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateAddedDataFiles( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, snapshot, io, + IsCaseSensitive())); + } + + if (validate_new_delete_files_) { + // validate that explicitly deleted files have not had added deletes + if (!removed_data_files_.empty()) { + ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateNoNewDeletesForDataFiles( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, + removed_data_files_, snapshot, io, IsCaseSensitive())); + } + + // validate that previous deletes do not conflict with added deletes + ICEBERG_RETURN_UNEXPECTED(MergingSnapshotUpdate::ValidateNoNewDeleteFiles( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, snapshot, io, + IsCaseSensitive())); + } + + ICEBERG_RETURN_UNEXPECTED(ValidateNoConflictingFileAndPositionDeletes()); + + return MergingSnapshotUpdate::ValidateAddedDVs( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, snapshot, io); +} + +Status RowDelta::ValidateNoConflictingFileAndPositionDeletes() const { + std::vector conflicting_files; + for (const auto& file : removed_data_files_) { + if (file != nullptr && referenced_data_files_.contains(file->file_path)) { + conflicting_files.push_back(file->file_path); + } + } + + if (!conflicting_files.empty()) { + return ValidationFailed( + "Cannot delete data files {} that are referenced by new delete files", + FormatRange(conflicting_files, ", ", "[", "]")); + } + + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/row_delta.h b/src/iceberg/update/row_delta.h new file mode 100644 index 000000000..ddb54d836 --- /dev/null +++ b/src/iceberg/update/row_delta.h @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/row_delta.h + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/merging_snapshot_update.h" +#include "iceberg/util/data_file_set.h" + +namespace iceberg { + +/// \brief API for encoding row-level changes to a table. +/// +/// This API accumulates data and delete file changes, produces a new Snapshot +/// of the table, and commits that snapshot as current. +/// +/// When committing, these changes are applied to the latest table snapshot. +/// Commit conflicts are resolved by applying the changes to the new latest +/// snapshot and reattempting the commit. +class ICEBERG_EXPORT RowDelta : public MergingSnapshotUpdate { + public: + /// \brief Create a new RowDelta instance. + static Result> Make(std::string table_name, + std::shared_ptr ctx); + + /// \brief Add a data file to the table. + /// + /// \param inserts A data file of rows to insert. + /// \return This RowDelta for method chaining. + RowDelta& AddRows(const std::shared_ptr& inserts); + + /// \brief Add a delete file to the table. + /// + /// \param deletes A delete file of rows to delete. + /// \return This RowDelta for method chaining. + RowDelta& AddDeletes(const std::shared_ptr& deletes); + + /// \brief Remove a data file from the table. + /// + /// \param file A data file. + /// \return This RowDelta for method chaining. + RowDelta& RemoveRows(const std::shared_ptr& file); + + /// \brief Remove a rewritten delete file from the table. + /// + /// \param deletes A delete file that can be removed from the table. + /// \return This RowDelta for method chaining. + RowDelta& RemoveDeletes(const std::shared_ptr& deletes); + + /// \brief Set the snapshot ID used in any reads for this operation. + /// + /// Validations check changes after this snapshot ID. If the from snapshot is + /// not set, all ancestor snapshots through the table's initial snapshot are + /// validated. + /// + /// \param snapshot_id A snapshot ID. + /// \return This RowDelta for method chaining. + RowDelta& ValidateFromSnapshot(int64_t snapshot_id); + + /// \brief Enable or disable case-sensitive expression binding for validations. + /// + /// \param case_sensitive Whether expression binding should be case sensitive. + /// \return This RowDelta for method chaining. + RowDelta& CaseSensitive(bool case_sensitive); + + /// \brief Add data file paths that must not be removed by conflicting commits. + /// + /// If any path has been removed by a conflicting commit in the table since + /// the snapshot passed to ValidateFromSnapshot(), the operation fails. + /// + /// By default, this validation checks only rewrite and overwrite commits. To + /// apply validation to delete commits, call ValidateDeletedFiles(). + /// + /// \param referenced_files File paths that are referenced by a position + /// delete file. + /// \return This RowDelta for method chaining. + RowDelta& ValidateDataFilesExist(std::span referenced_files); + + /// \brief Enable validation that referenced data files were not deleted. + /// + /// If a data file has a row deleted using a position delete file, rewriting + /// or overwriting the data file concurrently would un-delete the row. Deleting + /// the data file is normally allowed, but a delete may be part of a + /// transaction that reads and re-appends a row. This method is used to + /// validate deletes for the transaction case. + /// + /// \return This RowDelta for method chaining. + RowDelta& ValidateDeletedFiles(); + + /// \brief Set a conflict detection filter used to validate added files. + /// + /// If not called, a true literal is used as the conflict detection filter. + /// + /// \param filter An expression on rows in the table. + /// \return This RowDelta for method chaining. + RowDelta& ConflictDetectionFilter(std::shared_ptr filter); + + /// \brief Enable validation that concurrent data files do not conflict. + /// + /// This method should be called when the table is queried to determine which + /// files to delete or append. If a concurrent operation commits a new file + /// after the data was read and that file might contain rows matching the + /// conflict detection filter, this operation detects that during retries and + /// fails. + /// + /// Calling this method is required to maintain serializable isolation for + /// update/delete operations. Otherwise, the isolation level is snapshot + /// isolation. + /// + /// Validation uses the filter passed to ConflictDetectionFilter() and applies + /// to operations after the snapshot passed to ValidateFromSnapshot(). + /// + /// \return This RowDelta for method chaining. + RowDelta& ValidateNoConflictingDataFiles(); + + /// \brief Enable validation that concurrent delete files do not conflict. + /// + /// This method must be called when the table is queried to produce a row + /// delta for UPDATE and MERGE operations independently of the isolation level. + /// Calling this method is not required for DELETE operations because it is OK + /// to delete a record that is also deleted concurrently. + /// + /// Validation uses the filter passed to ConflictDetectionFilter() and applies + /// to operations after the snapshot passed to ValidateFromSnapshot(). + /// + /// \return This RowDelta for method chaining. + RowDelta& ValidateNoConflictingDeleteFiles(); + + std::string operation() override; + + protected: + Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) override; + + private: + explicit RowDelta(std::string table_name, std::shared_ptr ctx); + + Status ValidateNoConflictingFileAndPositionDeletes() const; + + std::optional starting_snapshot_id_; + std::unordered_set referenced_data_files_; + DataFileSet removed_data_files_; + bool validate_deletes_ = false; + std::shared_ptr conflict_detection_filter_; + bool validate_new_data_files_ = false; + bool validate_new_delete_files_ = false; +}; + +} // namespace iceberg