Skip to content

Commit 1bb9361

Browse files
committed
feat: add FileWriterFactory and writer stub implementations
Add complete API structure for Iceberg data file writers including: DataWriter: - Add DataWriterOptions struct with schema, partition spec, format, etc. - Add DataWriter class extending FileWriter - Stub implementation returning NotImplemented errors PositionDeleteWriter: - Add PositionDeleteWriterOptions struct with optional row schema - Add PositionDeleteWriter class with WriteDelete method - Stub implementation returning NotImplemented errors EqualityDeleteWriter: - Add EqualityDeleteWriterOptions struct with equality_field_ids - Add EqualityDeleteWriter class with equality_field_ids accessor - Stub implementation returning NotImplemented errors FileWriterFactory: - Add factory class for creating all three writer types - Implements NewDataWriter, NewPositionDeleteWriter, NewEqualityDeleteWriter - Supports configuration via SetEqualityDeleteConfig and SetPositionDeleteRowSchema - Factory methods construct options and delegate to concrete writer Make methods All implementations use pimpl idiom for ABI stability. Stub implementations will be replaced with full functionality in subsequent tasks. Related to #441 task 5
1 parent cf0fd37 commit 1bb9361

File tree

2 files changed

+413
-0
lines changed

2 files changed

+413
-0
lines changed

