diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 0fe418d48..f710d0027 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -97,6 +97,7 @@ set(ICEBERG_SOURCES update/expire_snapshots.cc update/fast_append.cc update/merging_snapshot_update.cc + update/overwrite_files.cc update/pending_update.cc update/set_snapshot.cc update/snapshot_manager.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 8c4b49e9b..a8d4ced3e 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -119,6 +119,7 @@ iceberg_sources = files( 'update/expire_snapshots.cc', 'update/fast_append.cc', 'update/merging_snapshot_update.cc', + 'update/overwrite_files.cc', 'update/pending_update.cc', 'update/set_snapshot.cc', 'update/snapshot_manager.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 1255871c3..64597468e 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -33,6 +33,7 @@ #include "iceberg/transaction.h" #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" +#include "iceberg/update/overwrite_files.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" #include "iceberg/update/update_location.h" @@ -217,6 +218,12 @@ Result> Table::NewFastAppend() { return FastAppend::Make(name().name, std::move(ctx)); } +Result> Table::NewOverwrite() { + ICEBERG_ASSIGN_OR_RAISE( + auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); + return OverwriteFiles::Make(name().name, std::move(ctx)); +} + Result> Table::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE( auto ctx, TransactionContext::Make(shared_from_this(), TransactionKind::kUpdate)); diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 8d8849f37..54c40a29c 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -176,6 +176,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// \brief Create a new FastAppend to append data files and commit the changes. virtual Result> NewFastAppend(); + /// \brief Create a new OverwriteFiles to overwrite data files and commit the changes. + virtual Result> NewOverwrite(); + /// \brief Create a new SnapshotManager to manage snapshots and snapshot references. virtual Result> NewSnapshotManager(); diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 0e8f03150..ac14656a1 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -232,7 +232,8 @@ if(ICEBERG_BUILD_BUNDLE) update_properties_test.cc update_schema_test.cc update_sort_order_test.cc - update_statistics_test.cc) + update_statistics_test.cc + overwrite_files_test.cc) add_iceberg_test(data_test USE_BUNDLE diff --git a/src/iceberg/test/overwrite_files_test.cc b/src/iceberg/test/overwrite_files_test.cc new file mode 100644 index 000000000..7c74cc4cd --- /dev/null +++ b/src/iceberg/test/overwrite_files_test.cc @@ -0,0 +1,1094 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/overwrite_files.h" + +#include +#include +#include +#include + +#include +#include + +#include "iceberg/avro/avro_register.h" +#include "iceberg/constants.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_field.h" +#include "iceberg/partition_spec.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transaction.h" +#include "iceberg/transform.h" +#include "iceberg/update/fast_append.h" +#include "iceberg/util/data_file_set.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +// ===================================================================================== +// Test harness for OverwriteFiles. +// +// Modeled on src/iceberg/test/merging_snapshot_update_test.cc (same fixture style, +// same in-memory mock FileIO / catalog setup, same DataFile / commit helpers). Unlike +// that file, OverwriteFiles is the production class with a private constructor, so the +// tests drive it exclusively through its public builder surface (AddFile / DeleteFile / +// OverwriteByRowFilter / ... / operation() / Validate() / Commit()) and observe its +// behavior through the public API: operation() classification, the committed snapshot +// summary, and the public Validate(...) entry point that the commit kernel invokes. +// +// The base table (TableMetadataV2ValidMinimal.json) has schema {x: long (id 1), +// y: long (id 2), z: long (id 3)} and a single partition spec (spec id 0) that +// partitions by identity(x). +// ===================================================================================== +class OverwriteFilesTest : public UpdateTestBase { + protected: + static void SetUpTestSuite() { avro::RegisterAll(); } + + std::string MetadataResource() const override { + return "TableMetadataV2ValidMinimal.json"; + } + + void SetUp() override { + UpdateTestBase::SetUp(); + + ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec()); + ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema()); + + file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L); + file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L); + } + + // A plain data file in spec 0 (identity(x)) with the given partition value for x. + std::shared_ptr MakeDataFile(const std::string& path, int64_t partition_x, + int64_t record_count = 100) { + auto f = std::make_shared(); + f->content = DataFile::Content::kData; + f->file_path = table_location_ + path; + f->file_format = FileFormatType::kParquet; + f->partition = PartitionValues(std::vector{Literal::Long(partition_x)}); + f->file_size_in_bytes = 1024; + f->record_count = record_count; + f->partition_spec_id = spec_->spec_id(); + return f; + } + + // A data file carrying column metrics for column y (field id 2): lower/upper bounds + // plus value/null counts, so the StrictMetricsEvaluator can reason about it. + std::shared_ptr MakeDataFileWithYBounds(const std::string& path, + int64_t partition_x, int64_t y_lower, + int64_t y_upper) { + auto f = MakeDataFile(path, partition_x); + f->lower_bounds = {{2, Literal::Long(y_lower).Serialize().value()}}; + f->upper_bounds = {{2, Literal::Long(y_upper).Serialize().value()}}; + f->value_counts = {{2, f->record_count}}; + f->null_value_counts = {{2, 0}}; + return f; + } + + std::shared_ptr MakeDeleteFile(const std::string& path, int64_t partition_x) { + auto f = MakeDataFile(path, partition_x); + f->content = DataFile::Content::kPositionDeletes; + return f; + } + + // An equality delete file in spec 0 (partition x = partition_x). + std::shared_ptr MakeEqualityDeleteFile(const std::string& path, + int64_t partition_x) { + auto f = MakeDeleteFile(path, partition_x); + f->content = DataFile::Content::kEqualityDeletes; + f->equality_ids = {1}; + return f; + } + + // Write a delete manifest containing the given delete files, with the snapshot id and + // sequence number assigned on each entry (so the manifest list writer does not need to + // inherit them). + Result WriteDeleteManifest( + const std::string& path, const std::vector>& files, + int64_t snapshot_id, int64_t sequence_number) { + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + ManifestWriter::MakeWriter(/*format_version=*/2, snapshot_id, path, file_io_, + spec_, schema_, ManifestContent::kDeletes)); + for (const auto& f : files) { + ManifestEntry entry; + entry.status = ManifestStatus::kAdded; + entry.snapshot_id = snapshot_id; + entry.sequence_number = sequence_number; + entry.data_file = f; + ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry)); + } + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + return writer->ToManifestFile(); + } + + // Build a synthetic snapshot from the given manifests (mirrors the merging-update test + // harness). Used to inject a concurrent commit between read and validate. + Result> MakeSyntheticSnapshot( + std::string operation, int64_t snapshot_id, + std::optional parent_snapshot_id, int64_t sequence_number, + const std::vector& manifests) { + auto manifest_list_path = table_location_ + "/metadata/manifest-list-" + + std::to_string(snapshot_id) + ".avro"; + ICEBERG_ASSIGN_OR_RAISE( + auto writer, + ManifestListWriter::MakeWriter(table_->metadata()->format_version, snapshot_id, + parent_snapshot_id, manifest_list_path, file_io_, + sequence_number)); + ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests)); + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + + ICEBERG_ASSIGN_OR_RAISE( + auto snapshot, + Snapshot::Make(sequence_number, snapshot_id, parent_snapshot_id, TimePointMs{}, + std::move(operation), {}, table_->metadata()->current_schema_id, + manifest_list_path)); + return std::shared_ptr(std::move(snapshot)); + } + + // Inject a concurrent snapshot that adds an equality delete file covering partition + // `partition_x`, layered on top of `parent`. Returns the metadata containing the new + // snapshot (as current) plus the new snapshot itself, so a subsequent Validate(...) can + // scan the range (parent, new] for new deletes. + struct ConcurrentDelete { + std::shared_ptr metadata; + std::shared_ptr snapshot; + }; + ConcurrentDelete InjectConcurrentEqualityDelete(const std::shared_ptr& parent, + const std::string& delete_path, + int64_t partition_x) { + ConcurrentDelete out; + auto del_file = MakeEqualityDeleteFile(delete_path, partition_x); + const int64_t new_snapshot_id = parent->snapshot_id + 1000; + const int64_t new_sequence_number = parent->sequence_number + 1; + auto manifest = + WriteDeleteManifest(table_location_ + "/metadata/concurrent-del-manifest.avro", + {del_file}, new_snapshot_id, new_sequence_number); + EXPECT_TRUE(manifest.has_value()); + auto snap = MakeSyntheticSnapshot(DataOperation::kOverwrite, new_snapshot_id, + parent->snapshot_id, new_sequence_number, + {manifest.value()}); + if (!snap.has_value()) { + ADD_FAILURE() << "MakeSyntheticSnapshot failed: " << snap.error().message; + } + EXPECT_TRUE(snap.has_value()); + out.snapshot = snap.value(); + out.metadata = std::make_shared(*table_->metadata()); + out.metadata->snapshots.push_back(out.snapshot); + out.metadata->current_snapshot_id = out.snapshot->snapshot_id; + out.metadata->last_sequence_number = out.snapshot->sequence_number; + return out; + } + + Result> NewOverwrite() { + return table_->NewOverwrite(); + } + + // Commit file_a_ with FastAppend and refresh the table; returns its snapshot id. + int64_t CommitFileA() { + auto fa = table_->NewFastAppend(); + EXPECT_TRUE(fa.has_value()); + fa.value()->AppendFile(file_a_); + EXPECT_THAT(fa.value()->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + auto snap = table_->current_snapshot(); + EXPECT_TRUE(snap.has_value()); + return snap.value()->snapshot_id; + } + + // Append a single file via FastAppend and refresh; returns the new snapshot. + std::shared_ptr CommitFastAppend(const std::shared_ptr& file) { + auto fa = table_->NewFastAppend(); + EXPECT_TRUE(fa.has_value()); + fa.value()->AppendFile(file); + EXPECT_THAT(fa.value()->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + auto snap = table_->current_snapshot(); + EXPECT_TRUE(snap.has_value()); + return snap.value(); + } + + std::shared_ptr spec_; + std::shared_ptr schema_; + std::shared_ptr file_a_; + std::shared_ptr file_b_; +}; + +// ===================================================================================== +// 9.2 Entry-point and builder-method tests +// ===================================================================================== + +// Req 1.1: Table::NewOverwrite() returns a valid builder for a standalone operation. +TEST_F(OverwriteFilesTest, TableNewOverwriteReturnsBuilder) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + ASSERT_NE(op, nullptr); +} + +// Req 1.2, 1.3: Transaction::NewOverwrite() returns a valid builder registered with the +// transaction (commit deferred until the transaction commits). +TEST_F(OverwriteFilesTest, TransactionNewOverwriteReturnsBuilder) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, Transaction::Make(table_, TransactionKind::kUpdate)); + ICEBERG_UNWRAP_OR_FAIL(auto op, txn->NewOverwrite()); + ASSERT_NE(op, nullptr); + + (*op).OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))).AddFile(file_a_); + + // Within a transaction, the builder is staged by its own Commit() before the + // transaction is committed. + EXPECT_THAT(op->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto committed, txn->Commit()); + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kOverwrite); +} + +// Builder methods all return *this and chain (Req 2.1-2.3, 3.1, 4.1-4.2, 6.1, 6.3, 6.4). +TEST_F(OverwriteFilesTest, BuilderMethodsReturnSelfAndChain) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + auto* self = op.get(); + + EXPECT_EQ(&op->AddFile(file_a_), self); + EXPECT_EQ(&op->DeleteFile(file_b_), self); + EXPECT_EQ(&op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))), self); + EXPECT_EQ(&op->ValidateFromSnapshot(42), self); + EXPECT_EQ(&op->ConflictDetectionFilter(Expressions::Equal("x", Literal::Long(1L))), + self); + EXPECT_EQ(&op->ValidateNoConflictingData(), self); + EXPECT_EQ(&op->ValidateNoConflictingDeletes(), self); + EXPECT_EQ(&op->ValidateAddedFilesMatchOverwriteFilter(), self); + EXPECT_EQ(&op->WithCaseSensitivity(false), self); + + // A single fluent chain compiles and returns the same instance. + OverwriteFiles& chained = + (*op) + .AddFile(MakeDataFile("/data/chain.parquet", 1L)) + .OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))) + .ValidateNoConflictingData(); + EXPECT_EQ(&chained, self); +} + +// ===================================================================================== +// 9.3 operation() truth-table tests (Req 5.1-5.4) +// ===================================================================================== + +TEST_F(OverwriteFilesTest, OperationAddOnlyIsAppend) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(file_a_); + EXPECT_EQ(op->operation(), DataOperation::kAppend); +} + +TEST_F(OverwriteFilesTest, OperationDeleteOnlyIsDelete) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + EXPECT_EQ(op->operation(), DataOperation::kDelete); +} + +TEST_F(OverwriteFilesTest, OperationAddAndDeleteIsOverwrite) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + op->AddFile(file_b_); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); +} + +TEST_F(OverwriteFilesTest, OperationPureRowFilterAndAddIsOverwrite) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(file_b_); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); +} + +TEST_F(OverwriteFilesTest, OperationNeitherIsOverwrite) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); +} + +// ===================================================================================== +// 9.4 Commit-path and snapshot-control tests (Req 11.2-11.5) +// ===================================================================================== + +// DeleteFile + AddFile commits as overwrite and the recorded operation matches +// operation() (Req 11.4). +TEST_F(OverwriteFilesTest, CommitDeleteAndAddIsOverwrite) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + op->AddFile(file_b_); + const std::string expected_operation = op->operation(); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), expected_operation); + EXPECT_EQ(expected_operation, DataOperation::kOverwrite); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); +} + +// OverwriteByRowFilter + AddFile commits as overwrite. +TEST_F(OverwriteFilesTest, CommitRowFilterAndAddIsOverwrite) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/new_x1.parquet", 1L)); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kOverwrite); +} + +// An empty overwrite (no adds, no deletes) commits and records the overwrite operation. +TEST_F(OverwriteFilesTest, CommitEmptyOverwrite) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), + DataOperation::kOverwrite); +} + +// Duplicate AddFile / DeleteFile are deduplicated by the underlying set types (Req 3.7). +TEST_F(OverwriteFilesTest, CommitDeduplicatesDuplicateAddAndDelete) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + auto add = MakeDataFile("/data/dup_add.parquet", 1L); + op->DeleteFile(file_a_); + op->DeleteFile(file_a_); // duplicate delete + op->AddFile(add); + op->AddFile(add); // duplicate add + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); +} + +// StageOnly commits the snapshot without advancing the target branch (Req 11.2). +TEST_F(OverwriteFilesTest, CommitStageOnlyDoesNotAdvanceCurrentSnapshot) { + const int64_t base_snapshot_id = CommitFileA(); + const size_t base_snapshot_count = table_->metadata()->snapshots.size(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->StageOnly(); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + // The staged snapshot is recorded but the main branch still points at file_a's + // snapshot. + ICEBERG_UNWRAP_OR_FAIL(auto current, table_->current_snapshot()); + EXPECT_EQ(current->snapshot_id, base_snapshot_id); + EXPECT_GT(table_->metadata()->snapshots.size(), base_snapshot_count); +} + +// SetTargetBranch commits to a named branch (Req 11.2). +TEST_F(OverwriteFilesTest, CommitToTargetBranch) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->SetTargetBranch("audit"); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + EXPECT_TRUE(table_->metadata()->refs.contains("audit")); +} + +// A custom Set(property, value) is carried into the committed snapshot summary +// (Req 11.2). +TEST_F(OverwriteFilesTest, CommitCustomSummaryProperty) { + CommitFileA(); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->Set("custom-prop", "custom-value"); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at("custom-prop"), "custom-value"); +} + +// ===================================================================================== +// 9.5 Bulk DeleteFiles tests (Req 3.3-3.7) +// ===================================================================================== + +// A DataFileSet + DeleteFileSet forwards data files to DeleteDataFile and delete files +// to DeleteDeleteFile; the committed snapshot reflects the data-file removal. (The +// delete file is forwarded to DeleteDeleteFile; with no matching committed delete file +// present its removal is a harmless no-op, mirroring the inherited missing-delete +// behavior.) +TEST_F(OverwriteFilesTest, BulkDeleteFilesRemovesDataAndDeleteFiles) { + // Seed the table with a data file. + { + ICEBERG_UNWRAP_OR_FAIL(auto seed, NewOverwrite()); + seed->AddFile(file_a_); + EXPECT_THAT(seed->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L); + + // Build the bulk delete: remove file_a (data) plus del_file (delete file). + DataFileSet data_files; + data_files.insert(file_a_); + DeleteFileSet delete_files; + delete_files.insert(del_file); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(data_files, delete_files); + op->AddFile(file_b_); + // Both a data file deletion and an add => overwrite. + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "1"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1"); +} + +// Empty sets are a no-op: with no adds or deletes the builder is a bare overwrite. +TEST_F(OverwriteFilesTest, BulkDeleteFilesEmptySetsAreNoOp) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(DataFileSet{}, DeleteFileSet{}); + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); // neither adds nor deletes +} + +// A DataFileSet portion of DeleteFiles records data files such that DeletesDataFiles +// becomes true (observed via operation() == delete), equivalent to repeated DeleteFile. +TEST_F(OverwriteFilesTest, BulkDeleteFilesDataPortionMarksDelete) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + DataFileSet data_files; + data_files.insert(file_a_); + data_files.insert(file_b_); + op->DeleteFiles(data_files, DeleteFileSet{}); + EXPECT_EQ(op->operation(), DataOperation::kDelete); +} + +// DeleteFiles is equivalent to repeated DeleteFile for the data-file portion: both +// classify as delete and (after committing against a seeded table) remove the files. +TEST_F(OverwriteFilesTest, BulkDeleteFilesEquivalentToRepeatedDeleteFile) { + // Seed file_a and file_b. + { + ICEBERG_UNWRAP_OR_FAIL(auto seed, NewOverwrite()); + seed->AddFile(file_a_); + seed->AddFile(file_b_); + EXPECT_THAT(seed->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + DataFileSet data_files; + data_files.insert(file_a_); + data_files.insert(file_b_); + op->DeleteFiles(data_files, DeleteFileSet{}); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), "2"); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kTotalDataFiles), "0"); +} + +// Content validation: because both sets hold std::shared_ptr, DeleteFiles +// guards content so a data file cannot be passed as a delete file (or vice versa). A +// delete file (position/equality) in the data-file set is rejected; the error surfaces +// at Commit(). +TEST_F(OverwriteFilesTest, BulkDeleteFilesRejectsDeleteFileInDataSet) { + auto del_file = + MakeDeleteFile("/delete/del_a.parquet", 1L); // content = positionDeletes + DataFileSet data_files; + data_files.insert(del_file); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(data_files, DeleteFileSet{}); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("has delete-file content")); +} + +// A data file (content kData) in the delete-file set is rejected. +TEST_F(OverwriteFilesTest, BulkDeleteFilesRejectsDataFileInDeleteSet) { + DeleteFileSet delete_files; + delete_files.insert(file_a_); // content = kData + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(DataFileSet{}, delete_files); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("has data-file content")); +} + +// An equality delete file is a valid delete file (content != kData) and is accepted in +// the delete-file set. +TEST_F(OverwriteFilesTest, BulkDeleteFilesAcceptsEqualityDeleteInDeleteSet) { + auto eq_delete = MakeEqualityDeleteFile("/delete/eq_a.parquet", 1L); + DeleteFileSet delete_files; + delete_files.insert(eq_delete); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(DataFileSet{}, delete_files); + op->AddFile(file_b_); + EXPECT_THAT(op->Commit(), IsOk()); +} + +// ===================================================================================== +// 9.6 Concurrency-validation tests (Req 8.2, 8.3, 9.2-9.5; Properties 6, 7) +// ===================================================================================== + +// ValidateNoConflictingData: a competing FastAppend that added a data file matching the +// resolved conflict-detection filter makes the overwrite commit fail (Req 8.3). +TEST_F(OverwriteFilesTest, ValidateNoConflictingDataDetectsConflictingAdd) { + const int64_t first_id = CommitFileA(); + // Competing append of file_b (partition x=2) between read and commit. + CommitFastAppend(file_b_); + + // The overwrite targets the x=2 range, so the concurrent add of file_b conflicts. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(2L))); + op->AddFile(MakeDataFile("/data/replacement_x2.parquet", 2L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +// A non-conflicting concurrent change still commits, and the recorded operation matches +// operation() (Req 11.4, 11.5; Property 6). +TEST_F(OverwriteFilesTest, ValidateNoConflictingDataAllowsNonConflictingChange) { + const int64_t first_id = CommitFileA(); + // Competing append of file_b in partition x=2. + CommitFastAppend(file_b_); + + // The overwrite targets the x=1 range; the concurrent x=2 add does not conflict. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/replacement_x1.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + op->ValidateNoConflictingDeletes(); + const std::string expected_operation = op->operation(); + EXPECT_THAT(op->Commit(), IsOk()); + + EXPECT_THAT(table_->Refresh(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot()); + EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kOperation), expected_operation); +} + +// ValidateNoConflictingDeletes: a competing snapshot that deleted a data file in the +// overwrite range makes the commit fail (Req 9.2-9.4). +TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesDetectsConflictingDelete) { + const int64_t first_id = CommitFileA(); + + // Competing overwrite removes file_a (partition x=1) between read and commit. + { + ICEBERG_UNWRAP_OR_FAIL(auto competing, NewOverwrite()); + competing->DeleteFile(file_a_); + EXPECT_THAT(competing->Commit(), IsOk()); + EXPECT_THAT(table_->Refresh(), IsOk()); + } + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/replacement_after_delete.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingDeletes(); + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); +} + +// ValidateNoConflictingDeletes allows a non-conflicting concurrent append to commit. +TEST_F(OverwriteFilesTest, ValidateNoConflictingDeletesAllowsNonConflictingChange) { + const int64_t first_id = CommitFileA(); + // Competing append in partition x=2 (no deletes in the x=1 range). + CommitFastAppend(file_b_); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/replacement_no_conflict.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingDeletes(); + EXPECT_THAT(op->Commit(), IsOk()); +} + +// ===================================================================================== +// Fix #1: the explicit replaced-files delete branch (Path B) honors the +// ConflictDetectionFilter. +// +// Path B fires when explicit data files were registered for replacement +// (deleted_data_files_ non-empty) under ValidateNoConflictingDeletes(). It now calls the +// STATIC data_filter overload of ValidateNoNewDeletesForDataFiles, passing the raw +// conflict_detection_filter_ (which may be nullptr = "no filter, consider all delete +// files"). These tests inject a concurrent equality-delete file that +// covers the replaced data file (file_a, partition x=1) and assert that: +// * with NO conflict filter, the concurrent delete is a conflict => FAIL; +// * with a conflict filter that does NOT cover x=1 (here x=2), the delete is filtered +// out of the conflict scope => SUCCEED. +// +// To exercise Path B in isolation, the builder uses explicit DeleteFile (no +// OverwriteByRowFilter), so RowFilter() stays AlwaysFalse and Path A is skipped. +// ===================================================================================== + +// No conflict filter => the concurrent delete on the replaced file is detected (Path B +// passes nullptr through, so all delete files are considered). +TEST_F(OverwriteFilesTest, PathBExplicitDeletesDetectsConcurrentDeleteWithoutFilter) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + + // Concurrent commit adds an equality delete covering file_a (partition x=1). + auto concurrent = InjectConcurrentEqualityDelete( + first_snapshot, "/delete/concurrent_x1.parquet", /*partition_x=*/1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); // explicit replacement => deleted_data_files_ non-empty + op->AddFile(MakeDataFile("/data/rewrite_x1.parquet", 1L)); + op->ValidateFromSnapshot(first_snapshot->snapshot_id); + op->ValidateNoConflictingDeletes(); + // No ConflictDetectionFilter => Path B considers all concurrent deletes => conflict. + EXPECT_THAT(op->Validate(*concurrent.metadata, concurrent.snapshot), + IsError(ErrorKind::kValidationFailed)); +} + +// A conflict filter that does not cover the replaced file's partition narrows the scope, +// so the concurrent delete is filtered out and validation succeeds. +TEST_F(OverwriteFilesTest, PathBExplicitDeletesConflictFilterNarrowsScope) { + CommitFileA(); + ICEBERG_UNWRAP_OR_FAIL(auto first_snapshot, table_->current_snapshot()); + + // Same concurrent equality delete covering file_a (partition x=1). + auto concurrent = InjectConcurrentEqualityDelete( + first_snapshot, "/delete/concurrent_x1.parquet", /*partition_x=*/1L); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file_a_); + op->AddFile(MakeDataFile("/data/rewrite_x1.parquet", 1L)); + op->ValidateFromSnapshot(first_snapshot->snapshot_id); + // Conflict filter targets x=2, which does NOT cover the x=1 delete => filtered out. + op->ConflictDetectionFilter(Expressions::Equal("x", Literal::Long(2L))); + op->ValidateNoConflictingDeletes(); + EXPECT_THAT(op->Validate(*concurrent.metadata, concurrent.snapshot), IsOk()); +} + +// ===================================================================================== +// 9.7 Strict added-file range validation tests (Req 10.1-10.6; Properties 9, 10) +// +// These exercise OverwriteFiles::Validate(...) directly (the same entry point the commit +// kernel invokes), which is sufficient and deterministic: the strict-range branch does +// not depend on concurrent snapshots. +// ===================================================================================== + +// Strict partition projection proves containment directly (Req 10.3). +TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictProjection) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/in_partition.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsOk()); +} + +// Strict partition projection is insufficient (filter on a non-partition column) but the +// StrictMetricsEvaluator proves containment from the file's bounds (Req 10.3). +TEST_F(OverwriteFilesTest, StrictRangeAcceptedByStrictMetrics) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); + // y bounds [5, 5] => every row has y == 5, fully contained in the filter. + op->AddFile(MakeDataFileWithYBounds("/data/y_eq_5.parquet", 1L, 5L, 5L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsOk()); +} + +// Neither the strict projection nor the metrics can prove containment => fail (Req 10.5). +TEST_F(OverwriteFilesTest, StrictRangeRejectedWhenNotProvable) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("y", Literal::Long(5L))); + // y bounds [1, 10] => not all rows are y == 5. + op->AddFile(MakeDataFileWithYBounds("/data/y_range.parquet", 1L, 1L, 10L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kValidationFailed)); +} + +// A file whose partition falls outside the inclusive projection is rejected (Req 10.4). +TEST_F(OverwriteFilesTest, StrictRangeRejectsFileOutsidePartitionRange) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/wrong_partition.parquet", /*partition_x=*/2L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kValidationFailed)); +} + +// ValidateAddedFilesMatchOverwriteFilter without a row filter fails (Req 10.1, 10.2; +// Property 10). +TEST_F(OverwriteFilesTest, StrictRangeRequiresRowFilter) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(MakeDataFile("/data/no_filter.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kValidationFailed)); +} + +// Fix #2: added files belonging to MORE THAN ONE partition spec are rejected, since the +// validation resolves a single spec via DataSpec() (which requires exactly one spec among +// added files). DataSpec() fails fast with a multi-spec error. +TEST_F(OverwriteFilesTest, StrictRangeRejectsMultiplePartitionSpecs) { + // Add a second partition spec (id 1) to the table metadata BEFORE creating the builder, + // so the producer's base metadata can resolve both specs when files are staged. + ICEBERG_UNWRAP_OR_FAIL( + auto spec1, PartitionSpec::Make(*schema_, /*spec_id=*/1, + {PartitionField(/*source_id=*/1, /*field_id=*/1001, + "x_v1", Transform::Identity())}, + /*allow_missing_fields=*/false)); + table_->metadata()->partition_specs.push_back( + std::shared_ptr(std::move(spec1))); + // Confirm both specs resolve. + ASSERT_THAT(table_->metadata()->PartitionSpecById(0), IsOk()); + ASSERT_THAT(table_->metadata()->PartitionSpecById(1), IsOk()); + + auto file_spec0 = MakeDataFile("/data/spec0_x1.parquet", 1L); // partition_spec_id 0 + auto file_spec1 = MakeDataFile("/data/spec1_x1.parquet", 1L); + file_spec1->partition_spec_id = 1; + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(file_spec0); + op->AddFile(file_spec1); + op->ValidateAddedFilesMatchOverwriteFilter(); + // DataSpec() rejects the two distinct specs with an InvalidArgument error. + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kInvalidArgument)); +} + +// Fix #3: enabling the strict added-file range validation with a row filter set but NO +// added data files (e.g. a pure overwrite-by-filter with no AddFile) fails, because +// DataSpec() rejects an empty added-files set. +TEST_F(OverwriteFilesTest, StrictRangeRejectsEmptyAddedFiles) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + // Row filter is set (precondition satisfied) but no AddFile was called. + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->ValidateAddedFilesMatchOverwriteFilter(); + // DataSpec() raises InvalidArgument because no data file was added. + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), + IsError(ErrorKind::kInvalidArgument)); +} + +// ===================================================================================== +// 9.8 Case-sensitivity and null-rejection tests (Req 6.4, 12.1-12.4; Property 4) +// ===================================================================================== + +// Case-insensitive binding accepts a differently-cased column name where the +// case-sensitive (default) binding rejects it. Observed through the strict-range +// validation, which binds the row filter using the configured case sensitivity. +TEST_F(OverwriteFilesTest, CaseSensitivityAffectsFilterBinding) { + // Case-sensitive (default): the filter references "X" which does not match column "x". + { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("X", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/cs.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + // Binding "X" against schema {x, y, z} fails case-sensitively. + auto result = op->Validate(*table_->metadata(), /*snapshot=*/nullptr); + EXPECT_FALSE(result.has_value()); + } + // Case-insensitive: "X" binds to column "x" and the in-range file validates. + { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->WithCaseSensitivity(false); + op->OverwriteByRowFilter(Expressions::Equal("X", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/ci.parquet", 1L)); + op->ValidateAddedFilesMatchOverwriteFilter(); + EXPECT_THAT(op->Validate(*table_->metadata(), /*snapshot=*/nullptr), IsOk()); + } +} + +// Null arguments to the pointer-taking builder methods surface at Commit() without +// crashing (Req 12.1, 12.2; Property 4). The ErrorCollector aggregates deferred builder +// errors and surfaces them as a kValidationFailed error at Commit() that preserves the +// underlying invalid-argument message. +TEST_F(OverwriteFilesTest, NullAddFileRejectedAtCommit) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); +} + +TEST_F(OverwriteFilesTest, NullDeleteFileRejectedAtCommit) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); +} + +TEST_F(OverwriteFilesTest, NullOverwriteByRowFilterRejectedAtCommit) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid row filter expression: null")); +} + +TEST_F(OverwriteFilesTest, NullConflictDetectionFilterRejectedAtCommit) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->ConflictDetectionFilter(nullptr); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid conflict detection filter: null")); +} + +// The builder chain continues without crashing after a null argument is recorded. +TEST_F(OverwriteFilesTest, NullArgumentDoesNotCrashBuilderChain) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + (*op).AddFile(nullptr).AddFile(file_a_).OverwriteByRowFilter( + Expressions::Equal("x", Literal::Long(1L))); + // The recorded error still surfaces at commit. + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid data file: null")); +} + +// A null element cannot enter a DataFileSet / DeleteFileSet (insert() rejects nullptr), +// so DeleteFiles({null...}, {null...}) is a no-op rather than an error. The deferred +// null-element rejection inside DeleteFiles is therefore a defensive guard that is +// unreachable through the public set API; this test documents that observable behavior. +TEST_F(OverwriteFilesTest, DeleteFilesNullElementsCannotEnterSets) { + DataFileSet data_files; + data_files.insert(std::shared_ptr{nullptr}); + DeleteFileSet delete_files; + delete_files.insert(std::shared_ptr{nullptr}); + EXPECT_TRUE(data_files.empty()); + EXPECT_TRUE(delete_files.empty()); + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFiles(data_files, delete_files); + // No data files were deleted => still a bare overwrite, and commit succeeds. + EXPECT_EQ(op->operation(), DataOperation::kOverwrite); + EXPECT_THAT(op->Commit(), IsOk()); +} + +// ValidateFromSnapshot accepts a non-negative id (0 is in the generated id range +// [0, INT64_MAX]) and rejects a negative id (including kInvalidSnapshotId == -1) as a +// deferred error surfaced at Commit(). Req 6.1. +TEST_F(OverwriteFilesTest, ValidateFromSnapshotRejectsNegativeSnapshotId) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(file_a_).ValidateFromSnapshot(-1); + auto result = op->Commit(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Invalid snapshot id")); +} + +TEST_F(OverwriteFilesTest, ValidateFromSnapshotAcceptsZeroSnapshotId) { + // 0 is a legal generated snapshot id (the generator masks with int64_t::max()), so it + // must not be rejected at the builder stage. With no concurrency validation enabled, + // the starting id is merely recorded and the commit succeeds. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(file_a_).ValidateFromSnapshot(0); + EXPECT_THAT(op->Commit(), IsOk()); +} + +// ===================================================================================== +// Property-style tests (parameterized via loops; no PBT library is available). +// ===================================================================================== + +// Property 2 (builder forwarding) + Property 3 (delete dual-tracking), Task 2.4. +// +// AddedDataFiles() / deleted_data_files_ are not publicly observable, so the properties +// are checked indirectly through operation(): for any non-null file, AddFile-only yields +// `append` (the file was added and the row filter is untouched), while DeleteFile-only +// and DeleteFiles-only (data portion) yield `delete` (the file was registered for +// deletion and DeletesDataFiles() became true). Validates Req 2.1, 2.2, 3.1-3.5. +TEST_F(OverwriteFilesTest, PropertyBuilderForwardingAndDualTracking) { + const std::vector> files = { + MakeDataFile("/data/p0.parquet", 1L), + MakeDataFile("/data/p1.parquet", 2L), + MakeDataFile("/data/p2.parquet", 3L), + MakeDataFile("/data/p3.parquet", 7L), + }; + + for (const auto& file : files) { + { + // AddFile preserves "no deletes" => append, proving the row filter is unchanged. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->AddFile(file); + EXPECT_EQ(op->operation(), DataOperation::kAppend) << file->file_path; + } + { + // DeleteFile dual-tracks => DeletesDataFiles() true, no adds => delete. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->DeleteFile(file); + EXPECT_EQ(op->operation(), DataOperation::kDelete) << file->file_path; + } + { + // Bulk DeleteFiles data portion behaves like repeated DeleteFile. + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + DataFileSet data_files; + data_files.insert(file); + op->DeleteFiles(data_files, DeleteFileSet{}); + EXPECT_EQ(op->operation(), DataOperation::kDelete) << file->file_path; + } + } +} + +// Property 4 (null rejection), Task 2.5: every pointer-taking builder mutator records a +// deferred error that surfaces as an InvalidArgument-class error at Commit() without +// crashing. Validates Req 12.1, 12.2, 12.3, 12.4. +TEST_F(OverwriteFilesTest, PropertyNullArgumentRejection) { + using Mutator = std::function; + const std::vector mutators = { + [](OverwriteFiles& op) { op.AddFile(nullptr); }, + [](OverwriteFiles& op) { op.DeleteFile(nullptr); }, + [](OverwriteFiles& op) { op.OverwriteByRowFilter(nullptr); }, + [](OverwriteFiles& op) { op.ConflictDetectionFilter(nullptr); }, + }; + + for (const auto& mutate : mutators) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + mutate(*op); + // Deferred builder errors are aggregated and surfaced as kValidationFailed at commit. + EXPECT_THAT(op->Commit(), IsError(ErrorKind::kValidationFailed)); + } +} + +// Property 1 (operation() reflects content), Task 3.2: exhaustive truth table over +// {adds, deletes (explicit or via row filter)}. Validates Req 5.1-5.4. +TEST_F(OverwriteFilesTest, PropertyOperationTruthTable) { + struct Case { + bool add; + bool delete_file; + bool row_filter; + std::string expected; + }; + const std::vector cases = { + {/*add=*/true, /*delete_file=*/false, /*row_filter=*/false, DataOperation::kAppend}, + {false, true, false, DataOperation::kDelete}, + {false, false, true, DataOperation::kDelete}, // row filter counts as a delete + {true, true, false, DataOperation::kOverwrite}, + {true, false, true, DataOperation::kOverwrite}, + {false, true, true, DataOperation::kDelete}, // deletes only (no adds) => delete + {false, false, false, DataOperation::kOverwrite}, // neither + }; + + int index = 0; + for (const auto& c : cases) { + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + if (c.add) { + op->AddFile(MakeDataFile("/data/tt_add" + std::to_string(index) + ".parquet", 1L)); + } + if (c.delete_file) { + op->DeleteFile( + MakeDataFile("/data/tt_del" + std::to_string(index) + ".parquet", 1L)); + } + if (c.row_filter) { + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + } + EXPECT_EQ(op->operation(), c.expected) << "case index " << index; + ++index; + } +} + +// Property 5 (conflict filter resolution), Task 4.4. +// +// DataConflictDetectionFilter() is private, so its three resolution outcomes are observed +// indirectly through ValidateNoConflictingData against a competing concurrent add of +// file_b (partition x=2): +// * explicit filter set -> the explicit filter is used (overrides the row filter); +// * row filter only -> the row filter is used; +// * file-replacement present -> AlwaysTrue (any concurrent add conflicts). +// Validates Req 7.1, 7.2, 7.3. +TEST_F(OverwriteFilesTest, PropertyConflictFilterResolution) { + // Resolution case 2 (row filter only): row filter x=1 does NOT match the concurrent + // x=2 add -> no conflict. + { + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->AddFile(MakeDataFile("/data/r2_ok.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), + IsOk()); + } + // Resolution case 2 (row filter only): row filter x=2 DOES match -> conflict, proving + // the row filter (not AlwaysFalse / AlwaysTrue blindly) is what was used. + { + SetUp(); // fresh table + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(2L))); + op->AddFile(MakeDataFile("/data/r2_conflict.parquet", 2L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), + IsError(ErrorKind::kValidationFailed)); + } + // Resolution case 1 (explicit filter overrides row filter): row filter x=1 would NOT + // conflict, but the explicit conflict filter x=2 does -> conflict. + { + SetUp(); + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->ConflictDetectionFilter(Expressions::Equal("x", Literal::Long(2L))); + op->AddFile(MakeDataFile("/data/r1.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), + IsError(ErrorKind::kValidationFailed)); + } + // Resolution case 3 (file replacement -> AlwaysTrue): with an explicit DeleteFile and + // no explicit conflict filter, ANY concurrent add conflicts (here file_b in x=2, even + // though the row filter is x=1). + { + SetUp(); + const int64_t first_id = CommitFileA(); + CommitFastAppend(file_b_); + ICEBERG_UNWRAP_OR_FAIL(auto op, NewOverwrite()); + op->OverwriteByRowFilter(Expressions::Equal("x", Literal::Long(1L))); + op->DeleteFile(file_a_); // makes deleted_data_files_ non-empty => AlwaysTrue + op->AddFile(MakeDataFile("/data/r3.parquet", 1L)); + op->ValidateFromSnapshot(first_id); + op->ValidateNoConflictingData(); + EXPECT_THAT(op->Validate(*table_->metadata(), table_->current_snapshot().value()), + IsError(ErrorKind::kValidationFailed)); + } +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 049b0f49d..c28c7b554 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -34,6 +34,7 @@ #include "iceberg/table_update.h" #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/fast_append.h" +#include "iceberg/update/overwrite_files.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_manager.h" @@ -478,6 +479,13 @@ Result> Transaction::NewFastAppend() { return fast_append; } +Result> Transaction::NewOverwrite() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr overwrite, + OverwriteFiles::Make(ctx_->table->name().name, ctx_)); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(overwrite)); + return overwrite; +} + Result> Transaction::NewUpdateStatistics() { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, UpdateStatistics::Make(ctx_)); diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 60fe935f3..af6897567 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -106,6 +106,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewFastAppend(); + /// \brief Create a new OverwriteFiles to overwrite data files and commit the changes. + Result> NewOverwrite(); + /// \brief Create a new SnapshotManager to manage snapshots. Result> NewSnapshotManager(); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 745c63acb..ff89d71db 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -223,6 +223,7 @@ class TransactionContext; /// \brief Update family. class ExpireSnapshots; class FastAppend; +class OverwriteFiles; class PendingUpdate; class SetSnapshot; class SnapshotManager; diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 6405f603f..b8c06465f 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -20,6 +20,7 @@ install_headers( 'expire_snapshots.h', 'fast_append.h', 'merging_snapshot_update.h', + 'overwrite_files.h', 'pending_update.h', 'set_snapshot.h', 'snapshot_manager.h', diff --git a/src/iceberg/update/overwrite_files.cc b/src/iceberg/update/overwrite_files.cc new file mode 100644 index 000000000..8283c4529 --- /dev/null +++ b/src/iceberg/update/overwrite_files.cc @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/overwrite_files.h" + +#include + +#include "iceberg/expression/binder.h" +#include "iceberg/expression/evaluator.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/expression/projections.h" +#include "iceberg/expression/strict_metrics_evaluator.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/schema_field.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/type.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> OverwriteFiles::Make( + std::string table_name, std::shared_ptr ctx) { + ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty"); + ICEBERG_PRECHECK(ctx != nullptr, "Cannot create OverwriteFiles without a context"); + return std::shared_ptr( + new OverwriteFiles(std::move(table_name), std::move(ctx))); +} + +OverwriteFiles::OverwriteFiles(std::string table_name, + std::shared_ptr ctx) + : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {} + +OverwriteFiles::~OverwriteFiles() = default; + +OverwriteFiles& OverwriteFiles::AddFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + // Forward to the inherited staging path. Any error (e.g. missing partition spec + // ID) is captured by the ErrorCollector and surfaced at Commit() rather than + // dropped. RowFilter() and deleted_data_files_ are intentionally left unchanged. + ICEBERG_BUILDER_RETURN_IF_ERROR(AddDataFile(file)); + return *this; +} + +OverwriteFiles& OverwriteFiles::DeleteFile(const std::shared_ptr& file) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + // Dual-track: record the file for delete-conflict validation AND register it for + // removal via the inherited pipeline. + deleted_data_files_.insert(file); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); + return *this; +} + +OverwriteFiles& OverwriteFiles::DeleteFiles(const DataFileSet& data_files_to_delete, + const DeleteFileSet& delete_files_to_delete) { + // Bulk equivalent of repeated DeleteFile(...) plus explicit delete-file removal. Empty + // sets are no-ops; the set types handle deduplication of repeated entries. + // + // Because both sets hold std::shared_ptr, content is validated here so a data + // file cannot be registered as a delete file (or vice versa): a data file must have + // content kData, and a delete file must be a position/equality delete (content != + // kData). This restores the type safety the C++ shared DataFile representation cannot + // enforce at compile time. + for (const auto& file : data_files_to_delete) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null"); + ICEBERG_BUILDER_CHECK(file->content == DataFile::Content::kData, + "Invalid data file to delete: {} has delete-file content", + file->file_path); + // Dual-track: record for delete-conflict validation AND register for removal. + deleted_data_files_.insert(file); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDataFile(file)); + } + for (const auto& file : delete_files_to_delete) { + ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid delete file: null"); + ICEBERG_BUILDER_CHECK(file->content != DataFile::Content::kData, + "Invalid delete file to delete: {} has data-file content", + file->file_path); + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteDeleteFile(file)); + } + return *this; +} + +OverwriteFiles& OverwriteFiles::OverwriteByRowFilter(std::shared_ptr expr) { + ICEBERG_BUILDER_CHECK(expr != nullptr, "Invalid row filter expression: null"); + // Forward to the inherited filter path: this sets RowFilter() and forwards the + // expression to both the data and delete filter managers. Any error is captured by the + // ErrorCollector and surfaced at Commit(). + ICEBERG_BUILDER_RETURN_IF_ERROR(DeleteByRowFilter(std::move(expr))); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateFromSnapshot(int64_t snapshot_id) { + // Snapshot ids are non-negative: SnapshotUtil::GenerateSnapshotId() masks with + // int64_t::max() and so yields a value in [0, INT64_MAX], while kInvalidSnapshotId is + // -1. A negative id therefore can never identify a real snapshot and is rejected via + // the deferred ErrorCollector (surfaced at Commit()); 0 is accepted because it is in + // the generator's range. A negative id is always a misuse, so we keep an early, + // clearer guard against the obviously-invalid sentinel/negative values. + ICEBERG_BUILDER_CHECK(snapshot_id >= 0, "Invalid snapshot id: {}", snapshot_id); + starting_snapshot_id_ = snapshot_id; + return *this; +} + +OverwriteFiles& OverwriteFiles::ConflictDetectionFilter( + std::shared_ptr expr) { + ICEBERG_BUILDER_CHECK(expr != nullptr, "Invalid conflict detection filter: null"); + conflict_detection_filter_ = std::move(expr); + return *this; +} + +OverwriteFiles& OverwriteFiles::WithCaseSensitivity(bool case_sensitive) { + // Forward to the protected MergingSnapshotUpdate::CaseSensitive(bool) setter (which + // returns void); the public name differs to avoid the public/protected name clash and + // keep a fluent return type. + CaseSensitive(case_sensitive); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateNoConflictingData() { + // Enable concurrent-data validation and require every explicitly-deleted old file to + // be hit during manifest filtering. + validate_no_conflicting_data_ = true; + FailMissingDeletePaths(); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateNoConflictingDeletes() { + // Enable concurrent-delete validation and require every explicitly-deleted old file to + // be hit during manifest filtering. + validate_no_conflicting_deletes_ = true; + FailMissingDeletePaths(); + return *this; +} + +OverwriteFiles& OverwriteFiles::ValidateAddedFilesMatchOverwriteFilter() { + // Enable strict validation that every added data file is fully contained in the + // overwrite range. The precondition (a row filter must be set) is enforced in + // Validate(...). + validate_added_files_match_overwrite_filter_ = true; + return *this; +} + +// Classify the snapshot operation from the current builder content. DeletesDataFiles() +// is true for both an explicit DeleteFile(...) and a pure OverwriteByRowFilter(...) (the +// latter via ManifestFilterManager::ContainsDeletes()), so OverwriteByRowFilter + AddFile +// correctly classifies as overwrite without any RowFilter()-based fallback. +std::string OverwriteFiles::operation() { + if (DeletesDataFiles() && !AddsDataFiles()) { + return DataOperation::kDelete; + } + if (AddsDataFiles() && !DeletesDataFiles()) { + return DataOperation::kAppend; + } + return DataOperation::kOverwrite; +} + +// Select the conflict-detection filter. Precedence: (1) the explicitly-set +// conflict_detection_filter_; otherwise +// (2) the row filter when it is set and not AlwaysFalse and no explicit data files were +// registered for deletion (the pure overwriteByRowFilter case, where the row filter +// already describes the conflicting range); otherwise (3) the conservative AlwaysTrue, +// under which any newly-added data file counts as a conflict (file-replacement or mixed +// mode). RowFilter() defaults to the AlwaysFalse singleton when unset, so comparing +// against the Expressions::AlwaysFalse() singleton by pointer identity (consistent with +// how the filter managers track delete_expr_) correctly treats the unset case as "no +// row filter". +std::shared_ptr OverwriteFiles::DataConflictDetectionFilter() const { + if (conflict_detection_filter_ != nullptr) { + return conflict_detection_filter_; + } + if (RowFilter() != nullptr && RowFilter() != Expressions::AlwaysFalse() && + deleted_data_files_.empty()) { + return RowFilter(); + } + return Expressions::AlwaysTrue(); +} + +// Run the enabled overwrite-specific concurrency checks before commit. The branches are +// sequential and independent. The first failing check aborts the commit; on success we +// return OK and let the inherited Apply(...) build the snapshot. +Status OverwriteFiles::Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) { + // 0. Strict added-file range precondition: when + // ValidateAddedFilesMatchOverwriteFilter() + // is enabled, a row filter is required to define the overwrite range. RowFilter() + // defaults to the AlwaysFalse singleton when unset, so an unset or AlwaysFalse + // filter means there is no range to validate against and the commit must fail. + if (validate_added_files_match_overwrite_filter_) { + if (RowFilter() == nullptr || RowFilter() == Expressions::AlwaysFalse()) { + return ValidationFailed( + "Cannot validate added files match overwrite filter: row filter is not set"); + } + ICEBERG_RETURN_UNEXPECTED( + ValidateAddedFilesMatchOverwriteFilterImpl(current_metadata)); + } + + // 1. Concurrent newly-added data files: fail if any snapshot after the starting point + // added a data file matching the resolved conflict-detection filter. + if (validate_no_conflicting_data_) { + ICEBERG_RETURN_UNEXPECTED(ValidateAddedDataFiles( + current_metadata, starting_snapshot_id_, DataConflictDetectionFilter(), snapshot, + ctx_->table->io(), IsCaseSensitive())); + } + + // 2. Concurrent deletes: the two sub-checks are independent and both may fire. + if (validate_no_conflicting_deletes_) { + // Path A: a row filter is in play (set and not the AlwaysFalse singleton). Fail if a + // new delete file matches the range, or if a data file in the range was concurrently + // removed. Prefer the explicit conflict_detection_filter_ when set, else the row + // filter that describes the overwrite range. + if (RowFilter() != nullptr && RowFilter() != Expressions::AlwaysFalse()) { + auto delete_filter = conflict_detection_filter_ != nullptr + ? conflict_detection_filter_ + : RowFilter(); + ICEBERG_RETURN_UNEXPECTED( + ValidateNoNewDeleteFiles(current_metadata, starting_snapshot_id_, delete_filter, + snapshot, ctx_->table->io(), IsCaseSensitive())); + ICEBERG_RETURN_UNEXPECTED( + ValidateDeletedDataFiles(current_metadata, starting_snapshot_id_, delete_filter, + snapshot, ctx_->table->io(), IsCaseSensitive())); + } + + // Path B: explicit old data files were registered for replacement. Fail if a new + // delete file matching the conflict-detection filter covers any of them. Use the + // STATIC data_filter overload, passing the raw conflict_detection_filter_ (which may + // be nullptr — a nullptr filter means no filter, so all delete files are considered). + // The 3rd argument is an Expression, so this overload is unambiguous and needs no + // member-function-pointer cast. + if (!deleted_data_files_.empty()) { + ICEBERG_RETURN_UNEXPECTED(ValidateNoNewDeletesForDataFiles( + current_metadata, starting_snapshot_id_, conflict_detection_filter_, + deleted_data_files_, snapshot, ctx_->table->io(), IsCaseSensitive())); + } + } + + return {}; +} + +// Every added data file must be fully contained in the overwrite range defined by +// RowFilter(). The validation resolves a single partition spec via DataSpec() and then, +// for each added file, an inclusive partition projection provides a fast negative (reject +// when the file's partition cannot possibly fall in range), a strict partition projection +// provides a fast positive (accept when the whole partition is guaranteed in range), and +// a file-level StrictMetricsEvaluator provides the fallback proof. Metrics that are +// missing or insufficient yield a non-matching result and therefore fail validation +// (conservative: never silently accept an unprovable file). +// +// DataSpec() returns InvalidArgument when zero data files were added (empty-added-files +// is rejected) and when more than one partition spec is represented among the added files +// (multi-spec is rejected). DataSpec() reads the spec from the producer's base metadata; +// the projections use the table schema from metadata.Schema(). +// +// Implementation notes: +// * `Evaluator::Make` takes a `Schema`, and the partition type is obtained via +// `PartitionSpec::PartitionType(schema)` (which needs the table schema and returns a +// `StructType`). We therefore wrap the partition `StructType`'s fields in a `Schema` +// and bind the projected expression against it. +// * `StrictMetricsEvaluator::Make` binds its expression internally, and `Binder::Bind` +// rejects an already-bound predicate. So the bound filter is used only for the +// projections (`ProjectionEvaluator::Project` requires a bound expression), while the +// UNBOUND `RowFilter()` is handed to `StrictMetricsEvaluator::Make`. +Status OverwriteFiles::ValidateAddedFilesMatchOverwriteFilterImpl( + const TableMetadata& metadata) { + ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema()); + + // Single spec for all added files. This rejects both the empty-added-files case and + // the multi-spec case via InvalidArgument. + ICEBERG_ASSIGN_OR_RAISE(auto spec, DataSpec()); + + // Bind the row filter once for projection (Project requires a bound expression); build + // the strict metrics evaluator once over the UNBOUND row filter (it binds internally). + ICEBERG_ASSIGN_OR_RAISE(auto bound_filter, + Binder::Bind(*schema, RowFilter(), IsCaseSensitive())); + ICEBERG_ASSIGN_OR_RAISE( + auto strict_metrics_evaluator, + StrictMetricsEvaluator::Make(RowFilter(), schema, IsCaseSensitive())); + + // Build ONE inclusive and ONE strict partition-value evaluator from the single spec. + // The Evaluators bind on construction and do not retain the partition schema, so the + // local schema below may safely go out of scope. + ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(*schema)); + auto partition_fields = partition_type->fields(); + Schema partition_schema( + std::vector(partition_fields.begin(), partition_fields.end())); + + // Inclusive projection (fast negative): project the bound filter into the partition + // space and build an evaluator over the spec's partition values. + auto inclusive_projection = Projections::Inclusive(*spec, *schema, IsCaseSensitive()); + ICEBERG_ASSIGN_OR_RAISE(auto inclusive_expr, + inclusive_projection->Project(bound_filter)); + ICEBERG_ASSIGN_OR_RAISE( + auto inclusive_evaluator, + Evaluator::Make(partition_schema, inclusive_expr, IsCaseSensitive())); + + // Strict projection (fast positive). + auto strict_projection = Projections::Strict(*spec, *schema, IsCaseSensitive()); + ICEBERG_ASSIGN_OR_RAISE(auto strict_expr, strict_projection->Project(bound_filter)); + ICEBERG_ASSIGN_OR_RAISE( + auto strict_evaluator, + Evaluator::Make(partition_schema, strict_expr, IsCaseSensitive())); + + for (const auto& file : AddedDataFiles()) { + if (file == nullptr) { + return ValidationFailed( + "Cannot validate added files match overwrite filter: null data file"); + } + + // Step 1: inclusive projection — fast negative. If the file's partition cannot match + // the (inclusively projected) filter, the file is definitively outside the range. + ICEBERG_ASSIGN_OR_RAISE(bool inclusive_match, + inclusive_evaluator->Evaluate(file->partition)); + if (!inclusive_match) { + return ValidationFailed( + "Cannot commit file {}: added file does not match overwrite filter (outside " + "the overwrite range)", + file->file_path); + } + + // Step 2: strict projection — fast positive. If the whole partition is guaranteed in + // range, the file is accepted without inspecting metrics. + ICEBERG_ASSIGN_OR_RAISE(bool strict_match, + strict_evaluator->Evaluate(file->partition)); + if (strict_match) { + continue; + } + + // Step 3: file-level proof via strict metrics. A false result — including the case + // where metrics are missing or insufficient — fails validation conservatively rather + // than silently accepting a file that cannot be proved fully in range. + ICEBERG_ASSIGN_OR_RAISE(bool metrics_match, + strict_metrics_evaluator->Evaluate(*file)); + if (!metrics_match) { + return ValidationFailed( + "Cannot commit file {}: added file is not fully contained in the overwrite " + "filter", + file->file_path); + } + } + + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/update/overwrite_files.h b/src/iceberg/update/overwrite_files.h new file mode 100644 index 000000000..59f25914d --- /dev/null +++ b/src/iceberg/update/overwrite_files.h @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/overwrite_files.h + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/merging_snapshot_update.h" +#include "iceberg/util/data_file_set.h" + +namespace iceberg { + +/// \brief Overwrite data files in a table. +/// +/// OverwriteFiles expresses a logical overwrite transaction: either explicit file +/// replacement (`DeleteFile(old)` + `AddFile(new)`) or range-based replacement +/// (`OverwriteByRowFilter(expr)` + `AddFile(new...)`). It is a concrete subclass of +/// MergingSnapshotUpdate that adds only the overwrite-specific builder surface and the +/// overwrite-specific `Validate(...)` logic, reusing the inherited filter → write → +/// merge pipeline, summary building, commit retry, and cleanup unchanged. +class ICEBERG_EXPORT OverwriteFiles : public MergingSnapshotUpdate { + public: + /// \brief Create a new OverwriteFiles instance. + /// + /// Mirrors FastAppend::Make: validates inputs and returns a heap instance. + /// + /// \param table_name The name of the table + /// \param ctx The transaction context to use for this update + /// \return A Result containing the OverwriteFiles instance or an error + static Result> Make( + std::string table_name, std::shared_ptr ctx); + + ~OverwriteFiles() override; + + // --- Overwrite builder surface (chained, ErrorCollector-deferred) --- + + /// \brief Add a new data file to the overwrite. + /// + /// \param file The data file to add + /// \return Reference to this for method chaining + OverwriteFiles& AddFile(const std::shared_ptr& file); + + /// \brief Delete an existing data file as part of the overwrite. + /// + /// \param file The data file to delete + /// \return Reference to this for method chaining + OverwriteFiles& DeleteFile(const std::shared_ptr& file); + + /// \brief Bulk equivalent of repeated DeleteFile(...) plus explicit delete-file + /// removal. + /// + /// \param data_files_to_delete The data files to delete + /// \param delete_files_to_delete The delete files to remove + /// \return Reference to this for method chaining + OverwriteFiles& DeleteFiles(const DataFileSet& data_files_to_delete, + const DeleteFileSet& delete_files_to_delete); + + /// \brief Overwrite all rows matching the given expression. + /// + /// \param expr The row filter expression defining the overwrite range + /// \return Reference to this for method chaining + OverwriteFiles& OverwriteByRowFilter(std::shared_ptr expr); + + // --- Concurrency / correctness controls --- + + /// \brief Set the lower bound snapshot id for concurrency scans. + /// + /// \param snapshot_id The starting snapshot id + /// \return Reference to this for method chaining + OverwriteFiles& ValidateFromSnapshot(int64_t snapshot_id); + + /// \brief Set an explicit conflict-detection filter. + /// + /// \param expr The conflict-detection filter expression + /// \return Reference to this for method chaining + OverwriteFiles& ConflictDetectionFilter(std::shared_ptr expr); + + /// \brief Enable validation that no concurrent commit added conflicting data files. + /// + /// \return Reference to this for method chaining + OverwriteFiles& ValidateNoConflictingData(); + + /// \brief Enable validation that no concurrent commit added conflicting deletes. + /// + /// \return Reference to this for method chaining + OverwriteFiles& ValidateNoConflictingDeletes(); + + /// \brief Enable strict validation that every added file is fully within the + /// overwrite range. + /// + /// \return Reference to this for method chaining + OverwriteFiles& ValidateAddedFilesMatchOverwriteFilter(); + + /// \brief Set case sensitivity for binding, projection, and metrics evaluation. + /// + /// Forwards to the protected MergingSnapshotUpdate::CaseSensitive(bool); the public + /// name differs to avoid the public/protected name clash and keep a fluent return. + /// + /// \param case_sensitive Whether matching should be case-sensitive + /// \return Reference to this for method chaining + OverwriteFiles& WithCaseSensitivity(bool case_sensitive); + + // --- SnapshotUpdate / MergingSnapshotUpdate overrides --- + + std::string operation() override; + + Status Validate(const TableMetadata& current_metadata, + const std::shared_ptr& snapshot) override; + + private: + explicit OverwriteFiles(std::string table_name, + std::shared_ptr ctx); + + /// \brief Select the conflict-detection filter from the configured state. + std::shared_ptr DataConflictDetectionFilter() const; + + /// \brief Verify every added data file is fully contained in the row filter. + Status ValidateAddedFilesMatchOverwriteFilterImpl(const TableMetadata& metadata); + + std::optional starting_snapshot_id_; + std::shared_ptr conflict_detection_filter_; + DataFileSet deleted_data_files_; + bool validate_no_conflicting_data_ = false; + bool validate_no_conflicting_deletes_ = false; + bool validate_added_files_match_overwrite_filter_ = false; +}; + +} // namespace iceberg