Skip to content

Commit 8436b72

Browse files
feat: integration s3 with arrow filesystem
1 parent e7f1d0f commit 8436b72

File tree

5 files changed

+262
-0
lines changed

5 files changed

+262
-0
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ add_subdirectory(util)
171171
if(ICEBERG_BUILD_BUNDLE)
172172
set(ICEBERG_BUNDLE_SOURCES
173173
arrow/arrow_fs_file_io.cc
174+
arrow/arrow_s3_file_io.cc
174175
arrow/metadata_column_util.cc
175176
avro/avro_data_util.cc
176177
avro/avro_direct_decoder.cc

src/iceberg/arrow/arrow_file_io.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,19 @@
2020
#pragma once
2121

2222
#include <memory>
23+
#include <string>
2324

2425
#include "iceberg/file_io.h"
2526
#include "iceberg/iceberg_bundle_export.h"
27+
#include "iceberg/result.h"
2628

2729
namespace iceberg::arrow {
2830

2931
ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeMockFileIO();
3032

3133
ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeLocalFileIO();
3234

35+
ICEBERG_BUNDLE_EXPORT Result<std::unique_ptr<FileIO>> MakeS3FileIO(
36+
const std::string& uri);
37+
3338
} // namespace iceberg::arrow
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <cstdlib>
21+
#include <mutex>
22+
#include <string_view>
23+
24+
#include <arrow/filesystem/filesystem.h>
25+
#if __has_include(<arrow/filesystem/s3fs.h>)
26+
#include <arrow/filesystem/s3fs.h>
27+
#define ICEBERG_ARROW_HAS_S3 1
28+
#else
29+
#define ICEBERG_ARROW_HAS_S3 0
30+
#endif
31+
32+
#include "iceberg/arrow/arrow_file_io.h"
33+
#include "iceberg/arrow/arrow_status_internal.h"
34+
35+
namespace iceberg::arrow {
36+
37+
namespace {
38+
39+
bool IsS3Uri(std::string_view uri) { return uri.rfind("s3://", 0) == 0; }
40+
41+
Status EnsureS3Initialized() {
42+
#if ICEBERG_ARROW_HAS_S3
43+
static std::once_flag init_flag;
44+
static ::arrow::Status init_status = ::arrow::Status::OK();
45+
std::call_once(init_flag, []() {
46+
::arrow::fs::S3GlobalOptions options;
47+
init_status = ::arrow::fs::InitializeS3(options);
48+
if (init_status.ok()) {
49+
std::atexit([]() { (void)::arrow::fs::FinalizeS3(); });
50+
}
51+
});
52+
if (!init_status.ok()) {
53+
return std::unexpected<Error>{
54+
{.kind = ::iceberg::arrow::ToErrorKind(init_status),
55+
.message = init_status.ToString()}};
56+
}
57+
return {};
58+
#else
59+
return NotImplemented("Arrow S3 support is not enabled");
60+
#endif
61+
}
62+
63+
Result<std::shared_ptr<::arrow::fs::FileSystem>> ResolveFileSystemFromUri(
64+
const std::string& uri, std::string* out_path) {
65+
if (IsS3Uri(uri)) {
66+
auto init_status = EnsureS3Initialized();
67+
if (!init_status.has_value()) {
68+
return std::unexpected<Error>(init_status.error());
69+
}
70+
}
71+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::FileSystemFromUri(uri, out_path));
72+
return fs;
73+
}
74+
75+
class ArrowUriFileIO : public FileIO {
76+
public:
77+
Result<std::string> ReadFile(const std::string& file_location,
78+
std::optional<size_t> length) override {
79+
std::string path;
80+
auto fs_result = ResolveFileSystemFromUri(file_location, &path);
81+
if (!fs_result.has_value()) {
82+
return std::unexpected<Error>(fs_result.error());
83+
}
84+
auto fs = std::move(fs_result).value();
85+
::arrow::fs::FileInfo file_info(path);
86+
if (length.has_value()) {
87+
file_info.set_size(length.value());
88+
}
89+
std::string content;
90+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, fs->OpenInputFile(file_info));
91+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
92+
93+
content.resize(file_size);
94+
size_t remain = file_size;
95+
size_t offset = 0;
96+
while (remain > 0) {
97+
size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
98+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
99+
auto read_bytes,
100+
file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
101+
remain -= read_bytes;
102+
offset += read_bytes;
103+
}
104+
105+
return content;
106+
}
107+
108+
Status WriteFile(const std::string& file_location,
109+
std::string_view content) override {
110+
std::string path;
111+
auto fs_result = ResolveFileSystemFromUri(file_location, &path);
112+
if (!fs_result.has_value()) {
113+
return std::unexpected<Error>(fs_result.error());
114+
}
115+
auto fs = std::move(fs_result).value();
116+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, fs->OpenOutputStream(path));
117+
ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
118+
ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
119+
ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
120+
return {};
121+
}
122+
123+
Status DeleteFile(const std::string& file_location) override {
124+
std::string path;
125+
auto fs_result = ResolveFileSystemFromUri(file_location, &path);
126+
if (!fs_result.has_value()) {
127+
return std::unexpected<Error>(fs_result.error());
128+
}
129+
auto fs = std::move(fs_result).value();
130+
ICEBERG_ARROW_RETURN_NOT_OK(fs->DeleteFile(path));
131+
return {};
132+
}
133+
};
134+
135+
} // namespace
136+
137+
Result<std::unique_ptr<FileIO>> MakeS3FileIO(const std::string& uri) {
138+
if (!IsS3Uri(uri)) {
139+
return InvalidArgument("S3 URI must start with s3://");
140+
}
141+
#if !ICEBERG_ARROW_HAS_S3
142+
return NotImplemented("Arrow S3 support is not enabled");
143+
#else
144+
std::string path;
145+
auto fs_result = ResolveFileSystemFromUri(uri, &path);
146+
if (!fs_result.has_value()) {
147+
return std::unexpected<Error>(fs_result.error());
148+
}
149+
(void)path;
150+
return std::make_unique<ArrowUriFileIO>();
151+
#endif
152+
}
153+
154+
} // namespace iceberg::arrow

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ if(ICEBERG_BUILD_BUNDLE)
136136
USE_BUNDLE
137137
SOURCES
138138
arrow_fs_file_io_test.cc
139+
arrow_s3_file_io_test.cc
139140
arrow_test.cc
140141
gzip_decompress_test.cc
141142
metadata_io_test.cc
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <cstdlib>
21+
#include <string>
22+
23+
#if __has_include(<arrow/filesystem/s3fs.h>)
24+
#include <arrow/filesystem/s3fs.h>
25+
#endif
26+
#include <gtest/gtest.h>
27+
28+
#include "iceberg/arrow/arrow_file_io.h"
29+
#include "iceberg/test/matchers.h"
30+
31+
namespace iceberg::arrow {
32+
33+
#if __has_include(<arrow/filesystem/s3fs.h>)
34+
namespace {
35+
class ArrowS3Environment final : public ::testing::Environment {
36+
public:
37+
void TearDown() override { (void)::arrow::fs::FinalizeS3(); }
38+
};
39+
} // namespace
40+
#endif
41+
42+
TEST(ArrowS3FileIOTest, RejectsNonS3Uri) {
43+
auto result = MakeS3FileIO("file:///tmp/not-s3");
44+
EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
45+
EXPECT_THAT(result, HasErrorMessage("s3://"));
46+
}
47+
48+
#if __has_include(<arrow/filesystem/s3fs.h>)
49+
TEST(ArrowS3FileIOTest, RequiresS3SupportAtBuildTime) {
50+
auto result = MakeS3FileIO("s3://bucket/path");
51+
if (!result.has_value()) {
52+
EXPECT_NE(result.error().kind, ErrorKind::kNotImplemented);
53+
}
54+
}
55+
#else
56+
TEST(ArrowS3FileIOTest, RequiresS3SupportAtBuildTime) {
57+
auto result = MakeS3FileIO("s3://warehouse/iceberg_example");
58+
EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented));
59+
}
60+
#endif
61+
62+
TEST(ArrowS3FileIOTest, ReadWriteFile) {
63+
const char* base_uri = std::getenv("ICEBERG_TEST_S3_URI");
64+
if (base_uri == nullptr || std::string(base_uri).empty()) {
65+
GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test";
66+
}
67+
68+
auto io_res = MakeS3FileIO(base_uri);
69+
if (!io_res.has_value()) {
70+
if (io_res.error().kind == ErrorKind::kNotImplemented) {
71+
GTEST_SKIP() << "Arrow S3 support is not enabled";
72+
}
73+
FAIL() << "MakeS3FileIO failed: " << io_res.error().message;
74+
}
75+
76+
auto io = std::move(io_res.value());
77+
std::string object_uri = base_uri;
78+
if (!object_uri.ends_with('/')) {
79+
object_uri += '/';
80+
}
81+
object_uri += "iceberg_s3_io_test.txt";
82+
auto write_res = io->WriteFile(object_uri, "hello s3");
83+
ASSERT_THAT(write_res, IsOk());
84+
85+
auto read_res = io->ReadFile(object_uri, std::nullopt);
86+
ASSERT_THAT(read_res, IsOk());
87+
EXPECT_THAT(read_res, HasValue(::testing::Eq("hello s3")));
88+
89+
auto del_res = io->DeleteFile(object_uri);
90+
EXPECT_THAT(del_res, IsOk());
91+
}
92+
93+
} // namespace iceberg::arrow
94+
95+
#if __has_include(<arrow/filesystem/s3fs.h>)
96+
int main(int argc, char** argv) {
97+
::testing::InitGoogleTest(&argc, argv);
98+
::testing::AddGlobalTestEnvironment(new iceberg::arrow::ArrowS3Environment());
99+
return RUN_ALL_TESTS();
100+
}
101+
#endif

0 commit comments

Comments
 (0)