src/iceberg/data/writer.cc

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,247 @@ namespace iceberg {
2323

2424
FileWriter::~FileWriter() = default;
2525

26+
//=============================================================================
27+
// DataWriter::Impl
28+
//=============================================================================
29+
30+
class DataWriter::Impl {
31+
public:
32+
explicit Impl(const DataWriterOptions& options) : options_(options) {}
33+
34+
DataWriterOptions options_;
35+
36+
// TODO: Add the following when implementing:
37+
// - FileAppender or equivalent writer for the format
38+
// - Metrics collection
39+
// - Split offsets tracking
40+
// - Encryption key metadata handling
41+
};
42+
43+
//=============================================================================
44+
// DataWriter
45+
//=============================================================================
46+
47+
Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
48+
return NotImplemented("DataWriter implementation not yet available");
49+
}
50+
51+
DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
52+
53+
DataWriter::~DataWriter() = default;
54+
55+
Status DataWriter::Write(ArrowArray* data) {
56+
return NotImplemented("DataWriter::Write not yet implemented");
57+
}
58+
59+
Result<int64_t> DataWriter::Length() const {
60+
return NotImplemented("DataWriter::Length not yet implemented");
61+
}
62+
63+
Status DataWriter::Close() {
64+
return NotImplemented("DataWriter::Close not yet implemented");
65+
}
66+
67+
Result<FileWriter::WriteResult> DataWriter::Metadata() {
68+
return NotImplemented("DataWriter::Metadata not yet implemented");
69+
}
70+
71+
//=============================================================================
72+
// PositionDeleteWriter::Impl
73+
//=============================================================================
74+
75+
class PositionDeleteWriter::Impl {
76+
public:
77+
explicit Impl(const PositionDeleteWriterOptions& options) : options_(options) {}
78+
79+
PositionDeleteWriterOptions options_;
80+
81+
// TODO: Add the following when implementing:
82+
// - FileAppender for position delete format (file path + position columns)
83+
// - CharSequenceSet or equivalent for tracking referenced data files
84+
// - Metrics collection with field stripping for multi-file deletes
85+
// - Split offsets tracking
86+
// - Encryption key metadata handling
87+
};
88+
89+
//=============================================================================
90+
// PositionDeleteWriter
91+
//=============================================================================
92+
93+
Result<std::unique_ptr<PositionDeleteWriter>> PositionDeleteWriter::Make(
94+
const PositionDeleteWriterOptions& options) {
95+
return NotImplemented("PositionDeleteWriter implementation not yet available");
96+
}
97+
98+
PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr<Impl> impl)
99+
: impl_(std::move(impl)) {}
100+
101+
PositionDeleteWriter::~PositionDeleteWriter() = default;
102+
103+
Status PositionDeleteWriter::Write(ArrowArray* data) {
104+
return NotImplemented("PositionDeleteWriter::Write not yet implemented");
105+
}
106+
107+
Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t pos) {
108+
return NotImplemented("PositionDeleteWriter::WriteDelete not yet implemented");
109+
}
110+
111+
Result<int64_t> PositionDeleteWriter::Length() const {
112+
return NotImplemented("PositionDeleteWriter::Length not yet implemented");
113+
}
114+
115+
Status PositionDeleteWriter::Close() {
116+
return NotImplemented("PositionDeleteWriter::Close not yet implemented");
117+
}
118+
119+
Result<FileWriter::WriteResult> PositionDeleteWriter::Metadata() {
120+
return NotImplemented("PositionDeleteWriter::Metadata not yet implemented");
121+
}
122+
123+
//=============================================================================
124+
// EqualityDeleteWriter::Impl
125+
//=============================================================================
126+
127+
class EqualityDeleteWriter::Impl {
128+
public:
129+
explicit Impl(const EqualityDeleteWriterOptions& options) : options_(options) {}
130+
131+
EqualityDeleteWriterOptions options_;
132+
133+
// TODO: Add the following when implementing:
134+
// - FileAppender for equality delete format
135+
// - Metrics collection
136+
// - Split offsets tracking
137+
// - Encryption key metadata handling
138+
};
139+
140+
//=============================================================================
141+
// EqualityDeleteWriter
142+
//=============================================================================
143+
144+
Result<std::unique_ptr<EqualityDeleteWriter>> EqualityDeleteWriter::Make(
145+
const EqualityDeleteWriterOptions& options) {
146+
return NotImplemented("EqualityDeleteWriter implementation not yet available");
147+
}
148+
149+
EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr<Impl> impl)
150+
: impl_(std::move(impl)) {}
151+
152+
EqualityDeleteWriter::~EqualityDeleteWriter() = default;
153+
154+
Status EqualityDeleteWriter::Write(ArrowArray* data) {
155+
return NotImplemented("EqualityDeleteWriter::Write not yet implemented");
156+
}
157+
158+
Result<int64_t> EqualityDeleteWriter::Length() const {
159+
return NotImplemented("EqualityDeleteWriter::Length not yet implemented");
160+
}
161+
162+
Status EqualityDeleteWriter::Close() {
163+
return NotImplemented("EqualityDeleteWriter::Close not yet implemented");
164+
}
165+
166+
Result<FileWriter::WriteResult> EqualityDeleteWriter::Metadata() {
167+
return NotImplemented("EqualityDeleteWriter::Metadata not yet implemented");
168+
}
169+
170+
const std::vector<int32_t>& EqualityDeleteWriter::equality_field_ids() const {
171+
return impl_->options_.equality_field_ids;
172+
}
173+
174+
//=============================================================================
175+
// FileWriterFactory::Impl
176+
//=============================================================================
177+
178+
class FileWriterFactory::Impl {
179+
public:
180+
Impl(std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec,
181+
std::shared_ptr<FileIO> io, std::shared_ptr<WriterProperties> properties)
182+
: schema_(std::move(schema)),
183+
spec_(std::move(spec)),
184+
io_(std::move(io)),
185+
properties_(std::move(properties)) {}
186+
187+
std::shared_ptr<Schema> schema_;
188+
std::shared_ptr<PartitionSpec> spec_;
189+
std::shared_ptr<FileIO> io_;
190+
std::shared_ptr<WriterProperties> properties_;
191+
192+
std::shared_ptr<Schema> eq_delete_schema_;
193+
std::vector<int32_t> equality_field_ids_;
194+
std::shared_ptr<Schema> pos_delete_row_schema_;
195+
};
196+
197+
//=============================================================================
198+
// FileWriterFactory
199+
//=============================================================================
200+
201+
FileWriterFactory::FileWriterFactory(std::shared_ptr<Schema> schema,
202+
std::shared_ptr<PartitionSpec> spec,
203+
std::shared_ptr<FileIO> io,
204+
std::shared_ptr<WriterProperties> properties)
205+
: impl_(std::make_unique<Impl>(std::move(schema), std::move(spec), std::move(io),
206+
std::move(properties))) {}
207+
208+
FileWriterFactory::~FileWriterFactory() = default;
209+
210+
void FileWriterFactory::SetEqualityDeleteConfig(std::shared_ptr<Schema> eq_delete_schema,
211+
std::vector<int32_t> equality_field_ids) {
212+
impl_->eq_delete_schema_ = std::move(eq_delete_schema);
213+
impl_->equality_field_ids_ = std::move(equality_field_ids);
214+
}
215+
216+
void FileWriterFactory::SetPositionDeleteRowSchema(
217+
std::shared_ptr<Schema> pos_delete_row_schema) {
218+
impl_->pos_delete_row_schema_ = std::move(pos_delete_row_schema);
219+
}
220+
221+
Result<std::unique_ptr<DataWriter>> FileWriterFactory::NewDataWriter(
222+
const std::string& path, FileFormatType format, const PartitionValues& partition,
223+
std::optional<int32_t> sort_order_id) {
224+
DataWriterOptions options;
225+
options.path = path;
226+
options.schema = impl_->schema_;
227+
options.spec = impl_->spec_;
228+
options.partition = partition;
229+
options.format = format;
230+
options.io = impl_->io_;
231+
options.sort_order_id = sort_order_id;
232+
options.properties = impl_->properties_;
233+
234+
return DataWriter::Make(options);
235+
}
236+
237+
Result<std::unique_ptr<PositionDeleteWriter>> FileWriterFactory::NewPositionDeleteWriter(
238+
const std::string& path, FileFormatType format, const PartitionValues& partition) {
239+
PositionDeleteWriterOptions options;
240+
options.path = path;
241+
options.schema = impl_->schema_;
242+
options.spec = impl_->spec_;
243+
options.partition = partition;
244+
options.format = format;
245+
options.io = impl_->io_;
246+
options.row_schema = impl_->pos_delete_row_schema_;
247+
options.properties = impl_->properties_;
248+
249+
return PositionDeleteWriter::Make(options);
250+
}
251+
252+
Result<std::unique_ptr<EqualityDeleteWriter>> FileWriterFactory::NewEqualityDeleteWriter(
253+
const std::string& path, FileFormatType format, const PartitionValues& partition,
254+
std::optional<int32_t> sort_order_id) {
255+
EqualityDeleteWriterOptions options;
256+
options.path = path;
257+
options.schema = impl_->eq_delete_schema_ ? impl_->eq_delete_schema_ : impl_->schema_;
258+
options.spec = impl_->spec_;
259+
options.partition = partition;
260+
options.format = format;
261+
options.io = impl_->io_;
262+
options.equality_field_ids = impl_->equality_field_ids_;
263+
options.sort_order_id = sort_order_id;
264+
options.properties = impl_->properties_;
265+
266+
return EqualityDeleteWriter::Make(options);
267+
}
268+
26269
} // namespace iceberg

0 commit comments

Comments
 (0)