Skip to content

Commit a785119

Browse files
authored
feat(rest): implement list table and update table (#484)
1 parent 95ec7a3 commit a785119

17 files changed

+1881
-56
lines changed

cmake_modules/IcebergBuildUtils.cmake

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ function(add_iceberg_lib LIB_NAME)
157157
hidden
158158
VISIBILITY_INLINES_HIDDEN 1)
159159

160+
if(MSVC_TOOLCHAIN)
161+
target_compile_options(${LIB_NAME}_shared PRIVATE /bigobj)
162+
endif()
163+
160164
install(TARGETS ${LIB_NAME}_shared
161165
EXPORT iceberg_targets
162166
ARCHIVE DESTINATION ${INSTALL_ARCHIVE_DIR}
@@ -220,6 +224,10 @@ function(add_iceberg_lib LIB_NAME)
220224
target_compile_definitions(${LIB_NAME}_static PUBLIC ${VISIBILITY_NAME}_STATIC)
221225
endif()
222226

227+
if(MSVC_TOOLCHAIN)
228+
target_compile_options(${LIB_NAME}_static PRIVATE /bigobj)
229+
endif()
230+
223231
install(TARGETS ${LIB_NAME}_static
224232
EXPORT iceberg_targets
225233
ARCHIVE DESTINATION ${INSTALL_ARCHIVE_DIR}

src/iceberg/catalog/rest/http_client.cc

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,33 @@ Status HandleFailureResponse(const cpr::Response& response,
135135
} // namespace
136136

137137
void HttpClient::PrepareSession(
138-
const std::string& path, const std::unordered_map<std::string, std::string>& params,
138+
const std::string& path, HttpMethod method,
139+
const std::unordered_map<std::string, std::string>& params,
139140
const std::unordered_map<std::string, std::string>& headers) {
140141
session_->SetUrl(cpr::Url{path});
141142
session_->SetParameters(GetParameters(params));
142143
session_->RemoveContent();
144+
// clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set
145+
// to 1 by POST requests, and this state is not reset by RemoveContent(), so we must
146+
// manually enforce HTTP GET to clear it.
147+
curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L);
148+
switch (method) {
149+
case HttpMethod::kGet:
150+
session_->PrepareGet();
151+
break;
152+
case HttpMethod::kPost:
153+
session_->PreparePost();
154+
break;
155+
case HttpMethod::kPut:
156+
session_->PreparePut();
157+
break;
158+
case HttpMethod::kDelete:
159+
session_->PrepareDelete();
160+
break;
161+
case HttpMethod::kHead:
162+
session_->PrepareHead();
163+
break;
164+
}
143165
auto final_headers = MergeHeaders(default_headers_, headers);
144166
session_->SetHeader(final_headers);
145167
}
@@ -163,7 +185,7 @@ Result<HttpResponse> HttpClient::Get(
163185
cpr::Response response;
164186
{
165187
std::lock_guard guard(session_mutex_);
166-
PrepareSession(path, params, headers);
188+
PrepareSession(path, HttpMethod::kGet, params, headers);
167189
response = session_->Get();
168190
}
169191

@@ -180,7 +202,7 @@ Result<HttpResponse> HttpClient::Post(
180202
cpr::Response response;
181203
{
182204
std::lock_guard guard(session_mutex_);
183-
PrepareSession(path, /*params=*/{}, headers);
205+
PrepareSession(path, HttpMethod::kPost, /*params=*/{}, headers);
184206
session_->SetBody(cpr::Body{body});
185207
response = session_->Post();
186208
}
@@ -205,7 +227,7 @@ Result<HttpResponse> HttpClient::PostForm(
205227
auto form_headers = headers;
206228
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;
207229

208-
PrepareSession(path, /*params=*/{}, form_headers);
230+
PrepareSession(path, HttpMethod::kPost, /*params=*/{}, form_headers);
209231
std::vector<cpr::Pair> pair_list;
210232
pair_list.reserve(form_data.size());
211233
for (const auto& [key, val] : form_data) {
@@ -228,7 +250,7 @@ Result<HttpResponse> HttpClient::Head(
228250
cpr::Response response;
229251
{
230252
std::lock_guard guard(session_mutex_);
231-
PrepareSession(path, /*params=*/{}, headers);
253+
PrepareSession(path, HttpMethod::kHead, /*params=*/{}, headers);
232254
response = session_->Head();
233255
}
234256

@@ -245,7 +267,7 @@ Result<HttpResponse> HttpClient::Delete(
245267
cpr::Response response;
246268
{
247269
std::lock_guard guard(session_mutex_);
248-
PrepareSession(path, params, headers);
270+
PrepareSession(path, HttpMethod::kDelete, params, headers);
249271
response = session_->Delete();
250272
}
251273

src/iceberg/catalog/rest/http_client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <string>
2626
#include <unordered_map>
2727

28+
#include "iceberg/catalog/rest/endpoint.h"
2829
#include "iceberg/catalog/rest/iceberg_rest_export.h"
2930
#include "iceberg/catalog/rest/type_fwd.h"
3031
#include "iceberg/result.h"
@@ -109,7 +110,7 @@ class ICEBERG_REST_EXPORT HttpClient {
109110
const ErrorHandler& error_handler);
110111

111112
private:
112-
void PrepareSession(const std::string& path,
113+
void PrepareSession(const std::string& path, HttpMethod method,
113114
const std::unordered_map<std::string, std::string>& params,
114115
const std::unordered_map<std::string, std::string>& headers);
115116

src/iceberg/catalog/rest/json_internal.cc

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
#include "iceberg/partition_spec.h"
3232
#include "iceberg/sort_order.h"
3333
#include "iceberg/table_identifier.h"
34+
#include "iceberg/table_requirement.h"
35+
#include "iceberg/table_update.h"
3436
#include "iceberg/util/json_util_internal.h"
3537
#include "iceberg/util/macros.h"
3638

@@ -69,6 +71,8 @@ constexpr std::string_view kType = "type";
6971
constexpr std::string_view kCode = "code";
7072
constexpr std::string_view kStack = "stack";
7173
constexpr std::string_view kError = "error";
74+
constexpr std::string_view kIdentifier = "identifier";
75+
constexpr std::string_view kRequirements = "requirements";
7276

7377
} // namespace
7478

@@ -390,6 +394,75 @@ Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json
390394
return request;
391395
}
392396

397+
// CommitTableRequest serialization
398+
nlohmann::json ToJson(const CommitTableRequest& request) {
399+
nlohmann::json json;
400+
if (!request.identifier.name.empty()) {
401+
json[kIdentifier] = ToJson(request.identifier);
402+
}
403+
404+
nlohmann::json requirements_json = nlohmann::json::array();
405+
for (const auto& req : request.requirements) {
406+
requirements_json.push_back(ToJson(*req));
407+
}
408+
json[kRequirements] = std::move(requirements_json);
409+
410+
nlohmann::json updates_json = nlohmann::json::array();
411+
for (const auto& update : request.updates) {
412+
updates_json.push_back(ToJson(*update));
413+
}
414+
json[kUpdates] = std::move(updates_json);
415+
416+
return json;
417+
}
418+
419+
Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& json) {
420+
CommitTableRequest request;
421+
if (json.contains(kIdentifier)) {
422+
ICEBERG_ASSIGN_OR_RAISE(auto identifier_json,
423+
GetJsonValue<nlohmann::json>(json, kIdentifier));
424+
ICEBERG_ASSIGN_OR_RAISE(request.identifier, TableIdentifierFromJson(identifier_json));
425+
}
426+
427+
ICEBERG_ASSIGN_OR_RAISE(auto requirements_json,
428+
GetJsonValue<nlohmann::json>(json, kRequirements));
429+
for (const auto& req_json : requirements_json) {
430+
ICEBERG_ASSIGN_OR_RAISE(auto requirement, TableRequirementFromJson(req_json));
431+
request.requirements.push_back(std::move(requirement));
432+
}
433+
434+
ICEBERG_ASSIGN_OR_RAISE(auto updates_json,
435+
GetJsonValue<nlohmann::json>(json, kUpdates));
436+
for (const auto& update_json : updates_json) {
437+
ICEBERG_ASSIGN_OR_RAISE(auto update, TableUpdateFromJson(update_json));
438+
request.updates.push_back(std::move(update));
439+
}
440+
441+
ICEBERG_RETURN_UNEXPECTED(request.Validate());
442+
return request;
443+
}
444+
445+
// CommitTableResponse serialization
446+
nlohmann::json ToJson(const CommitTableResponse& response) {
447+
nlohmann::json json;
448+
json[kMetadataLocation] = response.metadata_location;
449+
if (response.metadata) {
450+
json[kMetadata] = ToJson(*response.metadata);
451+
}
452+
return json;
453+
}
454+
455+
Result<CommitTableResponse> CommitTableResponseFromJson(const nlohmann::json& json) {
456+
CommitTableResponse response;
457+
ICEBERG_ASSIGN_OR_RAISE(response.metadata_location,
458+
GetJsonValue<std::string>(json, kMetadataLocation));
459+
ICEBERG_ASSIGN_OR_RAISE(auto metadata_json,
460+
GetJsonValue<nlohmann::json>(json, kMetadata));
461+
ICEBERG_ASSIGN_OR_RAISE(response.metadata, TableMetadataFromJson(metadata_json));
462+
ICEBERG_RETURN_UNEXPECTED(response.Validate());
463+
return response;
464+
}
465+
393466
#define ICEBERG_DEFINE_FROM_JSON(Model) \
394467
template <> \
395468
Result<Model> FromJson<Model>(const nlohmann::json& json) { \
@@ -409,5 +482,7 @@ ICEBERG_DEFINE_FROM_JSON(LoadTableResult)
409482
ICEBERG_DEFINE_FROM_JSON(RegisterTableRequest)
410483
ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
411484
ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
485+
ICEBERG_DEFINE_FROM_JSON(CommitTableRequest)
486+
ICEBERG_DEFINE_FROM_JSON(CommitTableResponse)
412487

413488
} // namespace iceberg::rest

src/iceberg/catalog/rest/json_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
5656
ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
5757
ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
5858
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
59+
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
60+
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)
5961

6062
#undef ICEBERG_DECLARE_JSON_SERDE
6163

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 84 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
#include "iceberg/schema.h"
4343
#include "iceberg/sort_order.h"
4444
#include "iceberg/table.h"
45+
#include "iceberg/table_requirement.h"
46+
#include "iceberg/table_update.h"
4547
#include "iceberg/util/macros.h"
4648

4749
namespace iceberg::rest {
@@ -177,7 +179,7 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)
177179
if (list_response.next_page_token.empty()) {
178180
return result;
179181
}
180-
next_token = list_response.next_page_token;
182+
next_token = std::move(list_response.next_page_token);
181183
}
182184
return result;
183185
}
@@ -246,9 +248,30 @@ Status RestCatalog::UpdateNamespaceProperties(
246248
return {};
247249
}
248250

249-
Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
250-
[[maybe_unused]] const Namespace& ns) const {
251-
return NotImplemented("Not implemented");
251+
Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
252+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListTables());
253+
254+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(ns));
255+
std::vector<TableIdentifier> result;
256+
std::string next_token;
257+
while (true) {
258+
std::unordered_map<std::string, std::string> params;
259+
if (!next_token.empty()) {
260+
params[kQueryParamPageToken] = next_token;
261+
}
262+
ICEBERG_ASSIGN_OR_RAISE(
263+
const auto response,
264+
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
265+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
266+
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListTablesResponseFromJson(json));
267+
result.insert(result.end(), list_response.identifiers.begin(),
268+
list_response.identifiers.end());
269+
if (list_response.next_page_token.empty()) {
270+
return result;
271+
}
272+
next_token = std::move(list_response.next_page_token);
273+
}
274+
return result;
252275
}
253276

