Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
379 changes: 190 additions & 189 deletions bindings/cpp/include/fluss.hpp

Large diffs are not rendered by default.

33 changes: 32 additions & 1 deletion bindings/cpp/src/admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "fluss.hpp"
#include "lib.rs.h"
#include "rust/cxx.h"
#include "type_lowering.hpp"
#include <arrow/c/bridge.h>
#include <exception>

namespace fluss {
Expand Down Expand Up @@ -58,8 +60,37 @@ Result Admin::CreateTable(const TablePath& table_path, const TableDescriptor& de
}

auto ffi_path = utils::to_ffi_table_path(table_path);
auto ffi_desc = utils::to_ffi_table_descriptor(descriptor);

// A MAP/ROW column can't go through the flat FFI encoding, so the schema is
// sent over Arrow instead (explicit via FromArrow, or lowered from native
// columns). Rust derives the columns from it, so the flat columns are dropped
// here; primary keys / metadata still come from the descriptor.
std::shared_ptr<arrow::Schema> arrow_schema = descriptor.schema.arrow_schema;
if (!arrow_schema) {
for (const auto& col : descriptor.schema.columns) {
if (detail::is_compound(col.data_type)) {
arrow_schema = detail::columns_to_arrow_schema(descriptor.schema.columns);
break;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand why we break here. What if multiple columns are of ROW/MAP type?

@fresh-borzoni fresh-borzoni Jun 9, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a detection loop here: "is any data type in the schema complex/nested?" Once the answer is yes, we can short-circuit.

}
}
}

if (arrow_schema) {
TableDescriptor arrow_desc = descriptor;
arrow_desc.schema.columns.clear();
auto ffi_desc = utils::to_ffi_table_descriptor(arrow_desc);
size_t schema_ptr = 0;
try {
schema_ptr = detail::export_arrow_schema(*arrow_schema);
} catch (const std::exception& e) {
return utils::make_client_error(e.what());
}
auto ffi_result =
admin_->create_table_arrow(ffi_path, ffi_desc, schema_ptr, ignore_if_exists);
return utils::from_ffi_result(ffi_result);
}

auto ffi_desc = utils::to_ffi_table_descriptor(descriptor);
auto ffi_result = admin_->create_table(ffi_path, ffi_desc, ignore_if_exists);
return utils::from_ffi_result(ffi_result);
}
Expand Down
31 changes: 19 additions & 12 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,18 +253,6 @@ inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& de
inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
auto type_id = static_cast<TypeId>(ffi_col.data_type);
if (type_id == TypeId::Array) {
if (ffi_col.element_data_type == 0) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': missing element_data_type");
}
if (ffi_col.array_nesting < 0) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': array_nesting must be non-negative");
}
if (ffi_col.element_data_type == static_cast<int32_t>(TypeId::Array)) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': leaf element_data_type cannot be ARRAY");
}
auto is_supported_leaf_type = [](int32_t leaf_type) {
switch (static_cast<TypeId>(leaf_type)) {
case TypeId::Boolean:
Expand All @@ -288,6 +276,25 @@ inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
return false;
}
};
// ROW/MAP element schema can't pass through the flat FFI column; give the
// array a non-null element of the right kind so element_type() is safe to deref.
auto element_id = static_cast<TypeId>(ffi_col.element_data_type);
if (element_id == TypeId::Map || element_id == TypeId::Row) {
return Column{std::string(ffi_col.name), DataType::Array(DataType(element_id)),
std::string(ffi_col.comment)};
}
if (ffi_col.element_data_type == 0) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': missing element_data_type");
}
if (ffi_col.array_nesting < 0) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': array_nesting must be non-negative");
}
if (ffi_col.element_data_type == static_cast<int32_t>(TypeId::Array)) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': leaf element_data_type cannot be ARRAY");
}
if (!is_supported_leaf_type(ffi_col.element_data_type)) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': unsupported leaf element_data_type " +
Expand Down
Loading
Loading