Skip to content

Commit 8a319c1

Browse files
committed
feat: implement set snapshot
1 parent bc2e026 commit 8a319c1

File tree

13 files changed

+480
-40
lines changed

13 files changed

+480
-40
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ set(ICEBERG_SOURCES
8686
type.cc
8787
update/expire_snapshots.cc
8888
update/pending_update.cc
89+
update/set_snapshot.cc
8990
update/snapshot_update.cc
9091
update/update_location.cc
9192
update/update_partition_spec.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ iceberg_sources = files(
104104
'type.cc',
105105
'update/expire_snapshots.cc',
106106
'update/pending_update.cc',
107+
'update/set_snapshot.cc',
107108
'update/snapshot_update.cc',
108109
'update/update_location.cc',
109110
'update/update_partition_spec.cc',

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
172172
USE_BUNDLE
173173
SOURCES
174174
expire_snapshots_test.cc
175+
set_snapshot_test.cc
175176
transaction_test.cc
176177
update_location_test.cc
177178
update_partition_spec_test.cc
src/iceberg/test/set_snapshot_test.cc

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/set_snapshot.h"
21+
22+
#include <memory>
23+
24+
#include <gmock/gmock.h>
25+
#include <gtest/gtest.h>
26+
27+
#include "iceberg/result.h"
28+
#include "iceberg/snapshot.h"
29+
#include "iceberg/test/matchers.h"
30+
#include "iceberg/test/update_test_base.h"
31+
#include "iceberg/transaction.h"
32+
33+
namespace iceberg {
34+
35+
// Test fixture for SetSnapshot tests
36+
class SetSnapshotTest : public UpdateTestBase {
37+
protected:
38+
// Snapshot IDs from TableMetadataV2Valid.json
39+
static constexpr int64_t kOldestSnapshotId = 3051729675574597004;
40+
static constexpr int64_t kCurrentSnapshotId = 3055729675574597004;
41+
42+
// Timestamps from TableMetadataV2Valid.json
43+
static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770;
44+
static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770;
45+
};
46+
47+
// Points the table at the older snapshot, then checks both the id returned by
// Apply() and the current snapshot seen after committing and reloading the table.
TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) {
  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
  ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewSetSnapshot());
  EXPECT_EQ(update->kind(), PendingUpdate::Kind::kSetSnapshot);

  update->SetCurrentSnapshot(kOldestSnapshotId);
  ICEBERG_UNWRAP_OR_FAIL(auto applied_id, update->Apply());
  EXPECT_EQ(applied_id, kOldestSnapshotId);

  // Commit the pending update first, then the enclosing transaction.
  EXPECT_THAT(update->Commit(), IsOk());
  EXPECT_THAT(txn->Commit(), IsOk());

  // A freshly loaded table must report the older snapshot as current.
  ICEBERG_UNWRAP_OR_FAIL(auto loaded_table, catalog_->LoadTable(table_ident_));
  ICEBERG_UNWRAP_OR_FAIL(auto head, loaded_table->current_snapshot());
  EXPECT_EQ(head->snapshot_id, kOldestSnapshotId);
}
64+
65+
// Requesting the snapshot that is already current must still apply cleanly and
// yield that same snapshot id.
TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) {
  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
  ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewSetSnapshot());

  update->SetCurrentSnapshot(kCurrentSnapshotId);
  ICEBERG_UNWRAP_OR_FAIL(auto applied_id, update->Apply());
  EXPECT_EQ(applied_id, kCurrentSnapshotId);
}
73+
74+
// SetCurrentSnapshot with an id that is not one of the fixture's snapshots:
// Apply() must report a validation failure mentioning "is not found".
TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) {
  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
  ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewSetSnapshot());

  // Id that does not belong to any snapshot of the test table.
  const int64_t missing_snapshot_id = 9999999999999999;
  update->SetCurrentSnapshot(missing_snapshot_id);

  auto outcome = update->Apply();
  EXPECT_THAT(outcome, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(outcome, HasErrorMessage("is not found"));
}
85+
86+
// RollbackTo the oldest snapshot, which the original test describes as an
// ancestor of the current one: Apply() must yield the oldest snapshot id.
TEST_F(SetSnapshotTest, RollbackToValid) {
  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
  ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewSetSnapshot());

  update->RollbackTo(kOldestSnapshotId);
  ICEBERG_UNWRAP_OR_FAIL(auto applied_id, update->Apply());
  EXPECT_EQ(applied_id, kOldestSnapshotId);
}
95+
96+
// RollbackTo with an id that is not one of the fixture's snapshots: Apply() must
// report a validation failure mentioning "unknown snapshot id".
TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) {
  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
  ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewSetSnapshot());

  // Id that does not belong to any snapshot of the test table.
  const int64_t missing_snapshot_id = 9999999999999999;
  update->RollbackTo(missing_snapshot_id);

  auto outcome = update->Apply();
  EXPECT_THAT(outcome, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(outcome, HasErrorMessage("unknown snapshot id"));
}
107+
108+
// RollbackToTime with a timestamp halfway between the two snapshots: only the
// oldest snapshot precedes that point, so Apply() must yield its id.
TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) {
  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
  ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewSetSnapshot());

  // Midpoint of the two snapshot timestamps.
  const int64_t midpoint = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2;
  update->RollbackToTime(midpoint);

  ICEBERG_UNWRAP_OR_FAIL(auto applied_id, update->Apply());
  EXPECT_EQ(applied_id, kOldestSnapshotId);
}
119+
120+
// RollbackToTime with a timestamp earlier than the oldest snapshot: there is
// nothing to roll back to, so Apply() must report a validation failure
// mentioning "no valid snapshot older than".
TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) {
  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
  ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewSetSnapshot());

  // Target time that precedes the oldest snapshot's timestamp.
  const int64_t before_first_snapshot = kOldestSnapshotTimestamp - 1000000;
  update->RollbackToTime(before_first_snapshot);

  auto outcome = update->Apply();
  EXPECT_THAT(outcome, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(outcome, HasErrorMessage("no valid snapshot older than"));
}
132+
133+
// RollbackToTime with a timestamp immediately after the oldest snapshot: Apply()
// must yield the oldest snapshot id.
// NOTE: despite the test name, the target is the oldest snapshot's timestamp
// plus one, not the timestamp itself.
TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) {
  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
  ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewSetSnapshot());

  // One unit past the oldest snapshot's timestamp.
  const int64_t right_after_oldest = kOldestSnapshotTimestamp + 1;
  update->RollbackToTime(right_after_oldest);

  ICEBERG_UNWRAP_OR_FAIL(auto applied_id, update->Apply());
  EXPECT_EQ(applied_id, kOldestSnapshotId);
}
144+
145+
// With no target requested, Apply() must yield the current snapshot id, and the
// table must still point at that snapshot after committing and reloading.
TEST_F(SetSnapshotTest, ApplyWithoutChanges) {
  ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
  ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewSetSnapshot());

  ICEBERG_UNWRAP_OR_FAIL(auto applied_id, update->Apply());
  EXPECT_EQ(applied_id, kCurrentSnapshotId);

  // Commit the pending update first, then the enclosing transaction.
  EXPECT_THAT(update->Commit(), IsOk());
  EXPECT_THAT(txn->Commit(), IsOk());

  // The reloaded table must be unchanged.
  ICEBERG_UNWRAP_OR_FAIL(auto loaded_table, catalog_->LoadTable(table_ident_));
  ICEBERG_UNWRAP_OR_FAIL(auto head, loaded_table->current_snapshot());
  EXPECT_EQ(head->snapshot_id, kCurrentSnapshotId);
}
157+
158+
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "iceberg/catalog.h"
2626
#include "iceberg/schema.h"
27+
#include "iceberg/snapshot.h"
2728
#include "iceberg/table.h"
2829
#include "iceberg/table_metadata.h"
2930
#include "iceberg/table_properties.h"
@@ -32,6 +33,7 @@
3233
#include "iceberg/table_update.h"
3334
#include "iceberg/update/expire_snapshots.h"
3435
#include "iceberg/update/pending_update.h"
36+
#include "iceberg/update/set_snapshot.h"
3537
#include "iceberg/update/snapshot_update.h"
3638
#include "iceberg/update/update_location.h"
3739
#include "iceberg/update/update_partition_spec.h"
@@ -96,23 +98,35 @@ Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
9698