254277
Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
@@ -282,10 +305,33 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
282305
}
283306

284307
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
285-
[[maybe_unused]] const TableIdentifier& identifier,
286-
[[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& requirements,
287-
[[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) {
288-
return NotImplemented("Not implemented");
308+
const TableIdentifier& identifier,
309+
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
310+
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
311+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateTable());
312+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
313+
314+
CommitTableRequest request{.identifier = identifier};
315+
request.requirements.reserve(requirements.size());
316+
for (const auto& req : requirements) {
317+
request.requirements.push_back(req->Clone());
318+
}
319+
request.updates.reserve(updates.size());
320+
for (const auto& update : updates) {
321+
request.updates.push_back(update->Clone());
322+
}
323+
324+
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
325+
ICEBERG_ASSIGN_OR_RAISE(
326+
const auto response,
327+
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
328+
329+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
330+
ICEBERG_ASSIGN_OR_RAISE(auto commit_response, CommitTableResponseFromJson(json));
331+
332+
return Table::Make(identifier, std::move(commit_response.metadata),
333+
std::move(commit_response.metadata_location), file_io_,
334+
shared_from_this());
289335
}
290336

291337
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
@@ -323,9 +369,17 @@ Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
323369
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()));
324370
}
325371

326-
Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
327-
[[maybe_unused]] const TableIdentifier& to) {
328-
return NotImplemented("Not implemented");
372+
Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifier& to) {
373+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RenameTable());
374+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Rename());
375+
376+
RenameTableRequest request{.source = from, .destination = to};
377+
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
378+
ICEBERG_ASSIGN_OR_RAISE(
379+
const auto response,
380+
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
381+
382+
return {};
329383
}
330384

331385
Result<std::string> RestCatalog::LoadTableInternal(
@@ -352,9 +406,25 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
352406
}
353407

354408
Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
355-
[[maybe_unused]] const TableIdentifier& identifier,
356-
[[maybe_unused]] const std::string& metadata_file_location) {
357-
return NotImplemented("Not implemented");
409+
const TableIdentifier& identifier, const std::string& metadata_file_location) {
410+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RegisterTable());
411+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Register(identifier.ns));
412+
413+
RegisterTableRequest request{
414+
.name = identifier.name,
415+
.metadata_location = metadata_file_location,
416+
};
417+
418+
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
419+
ICEBERG_ASSIGN_OR_RAISE(
420+
const auto response,
421+
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
422+
423+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
424+
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
425+
return Table::Make(identifier, std::move(load_result.metadata),
426+
std::move(load_result.metadata_location), file_io_,
427+
shared_from_this());
358428
}
359429

360430
} // namespace iceberg::rest

0 commit comments

Comments
 (0)