9799
Status Transaction::Apply(PendingUpdate& update) {
98100
switch (update.kind()) {
99-
case PendingUpdate::Kind::kUpdateProperties: {
100-
auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
101-
ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
102-
if (!result.updates.empty()) {
103-
metadata_builder_->SetProperties(std::move(result.updates));
101+
case PendingUpdate::Kind::kExpireSnapshots: {
102+
auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
103+
ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
104+
if (!result.snapshot_ids_to_remove.empty()) {
105+
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
104106
}
105-
if (!result.removals.empty()) {
106-
metadata_builder_->RemoveProperties(std::move(result.removals));
107+
if (!result.refs_to_remove.empty()) {
108+
for (const auto& ref_name : result.refs_to_remove) {
109+
metadata_builder_->RemoveRef(ref_name);
110+
}
107111
}
108-
if (result.format_version.has_value()) {
109-
metadata_builder_->UpgradeFormatVersion(result.format_version.value());
112+
if (!result.partition_spec_ids_to_remove.empty()) {
113+
metadata_builder_->RemovePartitionSpecs(
114+
std::move(result.partition_spec_ids_to_remove));
115+
}
116+
if (!result.schema_ids_to_remove.empty()) {
117+
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
110118
}
111119
} break;
112-
case PendingUpdate::Kind::kUpdateSortOrder: {
113-
auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
114-
ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
115-
metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
120+
case PendingUpdate::Kind::kSetSnapshot: {
121+
auto& set_snapshot = internal::checked_cast<SetSnapshot&>(update);
122+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply());
123+
metadata_builder_->SetBranchSnapshot(snapshot_id,
124+
std::string(SnapshotRef::kMainBranch));
125+
} break;
126+
case PendingUpdate::Kind::kUpdateLocation: {
127+
auto& update_location = internal::checked_cast<UpdateLocation&>(update);
128+
ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
129+
metadata_builder_->SetLocation(location);
116130
} break;
117131
case PendingUpdate::Kind::kUpdatePartitionSpec: {
118132
auto& update_partition_spec = internal::checked_cast<UpdatePartitionSpec&>(update);
@@ -123,12 +137,30 @@ Status Transaction::Apply(PendingUpdate& update) {
123137
metadata_builder_->AddPartitionSpec(std::move(result.spec));
124138
}
125139
} break;
140+
case PendingUpdate::Kind::kUpdateProperties: {
141+
auto& update_properties = internal::checked_cast<UpdateProperties&>(update);
142+
ICEBERG_ASSIGN_OR_RAISE(auto result, update_properties.Apply());
143+
if (!result.updates.empty()) {
144+
metadata_builder_->SetProperties(std::move(result.updates));
145+
}
146+
if (!result.removals.empty()) {
147+
metadata_builder_->RemoveProperties(std::move(result.removals));
148+
}
149+
if (result.format_version.has_value()) {
150+
metadata_builder_->UpgradeFormatVersion(result.format_version.value());
151+
}
152+
} break;
126153
case PendingUpdate::Kind::kUpdateSchema: {
127154
auto& update_schema = internal::checked_cast<UpdateSchema&>(update);
128155
ICEBERG_ASSIGN_OR_RAISE(auto result, update_schema.Apply());
129156
metadata_builder_->SetCurrentSchema(std::move(result.schema),
130157
result.new_last_column_id);
131158
} break;
159+
case PendingUpdate::Kind::kUpdateSortOrder: {
160+
auto& update_sort_order = internal::checked_cast<UpdateSortOrder&>(update);
161+
ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
162+
metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
163+
} break;
132164
case PendingUpdate::Kind::kUpdateSnapshot: {
133165
const auto& base = metadata_builder_->current();
134166

@@ -165,30 +197,6 @@ Status Transaction::Apply(PendingUpdate& update) {
165197
metadata_builder_->AssignUUID();
166198
}
167199
} break;
168-
case PendingUpdate::Kind::kExpireSnapshots: {
169-
auto& expire_snapshots = internal::checked_cast<ExpireSnapshots&>(update);
170-
ICEBERG_ASSIGN_OR_RAISE(auto result, expire_snapshots.Apply());
171-
if (!result.snapshot_ids_to_remove.empty()) {
172-
metadata_builder_->RemoveSnapshots(std::move(result.snapshot_ids_to_remove));
173-
}
174-
if (!result.refs_to_remove.empty()) {
175-
for (const auto& ref_name : result.refs_to_remove) {
176-
metadata_builder_->RemoveRef(ref_name);
177-
}
178-
}
179-
if (!result.partition_spec_ids_to_remove.empty()) {
180-
metadata_builder_->RemovePartitionSpecs(
181-
std::move(result.partition_spec_ids_to_remove));
182-
}
183-
if (!result.schema_ids_to_remove.empty()) {
184-
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
185-
}
186-
} break;
187-
case PendingUpdate::Kind::kUpdateLocation: {
188-
auto& update_location = internal::checked_cast<UpdateLocation&>(update);
189-
ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
190-
metadata_builder_->SetLocation(location);
191-
} break;
192200
default:
193201
return NotSupported("Unsupported pending update: {}",
194202
static_cast<int32_t>(update.kind()));
@@ -293,4 +301,11 @@ Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
293301
return update_location;
294302
}
295303

304+
// Builds a SetSnapshot bound to this transaction, registers it through
// AddUpdate(), and hands it back to the caller. The result of SetSnapshot::Make
// and of AddUpdate is routed through the ICEBERG_* macros before the update is
// returned.
Result<std::shared_ptr<SetSnapshot>> Transaction::NewSetSnapshot() {
  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<SetSnapshot> pending,
                          SetSnapshot::Make(shared_from_this()));
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(pending));
  return pending;
}
310+
296311
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "iceberg/iceberg_export.h"
2727
#include "iceberg/result.h"
2828
#include "iceberg/type_fwd.h"
29+
#include "iceberg/update/set_snapshot.h"
2930

3031
namespace iceberg {
3132

@@ -85,6 +86,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
8586
/// \brief Create a new UpdateLocation to update the table location and commit the
8687
/// changes.
8788
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
89+
90+
/// \brief Create a new SetSnapshot to set the current snapshot or rollback to a
91+
/// previous snapshot and commit the changes.
92+
Result<std::shared_ptr<SetSnapshot>> NewSetSnapshot();
8893

8994
private:
9095
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,

src/iceberg/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ class Transaction;
189189
/// \brief Update family.
190190
class ExpireSnapshots;
191191
class PendingUpdate;
192+
class SetSnapshot;
192193
class SnapshotUpdate;
193194
class UpdateLocation;
194195
class UpdatePartitionSpec;

src/iceberg/update/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ install_headers(
1919
[
2020
'expire_snapshots.h',
2121
'pending_update.h',
22+
'set_snapshot.h',
2223
'snapshot_update.h',
2324
'update_location.h',
2425
'update_partition_spec.h',

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
4343
public:
4444
enum class Kind : uint8_t {
4545
kExpireSnapshots,
46+
kSetSnapshot,
4647
kUpdateLocation,
4748
kUpdatePartitionSpec,
4849
kUpdateProperties,

0 commit comments

Comments
 (0)