diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index b641bb75e..2b21be9fb 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -244,6 +244,7 @@ if(ICEBERG_BUILD_BUNDLE)
       arrow/arrow_io.cc
       arrow/s3/arrow_s3_file_io.cc
       arrow/arrow_register.cc
+      arrow/literal_util.cc
       arrow/metadata_column_util.cc
       avro/avro_data_util.cc
       avro/avro_direct_decoder.cc
diff --git a/src/iceberg/arrow/literal_util.cc b/src/iceberg/arrow/literal_util.cc
new file mode 100644
index 000000000..4382fbb3e
--- /dev/null
+++ b/src/iceberg/arrow/literal_util.cc
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// NOTE(review): system/Arrow header names below are reconstructed from usage; confirm.
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include <arrow/array/builder_base.h>
+#include <arrow/array/util.h>
+#include <arrow/buffer.h>
+#include <arrow/compute/api.h>
+#include <arrow/scalar.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/arrow/literal_util_internal.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/formatter.h"  // IWYU pragma: keep
+#include "iceberg/util/macros.h"
+
+namespace iceberg::arrow {
+
+namespace {
+
+/// \brief Map an Iceberg primitive type to the Arrow type used for its scalars.
+///
+/// Returns NotSupported for type ids that have no case below.
+Result<std::shared_ptr<::arrow::DataType>> ToArrowType(const PrimitiveType& type) {
+  switch (type.type_id()) {
+    case TypeId::kBoolean:
+      return ::arrow::boolean();
+    case TypeId::kInt:
+      return ::arrow::int32();
+    case TypeId::kLong:
+      return ::arrow::int64();
+    case TypeId::kFloat:
+      return ::arrow::float32();
+    case TypeId::kDouble:
+      return ::arrow::float64();
+    case TypeId::kDecimal: {
+      const DecimalType& decimal_type = internal::checked_cast<const DecimalType&>(type);
+      return ::arrow::decimal128(decimal_type.precision(), decimal_type.scale());
+    }
+    case TypeId::kDate:
+      return ::arrow::date32();
+    case TypeId::kTime:
+      return ::arrow::time64(::arrow::TimeUnit::MICRO);
+    case TypeId::kTimestamp:
+      return ::arrow::timestamp(::arrow::TimeUnit::MICRO);
+    case TypeId::kTimestampTz:
+      return ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC");
+    case TypeId::kTimestampNs:
+      return ::arrow::timestamp(::arrow::TimeUnit::NANO);
+    case TypeId::kTimestampTzNs:
+      return ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC");
+    case TypeId::kString:
+      return ::arrow::utf8();
+    case TypeId::kBinary:
+      return ::arrow::binary();
+    case TypeId::kFixed: {
+      const FixedType& fixed_type = internal::checked_cast<const FixedType&>(type);
+      return ::arrow::fixed_size_binary(static_cast<int32_t>(fixed_type.length()));
+    }
+    case TypeId::kUuid:
+      return ::arrow::fixed_size_binary(16);
+    default:
+      return NotSupported("Cannot convert {} to an Arrow type", type);
+  }
+}
+
+/// \brief Copy `bytes` into a newly allocated Arrow buffer of the same size.
+Result<std::shared_ptr<::arrow::Buffer>> ToArrowBuffer(
+    const std::vector<uint8_t>& bytes) {
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(std::unique_ptr<::arrow::Buffer> buffer,
+                                 ::arrow::AllocateBuffer(bytes.size()));
+  // An empty vector may expose a null data(); std::memcpy requires valid pointers
+  // even for a zero-length copy, so skip the call when there is nothing to copy.
+  if (!bytes.empty()) {
+    std::memcpy(buffer->mutable_data(), bytes.data(), bytes.size());
+  }
+  return std::shared_ptr<::arrow::Buffer>(std::move(buffer));
+}
+
+}  // namespace
+
+Result<std::shared_ptr<::arrow::Scalar>> ToArrowScalar(const Literal& literal) {
+  if (literal.type() == nullptr) {
+    return InvalidArgument("Cannot convert a literal without type to an Arrow scalar");
+  }
+
+  // Above-max and below-min literals are rejected rather than converted.
+  if (literal.IsAboveMax() || literal.IsBelowMin()) {
+    return NotSupported("Cannot convert {} to an Arrow scalar", literal);
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::DataType> arrow_type,
+                          ToArrowType(*literal.type()));
+  if (literal.IsNull()) {
+    return ::arrow::MakeNullScalar(std::move(arrow_type));
+  }
+
+  const Literal::Value& value = literal.value();
+  switch (literal.type()->type_id()) {
+    case TypeId::kBoolean:
+      return std::make_shared<::arrow::BooleanScalar>(std::get<bool>(value));
+    case TypeId::kInt:
+      return std::make_shared<::arrow::Int32Scalar>(std::get<int32_t>(value));
+    case TypeId::kLong:
+      return std::make_shared<::arrow::Int64Scalar>(std::get<int64_t>(value));
+    case TypeId::kFloat:
+      return std::make_shared<::arrow::FloatScalar>(std::get<float>(value));
+    case TypeId::kDouble:
+      return std::make_shared<::arrow::DoubleScalar>(std::get<double>(value));
+    case TypeId::kDecimal: {
+      const Decimal& decimal = std::get<Decimal>(value);
+      // Decimal128 is built from the high word followed by the low word.
+      ::arrow::Decimal128 arrow_decimal(
+          static_cast<int64_t>(decimal.value() >> 64),
+          static_cast<uint64_t>(decimal.value() & ~uint64_t{0}));
+      return std::make_shared<::arrow::Decimal128Scalar>(arrow_decimal,
+                                                         std::move(arrow_type));
+    }
+    case TypeId::kDate:
+      return std::make_shared<::arrow::Date32Scalar>(std::get<int32_t>(value));
+    case TypeId::kTime:
+      return std::make_shared<::arrow::Time64Scalar>(std::get<int64_t>(value),
+                                                     std::move(arrow_type));
+    case TypeId::kTimestamp:
+    case TypeId::kTimestampTz:
+    case TypeId::kTimestampNs:
+    case TypeId::kTimestampTzNs:
+      return std::make_shared<::arrow::TimestampScalar>(std::get<int64_t>(value),
+                                                        std::move(arrow_type));
+    case TypeId::kString:
+      return std::make_shared<::arrow::StringScalar>(std::get<std::string>(value));
+    case TypeId::kBinary: {
+      ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Buffer> buffer,
+                              ToArrowBuffer(std::get<std::vector<uint8_t>>(value)));
+      return std::make_shared<::arrow::BinaryScalar>(std::move(buffer));
+    }
+    case TypeId::kFixed: {
+      ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Buffer> buffer,
+                              ToArrowBuffer(std::get<std::vector<uint8_t>>(value)));
+      return std::make_shared<::arrow::FixedSizeBinaryScalar>(std::move(buffer),
+                                                              std::move(arrow_type));
+    }
+    case TypeId::kUuid: {
+      const Uuid& uuid = std::get<Uuid>(value);
+      ICEBERG_ASSIGN_OR_RAISE(
+          std::shared_ptr<::arrow::Buffer> buffer,
+          ToArrowBuffer(std::vector<uint8_t>(uuid.bytes().begin(), uuid.bytes().end())));
+      return std::make_shared<::arrow::FixedSizeBinaryScalar>(std::move(buffer),
+                                                              std::move(arrow_type));
+    }
+    default:
+      return NotSupported("Cannot convert {} literal to an Arrow scalar",
+                          *literal.type());
+  }
+}
+
+Result<std::shared_ptr<::arrow::Array>> MakeDefaultArray(
+    const Literal& literal, const std::shared_ptr<::arrow::DataType>& type,
+    int64_t num_rows, ::arrow::MemoryPool* pool) {
+  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Scalar> scalar,
+                          ToArrowScalar(literal));
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(std::shared_ptr<::arrow::Array> array,
+                                 ::arrow::MakeArrayFromScalar(*scalar, num_rows, pool));
+  if (!array->type()->Equals(*type)) {
+    // Run the cast through an ExecContext bound to `pool` so the cast output is
+    // allocated from the caller's pool, like the array above.
+    ::arrow::compute::ExecContext exec_context(pool);
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(
+        ::arrow::Datum cast_result,
+        ::arrow::compute::Cast(array, type, ::arrow::compute::CastOptions::Safe(),
+                               &exec_context));
+    return cast_result.make_array();
+  }
+  return array;
+}
+
+Status AppendDefaultToBuilder(const Literal& literal, ::arrow::ArrayBuilder* builder) {
+  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::Scalar> scalar,
+                          ToArrowScalar(literal));
+  // Cast the scalar first when the builder's type differs from the scalar's type.
+  if (!scalar->type->Equals(*builder->type())) {
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(scalar, scalar->CastTo(builder->type()));
+  }
+  ICEBERG_ARROW_RETURN_NOT_OK(builder->AppendScalar(*scalar));
+  return {};
+}
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/literal_util_internal.h b/src/iceberg/arrow/literal_util_internal.h
new file mode 100644
index 000000000..26813b80e
---
/dev/null +++ b/src/iceberg/arrow/literal_util_internal.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include + +#include "iceberg/expression/literal.h" +#include "iceberg/result.h" + +namespace iceberg::arrow { + +/// \brief Convert a primitive literal to an Arrow scalar of its canonical Arrow type. +/// +/// A null literal converts to a null scalar of the corresponding Arrow type. +Result> ToArrowScalar(const Literal& literal); + +/// \brief Create an Arrow array of `num_rows` rows where every row holds the literal +/// value, e.g. to materialize a missing column with a default value. +/// +/// The array is cast to `type` when the literal's canonical Arrow type differs. +Result> MakeDefaultArray( + const Literal& literal, const std::shared_ptr<::arrow::DataType>& type, + int64_t num_rows, ::arrow::MemoryPool* pool); + +/// \brief Append the literal value once to `builder`, e.g. to materialize a missing +/// field with a default value while building rows. 
+Status AppendDefaultToBuilder(const Literal& literal, ::arrow::ArrayBuilder* builder); + +} // namespace iceberg::arrow diff --git a/src/iceberg/avro/avro_data_util.cc b/src/iceberg/avro/avro_data_util.cc index fb2f58bd1..64935306b 100644 --- a/src/iceberg/avro/avro_data_util.cc +++ b/src/iceberg/avro/avro_data_util.cc @@ -31,6 +31,7 @@ #include #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/literal_util_internal.h" #include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/metadata_columns.h" @@ -87,6 +88,9 @@ Status AppendStructToBuilder(const ::avro::NodePtr& avro_node, metadata_context, field_builder)); } else if (field_projection.kind == FieldProjection::Kind::kNull) { ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } else if (field_projection.kind == FieldProjection::Kind::kDefault) { + ICEBERG_RETURN_UNEXPECTED(arrow::AppendDefaultToBuilder( + std::get(field_projection.from), field_builder)); } else if (field_projection.kind == FieldProjection::Kind::kMetadata) { int32_t field_id = expected_field.field_id(); if (field_id == MetadataColumns::kFilePathColumnId) { @@ -462,6 +466,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, return {}; } + if (projection.kind == FieldProjection::Kind::kDefault) { + return arrow::AppendDefaultToBuilder(std::get(projection.from), + array_builder); + } + if (avro_node->type() == ::avro::AVRO_UNION) { size_t branch = avro_datum.unionBranch(); if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) { diff --git a/src/iceberg/avro/avro_direct_decoder.cc b/src/iceberg/avro/avro_direct_decoder.cc index 19ce77bbd..7adb77468 100644 --- a/src/iceberg/avro/avro_direct_decoder.cc +++ b/src/iceberg/avro/avro_direct_decoder.cc @@ -29,6 +29,7 @@ #include #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/literal_util_internal.h" #include "iceberg/avro/avro_direct_decoder_internal.h" #include 
"iceberg/avro/avro_schema_util_internal.h" #include "iceberg/metadata_columns.h" @@ -209,6 +210,9 @@ Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& auto* field_builder = struct_builder->field_builder(static_cast(proj_idx)); if (field_projection.kind == FieldProjection::Kind::kNull) { ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } else if (field_projection.kind == FieldProjection::Kind::kDefault) { + ICEBERG_RETURN_UNEXPECTED(arrow::AppendDefaultToBuilder( + std::get(field_projection.from), field_builder)); } else if (field_projection.kind == FieldProjection::Kind::kMetadata) { int32_t field_id = expected_field.field_id(); if (field_id == MetadataColumns::kFilePathColumnId) { diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 4ecd87ebc..31a2493d8 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -730,6 +730,10 @@ Result ProjectStruct(const StructType& struct_type, iter->second.local_index, prune_source)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.initial_default().has_value()) { + // Rows written before the field existed assume its `initial-default` value. 
+ child_projection.kind = FieldProjection::Kind::kDefault; + child_projection.from = expected_field.initial_default()->get(); } else if (expected_field.optional()) { child_projection.kind = FieldProjection::Kind::kNull; } else { diff --git a/src/iceberg/data/delete_filter.cc b/src/iceberg/data/delete_filter.cc index 5f21a32de..9f742cf69 100644 --- a/src/iceberg/data/delete_filter.cc +++ b/src/iceberg/data/delete_filter.cc @@ -368,9 +368,8 @@ Result MergeField(SchemaField& existing, const SchemaField& required) { if (!changed) { return false; } - existing = SchemaField(existing.field_id(), std::string(existing.name()), - std::make_shared(std::move(fields)), - existing.optional(), std::string(existing.doc())); + existing = existing.WithIdAndType(existing.field_id(), + std::make_shared(std::move(fields))); return true; } diff --git a/src/iceberg/json_serde.cc b/src/iceberg/json_serde.cc index c72b7da57..b9a44ff98 100644 --- a/src/iceberg/json_serde.cc +++ b/src/iceberg/json_serde.cc @@ -27,6 +27,8 @@ #include #include "iceberg/constants.h" +#include "iceberg/expression/json_serde_internal.h" +#include "iceberg/expression/literal.h" #include "iceberg/json_serde_internal.h" #include "iceberg/name_mapping.h" #include "iceberg/partition_field.h" @@ -298,6 +300,15 @@ nlohmann::json ToJson(const SchemaField& field) { if (!field.doc().empty()) { json[kDoc] = field.doc(); } + // Defaults are validated to be primitive literals matching the field type, so + // single-value serialization cannot fail here. 
+ if (field.initial_default().has_value()) { + ICEBERG_ASSIGN_OR_THROW(json[kInitialDefault], + ToJson(field.initial_default()->get())); + } + if (field.write_default().has_value()) { + ICEBERG_ASSIGN_OR_THROW(json[kWriteDefault], ToJson(field.write_default()->get())); + } return json; } @@ -310,7 +321,6 @@ nlohmann::json ToJson(const Type& type) { nlohmann::json fields_json = nlohmann::json::array(); for (const auto& field : struct_type.fields()) { fields_json.push_back(ToJson(field)); - // TODO(gangwu): add default values } json[kFields] = fields_json; return json; @@ -552,9 +562,23 @@ Result> FieldFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto name, GetJsonValue(json, kName)); ICEBERG_ASSIGN_OR_RAISE(auto required, GetJsonValue(json, kRequired)); ICEBERG_ASSIGN_OR_RAISE(auto doc, GetJsonValueOrDefault(json, kDoc)); - - return std::make_unique(field_id, std::move(name), std::move(type), - !required, doc); + ICEBERG_ASSIGN_OR_RAISE(std::optional initial_default_json, + GetJsonValueOptional(json, kInitialDefault)); + ICEBERG_ASSIGN_OR_RAISE(std::optional write_default_json, + GetJsonValueOptional(json, kWriteDefault)); + + SchemaField field(field_id, std::move(name), std::move(type), !required, doc); + if (initial_default_json.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(Literal literal, + LiteralFromJson(*initial_default_json, field.type().get())); + field = field.WithInitialDefault(std::move(literal)); + } + if (write_default_json.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(Literal literal, + LiteralFromJson(*write_default_json, field.type().get())); + field = field.WithWriteDefault(std::move(literal)); + } + return std::make_unique(std::move(field)); } Result> SchemaFromJson(const nlohmann::json& json) { diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc index 0c7c6c2ca..b6473edcc 100644 --- a/src/iceberg/parquet/parquet_data_util.cc +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -24,6 +24,7 @@ 
#include #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/arrow/literal_util_internal.h" #include "iceberg/metadata_columns.h" #include "iceberg/parquet/parquet_data_util_internal.h" #include "iceberg/schema.h" @@ -119,6 +120,11 @@ Result> ProjectStructArray( ICEBERG_ASSIGN_OR_RAISE( projected_array, MakeNullArray(output_arrow_type, struct_array->length(), pool)); + } else if (field_projection.kind == FieldProjection::Kind::kDefault) { + ICEBERG_ASSIGN_OR_RAISE( + projected_array, + arrow::MakeDefaultArray(std::get(field_projection.from), + output_arrow_type, struct_array->length(), pool)); } else if (field_projection.kind == FieldProjection::Kind::kMetadata) { int32_t field_id = projected_field.field_id(); if (field_id == MetadataColumns::kFilePathColumnId) { diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index 39e321d9f..cba8a15bf 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -336,6 +336,10 @@ Result ProjectStruct( child_projection, ProjectField(field, parquet_field, iter->second.local_index)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (field.initial_default().has_value()) { + // Rows written before the field existed assume its `initial-default` value. 
+ child_projection.kind = FieldProjection::Kind::kDefault; + child_projection.from = field.initial_default()->get(); } else if (field.optional()) { child_projection.kind = FieldProjection::Kind::kNull; } else { diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc index fcac43c78..502e80b3f 100644 --- a/src/iceberg/schema.cc +++ b/src/iceberg/schema.cc @@ -116,9 +116,8 @@ std::shared_ptr ReassignTypeIds(const std::shared_ptr& type, SchemaField ReassignField(const SchemaField& field, int32_t new_id, const Schema::GetId& get_id, Schema::IdMap& ids_to_reassigned, Schema::IdMap& ids_to_original) { - return {new_id, std::string(field.name()), - ReassignTypeIds(field.type(), get_id, ids_to_reassigned, ids_to_original), - field.optional(), std::string(field.doc())}; + return field.WithIdAndType( + new_id, ReassignTypeIds(field.type(), get_id, ids_to_reassigned, ids_to_original)); } std::vector ReassignIds(std::vector fields, @@ -447,7 +446,15 @@ Status Schema::Validate(int32_t format_version) const { } } - // TODO(GuoTao.yu): Check default values when they are supported + // Column default values (both initial-default and write-default) require v3+. 
+ if (field.initial_default().has_value() || field.write_default().has_value()) { + if (format_version < TableMetadata::kMinFormatVersionDefaultValues) { + return InvalidSchema( + "Invalid default value for {}: default values are not supported until v{}", + field.name(), TableMetadata::kMinFormatVersionDefaultValues); + } + ICEBERG_RETURN_UNEXPECTED(field.Validate()); + } } return {}; diff --git a/src/iceberg/schema_field.cc b/src/iceberg/schema_field.cc index 206915ec2..bcc1d36f9 100644 --- a/src/iceberg/schema_field.cc +++ b/src/iceberg/schema_field.cc @@ -21,9 +21,12 @@ #include #include +#include +#include "iceberg/expression/literal.h" #include "iceberg/type.h" #include "iceberg/util/formatter.h" // IWYU pragma: keep +#include "iceberg/util/macros.h" namespace iceberg { @@ -55,6 +58,80 @@ bool SchemaField::optional() const { return optional_; } std::string_view SchemaField::doc() const { return doc_; } +std::optional> SchemaField::initial_default() + const { + if (initial_default_ == nullptr) { + return std::nullopt; + } + return std::cref(*initial_default_); +} + +std::optional> SchemaField::write_default() const { + if (write_default_ == nullptr) { + return std::nullopt; + } + return std::cref(*write_default_); +} + +SchemaField SchemaField::WithIdAndType(int32_t field_id, + std::shared_ptr type) const { + SchemaField copy = *this; + copy.field_id_ = field_id; + copy.type_ = std::move(type); + return copy; +} + +SchemaField SchemaField::WithInitialDefault(Literal initial_default) const { + SchemaField copy = *this; + copy.initial_default_ = std::make_shared(std::move(initial_default)); + return copy; +} + +SchemaField SchemaField::WithInitialDefault( + std::optional> initial_default) const { + SchemaField copy = *this; + copy.initial_default_ = initial_default.has_value() + ? 
std::make_shared(initial_default->get()) + : nullptr; + return copy; +} + +SchemaField SchemaField::WithWriteDefault(Literal write_default) const { + SchemaField copy = *this; + copy.write_default_ = std::make_shared(std::move(write_default)); + return copy; +} + +SchemaField SchemaField::WithWriteDefault( + std::optional> write_default) const { + SchemaField copy = *this; + copy.write_default_ = write_default.has_value() + ? std::make_shared(write_default->get()) + : nullptr; + return copy; +} + +namespace { + +Status ValidateDefault(const SchemaField& field, const Literal& value, + std::string_view kind) { + if (value.IsNull() || value.IsAboveMax() || value.IsBelowMin()) { + return InvalidSchema("Invalid {} value for {}: must be a non-null value", kind, + field.name()); + } + if (field.type() == nullptr || !field.type()->is_primitive()) { + return InvalidSchema("Invalid {} value for {}: {} (must be null)", kind, field.name(), + value); + } + if (*value.type() != *field.type()) { + return InvalidSchema("{} of field {} has type {} but expected {}", kind, field.name(), + *value.type(), *field.type()); + } + return {}; +} + +} // namespace + Status SchemaField::Validate() const { if (name_.empty()) [[unlikely]] { return InvalidSchema("SchemaField cannot have empty name"); @@ -62,6 +139,13 @@ Status SchemaField::Validate() const { if (type_ == nullptr) [[unlikely]] { return InvalidSchema("SchemaField cannot have null type"); } + if (initial_default_ != nullptr) { + ICEBERG_RETURN_UNEXPECTED( + ValidateDefault(*this, *initial_default_, "initial-default")); + } + if (write_default_ != nullptr) { + ICEBERG_RETURN_UNEXPECTED(ValidateDefault(*this, *write_default_, "write-default")); + } return {}; } @@ -72,9 +156,23 @@ std::string SchemaField::ToString() const { return result; } +namespace { + +bool DefaultEquals(const std::shared_ptr& lhs, + const std::shared_ptr& rhs) { + if (lhs == nullptr || rhs == nullptr) { + return lhs == rhs; + } + return *lhs == *rhs; +} + +} // 
namespace + bool SchemaField::Equals(const SchemaField& other) const { return field_id_ == other.field_id_ && name_ == other.name_ && *type_ == *other.type_ && - optional_ == other.optional_; + optional_ == other.optional_ && + DefaultEquals(initial_default_, other.initial_default_) && + DefaultEquals(write_default_, other.write_default_); } } // namespace iceberg diff --git a/src/iceberg/schema_field.h b/src/iceberg/schema_field.h index fd20226a5..f1371ca57 100644 --- a/src/iceberg/schema_field.h +++ b/src/iceberg/schema_field.h @@ -24,7 +24,9 @@ /// type (e.g. a struct). #include +#include #include +#include #include #include @@ -71,6 +73,22 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable { /// \brief Get the field documentation. std::string_view doc() const; + /// \brief Get the default value for this field used when reading rows written + /// before the field existed (v3 `initial-default`). Empty if absent. + /// + /// The returned reference is a non-owning view into a value owned by this field; + /// it remains valid for the lifetime of this SchemaField. + [[nodiscard]] std::optional> initial_default() + const; + + /// \brief Get the default value for this field used when a writer does not + /// supply a value (v3 `write-default`). Empty if absent. + /// + /// The returned reference is a non-owning view into a value owned by this field; + /// it remains valid for the lifetime of this SchemaField. + [[nodiscard]] std::optional> write_default() + const; + [[nodiscard]] std::string ToString() const override; Status Validate() const; @@ -91,6 +109,38 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable { return copy; } + /// \brief Return a copy of this field with a different field id and type, preserving + /// every other attribute (name, nullability, doc, and default values). 
+ /// + /// Used when fields are rebuilt during id reassignment or type rewriting; keeping + /// the copy semantics in one place ensures no attribute is silently dropped. + [[nodiscard]] SchemaField WithIdAndType(int32_t field_id, + std::shared_ptr type) const; + + /// \brief Return a copy of this field with the given `initial-default` value. + /// + /// The returned field takes ownership of the value. + [[nodiscard]] SchemaField WithInitialDefault(Literal initial_default) const; + + /// \brief Return a copy of this field with the `initial-default` value copied from + /// `initial_default`, or without one if it is empty. + /// + /// The referenced value is copied; no reference to it is retained. + [[nodiscard]] SchemaField WithInitialDefault( + std::optional> initial_default) const; + + /// \brief Return a copy of this field with the given `write-default` value. + /// + /// The returned field takes ownership of the value. + [[nodiscard]] SchemaField WithWriteDefault(Literal write_default) const; + + /// \brief Return a copy of this field with the `write-default` value copied from + /// `write_default`, or without one if it is empty. + /// + /// The referenced value is copied; no reference to it is retained. + [[nodiscard]] SchemaField WithWriteDefault( + std::optional> write_default) const; + private: /// \brief Compare two fields for equality. [[nodiscard]] bool Equals(const SchemaField& other) const; @@ -100,6 +150,11 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable { std::shared_ptr type_; bool optional_; std::string doc_; + // Default values are owned by this field and never mutated after being set; copies + // of the field share the same payload (reference-counted) instead of deep-copying, + // like `type_` above. Sharing is unobservable because the payload is immutable. 
+ std::shared_ptr initial_default_; + std::shared_ptr write_default_; }; } // namespace iceberg diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc index 4ff678fc6..5ff828258 100644 --- a/src/iceberg/schema_util.cc +++ b/src/iceberg/schema_util.cc @@ -172,10 +172,14 @@ Result ProjectNested(const Type& expected_type, const Type& sou iter->second.local_index, prune_source)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; + } else if (expected_field.initial_default().has_value()) { + // Rows written before the field existed assume its `initial-default` value. + child_projection.kind = FieldProjection::Kind::kDefault; + child_projection.from = expected_field.initial_default()->get(); } else if (expected_field.optional()) { child_projection.kind = FieldProjection::Kind::kNull; } else { - // TODO(gangwu): support default value for v3 and constant value + // TODO(gangwu): support constant value return InvalidSchema("Missing required field: {}", expected_field.ToString()); } result.children.emplace_back(std::move(child_projection)); diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 2d56d7f35..5d147e36f 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -241,7 +241,8 @@ if(ICEBERG_BUILD_BUNDLE) data_writer_test.cc delete_filter_test.cc delete_loader_test.cc - file_scan_task_reader_test.cc) + file_scan_task_reader_test.cc + literal_util_test.cc) endif() diff --git a/src/iceberg/test/avro_data_test.cc b/src/iceberg/test/avro_data_test.cc index 7731f58d3..fc2d9cfaf 100644 --- a/src/iceberg/test/avro_data_test.cc +++ b/src/iceberg/test/avro_data_test.cc @@ -31,6 +31,7 @@ #include "iceberg/avro/avro_data_util_internal.h" #include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/expression/literal.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" #include "iceberg/schema_util.h" @@ -662,6 +663,43 
@@ TEST(AppendDatumToBuilderTest, StructWithMissingOptionalField) { avro_data, expected_json)); } +TEST(AppendDatumToBuilderTest, StructWithMissingDefaultFields) { + Schema iceberg_schema({ + SchemaField::MakeRequired(1, "id", iceberg::int32()), + // Missing required field with an initial-default: filled with the default. + SchemaField::MakeRequired(2, "score", iceberg::int64()) + .WithInitialDefault(Literal::Long(100)), + // Missing optional field with an initial-default: also filled, not null. + SchemaField::MakeOptional(3, "grade", iceberg::string()) + .WithInitialDefault(Literal::String("A")), + }); + + // Create Avro schema that only has the id field (missing score and grade) + std::string avro_schema_json = R"({ + "type": "record", + "name": "person", + "fields": [ + {"name": "id", "type": "int", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + std::vector<::avro::GenericDatum> avro_data; + for (int i = 0; i < 2; ++i) { + ::avro::GenericDatum avro_datum(avro_schema.root()); + auto& record = avro_datum.value<::avro::GenericRecord>(); + record.fieldAt(0).value() = i + 1; + avro_data.push_back(avro_datum); + } + + const std::string expected_json = R"([ + {"id": 1, "score": 100, "grade": "A"}, + {"id": 2, "score": 100, "grade": "A"} + ])"; + ASSERT_NO_FATAL_FAILURE(VerifyAppendDatumToBuilder(iceberg_schema, avro_schema.root(), + avro_data, expected_json)); +} + TEST(AppendDatumToBuilderTest, NestedStructWithMissingOptionalFields) { Schema iceberg_schema({ SchemaField::MakeRequired(1, "id", iceberg::int32()), diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index ef86ef9e2..a45090116 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -37,6 +37,7 @@ #include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_stream_internal.h" #include "iceberg/avro/avro_writer.h" +#include "iceberg/expression/literal.h" #include "iceberg/file_reader.h" #include 
"iceberg/metadata_columns.h" #include "iceberg/schema.h" @@ -224,6 +225,34 @@ TEST_F(AvroReaderTest, ReadTwoFields) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } +TEST_F(AvroReaderTest, ReadMissingFieldsWithDefaults) { + // The file contains only fields 1 and 2; the projected schema adds fields 3 and 4 + // with initial-defaults, which are filled for all rows written before the columns + // existed. + CreateSimpleAvroFile(); + + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeOptional(2, "name", std::make_shared()), + SchemaField::MakeRequired(3, "score", std::make_shared()) + .WithInitialDefault(Literal::Long(100)), + SchemaField::MakeOptional(4, "status", std::make_shared()) + .WithInitialDefault(Literal::String("active")), + }); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, + {.path = temp_avro_file_, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, + R"([[1, "Alice", 100, "active"], + [2, "Bob", 100, "active"], + [3, "Charlie", 100, "active"]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + TEST_F(AvroReaderTest, RoundTripWithGenericFileIO) { file_io_ = std::make_shared(); temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro"); diff --git a/src/iceberg/test/literal_util_test.cc b/src/iceberg/test/literal_util_test.cc new file mode 100644 index 000000000..894f3b53e --- /dev/null +++ b/src/iceberg/test/literal_util_test.cc @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/literal_util_internal.h" +#include "iceberg/expression/literal.h" +#include "iceberg/test/matchers.h" +#include "iceberg/type.h" +#include "iceberg/util/uuid.h" + +namespace iceberg::arrow { + +namespace { + +struct ToArrowScalarParam { + std::string test_name; + Literal literal; + std::shared_ptr<::arrow::Scalar> expected; +}; + +class ToArrowScalarTest : public ::testing::TestWithParam {}; + +TEST_P(ToArrowScalarTest, ConvertsTypeAndValue) { + const auto& param = GetParam(); + + ICEBERG_UNWRAP_OR_FAIL(auto scalar, ToArrowScalar(param.literal)); + ASSERT_TRUE(scalar->type->Equals(*param.expected->type)) + << "actual type: " << scalar->type->ToString(); + ASSERT_TRUE(scalar->Equals(*param.expected)) << "actual value: " << scalar->ToString(); +} + +std::shared_ptr<::arrow::Buffer> BufferOf(const std::vector& bytes) { + return ::arrow::Buffer::FromString(std::string(bytes.begin(), bytes.end())); +} + +const std::vector kUuidBytes = {0xF7, 0x9C, 0x3E, 0x09, 0x67, 0x7C, 0x4B, 0xBD, + 0xA4, 0x79, 0x3F, 0x34, 0x9C, 0xB7, 0x85, 0xE7}; + +INSTANTIATE_TEST_SUITE_P( + AllPrimitiveTypes, ToArrowScalarTest, + ::testing::Values( + ToArrowScalarParam{"Boolean", Literal::Boolean(true), + std::make_shared<::arrow::BooleanScalar>(true)}, + 
ToArrowScalarParam{"Int", Literal::Int(42), + std::make_shared<::arrow::Int32Scalar>(42)}, + ToArrowScalarParam{"Long", Literal::Long(42), + std::make_shared<::arrow::Int64Scalar>(42)}, + ToArrowScalarParam{"Float", Literal::Float(1.5F), + std::make_shared<::arrow::FloatScalar>(1.5F)}, + ToArrowScalarParam{"Double", Literal::Double(2.5), + std::make_shared<::arrow::DoubleScalar>(2.5)}, + ToArrowScalarParam{"String", Literal::String("iceberg"), + std::make_shared<::arrow::StringScalar>("iceberg")}, + ToArrowScalarParam{ + "Binary", Literal::Binary({0x01, 0x02}), + std::make_shared<::arrow::BinaryScalar>(BufferOf({0x01, 0x02}))}, + ToArrowScalarParam{"Fixed", Literal::Fixed({0xAB, 0xCD}), + std::make_shared<::arrow::FixedSizeBinaryScalar>( + BufferOf({0xAB, 0xCD}), ::arrow::fixed_size_binary(2))}, + ToArrowScalarParam{"Uuid", Literal::UUID(Uuid::FromBytes(kUuidBytes).value()), + std::make_shared<::arrow::FixedSizeBinaryScalar>( + BufferOf(kUuidBytes), ::arrow::fixed_size_binary(16))}, + ToArrowScalarParam{ + "Decimal", Literal::Decimal(12345, /*precision=*/9, /*scale=*/2), + std::make_shared<::arrow::Decimal128Scalar>(::arrow::Decimal128(12345), + ::arrow::decimal128(9, 2))}, + ToArrowScalarParam{ + "NegativeDecimal", Literal::Decimal(-12345, /*precision=*/9, /*scale=*/2), + std::make_shared<::arrow::Decimal128Scalar>(::arrow::Decimal128(-12345), + ::arrow::decimal128(9, 2))}, + ToArrowScalarParam{"Date", Literal::Date(19000), + std::make_shared<::arrow::Date32Scalar>(19000)}, + ToArrowScalarParam{"Time", Literal::Time(3600000000), + std::make_shared<::arrow::Time64Scalar>( + 3600000000, ::arrow::time64(::arrow::TimeUnit::MICRO))}, + ToArrowScalarParam{ + "Timestamp", Literal::Timestamp(1672531200000000), + std::make_shared<::arrow::TimestampScalar>( + 1672531200000000, ::arrow::timestamp(::arrow::TimeUnit::MICRO))}, + ToArrowScalarParam{"TimestampTz", Literal::TimestampTz(1672531200000000), + std::make_shared<::arrow::TimestampScalar>( + 1672531200000000, + 
::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC"))}, + ToArrowScalarParam{"TimestampNs", Literal::TimestampNs(1672531200000000000), + std::make_shared<::arrow::TimestampScalar>( + 1672531200000000000, + ::arrow::timestamp(::arrow::TimeUnit::NANO))}, + ToArrowScalarParam{"TimestampTzNs", Literal::TimestampTzNs(1672531200000000000), + std::make_shared<::arrow::TimestampScalar>( + 1672531200000000000, + ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"))}), + [](const ::testing::TestParamInfo& info) { + return info.param.test_name; + }); + +TEST(LiteralUtilTest, NullLiteralBecomesNullScalar) { + ICEBERG_UNWRAP_OR_FAIL(auto scalar, ToArrowScalar(Literal::Null(iceberg::int32()))); + ASSERT_TRUE(scalar->type->Equals(*::arrow::int32())); + ASSERT_FALSE(scalar->is_valid); +} + +TEST(LiteralUtilTest, SentinelLiteralsAreRejected) { + // Casting to a narrower type may produce AboveMax/BelowMin sentinels; they have + // no value to materialize. + ICEBERG_UNWRAP_OR_FAIL( + auto above_max, + Literal::Long(std::numeric_limits::max()).CastTo(iceberg::int32())); + ASSERT_TRUE(above_max.IsAboveMax()); + ASSERT_THAT(ToArrowScalar(above_max), IsError(ErrorKind::kNotSupported)); +} + +TEST(LiteralUtilTest, MakeDefaultArrayFillsAllRows) { + ICEBERG_UNWRAP_OR_FAIL( + auto array, MakeDefaultArray(Literal::Int(7), ::arrow::int32(), /*num_rows=*/3, + ::arrow::default_memory_pool())); + ASSERT_EQ(array->length(), 3); + ASSERT_EQ(array->null_count(), 0); + const auto& int_array = static_cast(*array); + for (int64_t i = 0; i < 3; i++) { + ASSERT_EQ(int_array.Value(i), 7); + } +} + +TEST(LiteralUtilTest, MakeDefaultArrayCastsToTargetType) { + // The target Arrow type prevails when it differs from the literal's natural type. 
+ ICEBERG_UNWRAP_OR_FAIL( + auto array, MakeDefaultArray(Literal::Int(7), ::arrow::int64(), /*num_rows=*/2, + ::arrow::default_memory_pool())); + ASSERT_TRUE(array->type()->Equals(*::arrow::int64())); + const auto& long_array = static_cast(*array); + ASSERT_EQ(long_array.Value(0), 7); +} + +TEST(LiteralUtilTest, AppendDefaultToBuilderAppendsOneValue) { + ::arrow::Int64Builder builder; + ASSERT_THAT(AppendDefaultToBuilder(Literal::Long(42), &builder), IsOk()); + ASSERT_THAT(AppendDefaultToBuilder(Literal::Long(42), &builder), IsOk()); + + std::shared_ptr<::arrow::Array> array; + ASSERT_TRUE(builder.Finish(&array).ok()); + ASSERT_EQ(array->length(), 2); + const auto& long_array = static_cast(*array); + ASSERT_EQ(long_array.Value(0), 42); + ASSERT_EQ(long_array.Value(1), 42); +} + +} // namespace + +} // namespace iceberg::arrow diff --git a/src/iceberg/test/parquet_data_test.cc b/src/iceberg/test/parquet_data_test.cc index 606ad8ca5..7d1c4748e 100644 --- a/src/iceberg/test/parquet_data_test.cc +++ b/src/iceberg/test/parquet_data_test.cc @@ -28,6 +28,7 @@ #include #include +#include "iceberg/expression/literal.h" #include "iceberg/parquet/parquet_data_util_internal.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" @@ -409,6 +410,67 @@ TEST(ProjectRecordBatchTest, StructWithMissingOptionalField) { input_json, expected_json)); } +TEST(ProjectRecordBatchTest, StructWithMissingDefaultFields) { + Schema projected_schema({ + SchemaField::MakeRequired(1, "id", int32()), + // Missing required field with an initial-default: filled with the default. + SchemaField::MakeRequired(2, "score", int64()) + .WithInitialDefault(Literal::Long(100)), + // Missing optional field with an initial-default: also filled, not null. 
+ SchemaField::MakeOptional(3, "grade", string()) + .WithInitialDefault(Literal::String("A")), + }); + + Schema source_schema({ + SchemaField::MakeRequired(1, "id", int32()), + }); + + const std::string input_json = R"([ + {"id": 1}, + {"id": 2} + ])"; + const std::string expected_json = R"([ + {"id": 1, "score": 100, "grade": "A"}, + {"id": 2, "score": 100, "grade": "A"} + ])"; + + ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema, + input_json, expected_json)); +} + +TEST(ProjectRecordBatchTest, NestedStructWithMissingDefaultField) { + Schema projected_schema({ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired( + 2, "person", + std::make_shared(std::vector{ + SchemaField::MakeRequired(3, "name", string()), + SchemaField::MakeRequired(4, "age", int32()) // Missing in source + .WithInitialDefault(Literal::Int(18)), + })), + }); + + Schema source_schema({ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "person", + std::make_shared(std::vector{ + SchemaField::MakeRequired(3, "name", string()), + })), + }); + + const std::string input_json = R"([ + {"id": 100, "person": {"name": "Employee0"}}, + {"id": 101, "person": {"name": "Employee1"}} + ])"; + const std::string expected_json = R"([ + {"id": 100, "person": {"name": "Employee0", "age": 18}}, + {"id": 101, "person": {"name": "Employee1", "age": 18}} + ])"; + + ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema, + input_json, expected_json)); +} + TEST(ProjectRecordBatchTest, NestedStructWithMissingOptionalFields) { Schema projected_schema({ SchemaField::MakeRequired(1, "id", int32()), diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index ee1cbc931..fc6953d64 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -38,6 +38,7 @@ #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" +#include 
"iceberg/expression/literal.h" #include "iceberg/file_reader.h" #include "iceberg/file_writer.h" #include "iceberg/metadata_columns.h" @@ -262,6 +263,35 @@ TEST_F(ParquetReaderTest, ReadTwoFields) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } +TEST_F(ParquetReaderTest, ReadMissingFieldsWithDefaults) { + // The file contains only fields 1 and 2; the projected schema adds fields 3 and 4 + // with initial-defaults, which are filled for all rows written before the columns + // existed. + CreateSimpleParquetFile(); + + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeRequired(3, "score", int64()) + .WithInitialDefault(Literal::Long(100)), + SchemaField::MakeOptional(4, "status", string()) + .WithInitialDefault(Literal::String("active")), + }); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kParquet, + {.path = temp_parquet_file_, .io = file_io_, .projection = schema}); + ASSERT_THAT(reader_result, IsOk()) + << "Failed to create reader: " << reader_result.error().message; + auto reader = std::move(reader_result.value()); + + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, + R"([[1, "Foo", 100, "active"], + [2, "Bar", 100, "active"], + [3, "Baz", 100, "active"]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + TEST_F(ParquetReaderTest, RoundTripWithGenericFileIO) { auto file_io = std::make_shared(); auto path = CreateNewTempFilePathWithSuffix(".parquet"); diff --git a/src/iceberg/test/resources/TableMetadataV3Valid.json b/src/iceberg/test/resources/TableMetadataV3Valid.json new file mode 100644 index 000000000..712d4b5bd --- /dev/null +++ b/src/iceberg/test/resources/TableMetadataV3Valid.json @@ -0,0 +1,123 @@ +{ + "format-version": 3, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "next-row-id": 0, + "last-updated-ms": 
1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + } + ] + }, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [ + 1, + 2 + ], + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long", + "doc": "comment" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "name": "x", + "transform": "identity", + "source-id": 1, + "field-id": 1000 + } + ] + } + ], + "last-partition-id": 1000, + "default-sort-order-id": 3, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + { + "transform": "identity", + "source-id": 2, + "direction": "asc", + "null-order": "nulls-first" + }, + { + "transform": "bucket[4]", + "source-id": 3, + "direction": "desc", + "null-order": "nulls-last" + } + ] + } + ], + "properties": {}, + "current-snapshot-id": 3055729675574597004, + "snapshots": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770, + "sequence-number": 0, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/1.avro" + }, + { + "snapshot-id": 3055729675574597004, + "parent-snapshot-id": 3051729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 1 + } + ], + "snapshot-log": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770 + }, + { + "snapshot-id": 3055729675574597004, + "timestamp-ms": 1555100955770 + } + ], + "metadata-log": [] +} diff --git a/src/iceberg/test/schema_json_test.cc b/src/iceberg/test/schema_json_test.cc index 08275a45c..71fc474f4 100644 --- a/src/iceberg/test/schema_json_test.cc +++ 
b/src/iceberg/test/schema_json_test.cc @@ -24,6 +24,7 @@ #include #include +#include "iceberg/expression/literal.h" #include "iceberg/json_serde_internal.h" #include "iceberg/schema.h" #include "iceberg/schema_field.h" @@ -137,6 +138,50 @@ TEST(SchemaJsonTest, RoundTrip) { ASSERT_EQ(dumped_json, json); } +TEST(SchemaJsonTest, FieldWithDefaultValuesRoundTrip) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"initial-default":42,"name":"id","required":true,"type":"int","write-default":7},{"id":2,"initial-default":"n/a","name":"name","required":false,"type":"string"}],"schema-id":1,"type":"struct"})"; + + ICEBERG_UNWRAP_OR_FAIL(auto schema, SchemaFromJson(nlohmann::json::parse(json))); + ASSERT_EQ(schema->fields().size(), 2); + + const auto& field1 = schema->fields()[0]; + ASSERT_TRUE(field1.initial_default().has_value()); + ASSERT_EQ(field1.initial_default()->get(), Literal::Int(42)); + ASSERT_TRUE(field1.write_default().has_value()); + ASSERT_EQ(field1.write_default()->get(), Literal::Int(7)); + + const auto& field2 = schema->fields()[1]; + ASSERT_TRUE(field2.initial_default().has_value()); + ASSERT_EQ(field2.initial_default()->get(), Literal::String("n/a")); + ASSERT_FALSE(field2.write_default().has_value()); + + ASSERT_EQ(ToJson(*schema).dump(), json); +} + +TEST(SchemaJsonTest, FieldWithMismatchedDefaultValueFails) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"initial-default":"oops","name":"id","required":true,"type":"int"}],"schema-id":1,"type":"struct"})"; + + auto result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_FALSE(result.has_value()); +} + +TEST(SchemaJsonTest, NestedFieldWithDefaultValuesRoundTrip) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"person","required":true,"type":{"fields":[{"id":2,"initial-default":18,"name":"age","required":true,"type":"int","write-default":21}],"type":"struct"}}],"schema-id":1,"type":"struct"})"; + + ICEBERG_UNWRAP_OR_FAIL(auto schema, 
SchemaFromJson(nlohmann::json::parse(json))); + const auto& person = schema->fields()[0]; + const auto& nested = dynamic_cast(*person.type()).fields()[0]; + ASSERT_TRUE(nested.initial_default().has_value()); + ASSERT_EQ(nested.initial_default()->get(), Literal::Int(18)); + ASSERT_TRUE(nested.write_default().has_value()); + ASSERT_EQ(nested.write_default()->get(), Literal::Int(21)); + + ASSERT_EQ(ToJson(*schema).dump(), json); +} + TEST(SchemaJsonTest, UnknownFieldRoundTrip) { constexpr std::string_view json = R"({"fields":[{"id":1,"name":"mystery","required":false,"type":"unknown"}],"schema-id":1,"type":"struct"})"; diff --git a/src/iceberg/test/schema_test.cc b/src/iceberg/test/schema_test.cc index 8f1b20035..44b9a60f5 100644 --- a/src/iceberg/test/schema_test.cc +++ b/src/iceberg/test/schema_test.cc @@ -25,6 +25,7 @@ #include #include +#include "iceberg/expression/literal.h" #include "iceberg/result.h" #include "iceberg/schema_field.h" #include "iceberg/table_metadata.h" @@ -133,6 +134,57 @@ TEST(SchemaTest, ValidateRejectsV3TypesBeforeFormatV3) { iceberg::IsOk()); } +TEST(SchemaTest, ValidateRejectsDefaultValuesBeforeFormatV3) { + iceberg::Schema schema({iceberg::SchemaField(1, "id", iceberg::int32(), false) + .WithInitialDefault(iceberg::Literal::Int(42))}); + + auto status = schema.Validate(2); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema)); + EXPECT_THAT(status, + iceberg::HasErrorMessage("default values are not supported until v3")); + + EXPECT_THAT(schema.Validate(iceberg::TableMetadata::kSupportedTableFormatVersion), + iceberg::IsOk()); +} + +TEST(SchemaTest, ReassignIdsPreservesDefaultValues) { + // Rebuilding fields during ID reassignment must carry default values along. 
+ iceberg::Schema schema({iceberg::SchemaField(1, "id", iceberg::int32(), false) + .WithInitialDefault(iceberg::Literal::Int(42)) + .WithWriteDefault(iceberg::Literal::Int(7))}, + iceberg::Schema::kInitialSchemaId, + /*get_id=*/[](int32_t id) { return id + 100; }); + + ICEBERG_UNWRAP_OR_FAIL(auto field, schema.FindFieldById(101)); + ASSERT_TRUE(field.has_value()); + ASSERT_TRUE(field->get().initial_default().has_value()); + EXPECT_EQ(field->get().initial_default()->get(), iceberg::Literal::Int(42)); + ASSERT_TRUE(field->get().write_default().has_value()); + EXPECT_EQ(field->get().write_default()->get(), iceberg::Literal::Int(7)); +} + +TEST(SchemaTest, ValidateRejectsWriteDefaultBeforeFormatV3) { + iceberg::Schema schema({iceberg::SchemaField(1, "id", iceberg::int32(), false) + .WithWriteDefault(iceberg::Literal::Int(7))}); + + auto status = schema.Validate(2); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema)); + EXPECT_THAT(status, + iceberg::HasErrorMessage("default values are not supported until v3")); + + EXPECT_THAT(schema.Validate(iceberg::TableMetadata::kSupportedTableFormatVersion), + iceberg::IsOk()); +} + +TEST(SchemaTest, ValidateRejectsMismatchedDefaultValue) { + iceberg::Schema schema({iceberg::SchemaField(1, "id", iceberg::int32(), false) + .WithWriteDefault(iceberg::Literal::String("oops"))}); + + auto status = schema.Validate(iceberg::TableMetadata::kSupportedTableFormatVersion); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema)); + EXPECT_THAT(status, iceberg::HasErrorMessage("write-default")); +} + TEST(SchemaTest, ValidateRejectsInvalidUnknownFields) { iceberg::Schema required_unknown_schema( {iceberg::SchemaField(1, "mystery", iceberg::unknown(), false)}); diff --git a/src/iceberg/test/schema_util_test.cc b/src/iceberg/test/schema_util_test.cc index ee075006f..23e986fad 100644 --- a/src/iceberg/test/schema_util_test.cc +++ b/src/iceberg/test/schema_util_test.cc @@ -24,6 +24,7 @@ #include 
#include +#include "iceberg/expression/literal.h" #include "iceberg/metadata_columns.h" #include "iceberg/schema.h" #include "iceberg/schema_field.h" @@ -179,6 +180,58 @@ TEST(SchemaUtilTest, ProjectMissingRequiredField) { ASSERT_THAT(projection_result, HasErrorMessage("Missing required field")); } +TEST(SchemaUtilTest, ProjectMissingRequiredFieldWithInitialDefault) { + Schema source_schema = CreateFlatSchema(); + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeRequired(/*field_id=*/10, "extra", iceberg::int32()) + .WithInitialDefault(Literal::Int(42)), + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + AssertProjectedField(projection.fields[0], 0); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kDefault); + ASSERT_EQ(std::get(projection.fields[1].from), Literal::Int(42)); +} + +TEST(SchemaUtilTest, ProjectMissingOptionalFieldWithInitialDefault) { + // An optional field with an initial-default reads the default, not null. + Schema source_schema = CreateFlatSchema(); + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()), + SchemaField::MakeOptional(/*field_id=*/10, "extra", iceberg::string()) + .WithInitialDefault(Literal::String("n/a")), + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 2); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kDefault); + ASSERT_EQ(std::get(projection.fields[1].from), Literal::String("n/a")); +} + +TEST(SchemaUtilTest, ProjectPresentFieldIgnoresInitialDefault) { + // initial-default only applies when the field is missing from the data file. 
+ Schema source_schema = CreateFlatSchema(); + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "id", iceberg::int64()) + .WithInitialDefault(Literal::Long(-1)), + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + AssertProjectedField(projection_result->fields[0], 0); +} + TEST(SchemaUtilTest, ProjectMetadataColumn) { Schema source_schema = CreateFlatSchema(); Schema expected_schema({ diff --git a/src/iceberg/test/update_schema_test.cc b/src/iceberg/test/update_schema_test.cc index 07872e69a..6cb14fd41 100644 --- a/src/iceberg/test/update_schema_test.cc +++ b/src/iceberg/test/update_schema_test.cc @@ -19,11 +19,13 @@ #include "iceberg/update/update_schema.h" +#include #include #include #include +#include "iceberg/expression/literal.h" #include "iceberg/schema.h" #include "iceberg/schema_field.h" #include "iceberg/test/matchers.h" @@ -82,6 +84,210 @@ TEST_F(UpdateSchemaTest, AddRequiredColumnWithAllowIncompatible) { EXPECT_EQ(new_field.doc(), "A required string column"); } +/// Default values require a v3 table for Apply() to validate successfully. 
+class UpdateSchemaDefaultValueTest : public UpdateSchemaTest { + protected: + std::string MetadataResource() const override { return "TableMetadataV3Valid.json"; } +}; + +TEST_F(UpdateSchemaTest, AddColumnWithDefaultValueRequiresV3) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("new_col", int32(), "An integer column", Literal::Int(42)); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidSchema)); + EXPECT_THAT(result, HasErrorMessage("default values are not supported until v3")); +} + +TEST_F(UpdateSchemaDefaultValueTest, AddColumnWithDefaultValue) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("new_col", int32(), "An integer column", Literal::Int(42)); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto new_field_opt, result.schema->FindFieldByName("new_col")); + ASSERT_TRUE(new_field_opt.has_value()); + + const auto& new_field = new_field_opt->get(); + ASSERT_TRUE(new_field.initial_default().has_value()); + EXPECT_EQ(new_field.initial_default()->get(), Literal::Int(42)); + ASSERT_TRUE(new_field.write_default().has_value()); + EXPECT_EQ(new_field.write_default()->get(), Literal::Int(42)); +} + +TEST_F(UpdateSchemaDefaultValueTest, AddRequiredColumnWithDefaultValue) { + // A required column with a default does not need AllowIncompatibleChanges(): + // old rows read the initial-default instead of null. 
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddRequiredColumn("required_col", string(), "A required string column", + Literal::String("n/a")); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto new_field_opt, + result.schema->FindFieldByName("required_col")); + ASSERT_TRUE(new_field_opt.has_value()); + + const auto& new_field = new_field_opt->get(); + EXPECT_FALSE(new_field.optional()); + ASSERT_TRUE(new_field.initial_default().has_value()); + EXPECT_EQ(new_field.initial_default()->get(), Literal::String("n/a")); + ASSERT_TRUE(new_field.write_default().has_value()); + EXPECT_EQ(new_field.write_default()->get(), Literal::String("n/a")); +} + +TEST_F(UpdateSchemaDefaultValueTest, AddColumnWithMismatchedDefaultValueFails) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("new_col", int32(), "An integer column", Literal::String("oops")); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot cast default value")); +} + +TEST_F(UpdateSchemaDefaultValueTest, AddColumnWithNarrowingDefaultValueFails) { + // CastTo signals narrowing with AboveMax/BelowMin sentinels; they must not be + // stored as defaults. 
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("new_col", int32(), "An integer column", + Literal::Long(std::numeric_limits::max())); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Cannot cast default value")); +} + +TEST_F(UpdateSchemaDefaultValueTest, UpdateColumnDefault) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("new_col", int32(), "An integer column", Literal::Int(42)) + .UpdateColumnDefault("new_col", Literal::Int(7)); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto new_field_opt, result.schema->FindFieldByName("new_col")); + ASSERT_TRUE(new_field_opt.has_value()); + + const auto& new_field = new_field_opt->get(); + // initial-default is fixed at column addition; write-default is updated. + ASSERT_TRUE(new_field.initial_default().has_value()); + EXPECT_EQ(new_field.initial_default()->get(), Literal::Int(42)); + ASSERT_TRUE(new_field.write_default().has_value()); + EXPECT_EQ(new_field.write_default()->get(), Literal::Int(7)); +} + +TEST_F(UpdateSchemaDefaultValueTest, UpdateColumnDefaultOnExistingColumn) { + // Updating the write-default of a pre-existing column must survive Apply(). + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->UpdateColumnDefault("x", Literal::Long(0)); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("x")); + ASSERT_TRUE(field_opt.has_value()); + + const auto& field = field_opt->get(); + EXPECT_FALSE(field.initial_default().has_value()); + ASSERT_TRUE(field.write_default().has_value()); + EXPECT_EQ(field.write_default()->get(), Literal::Long(0)); +} + +TEST_F(UpdateSchemaDefaultValueTest, UpdateColumnDefaultClearsWithNullopt) { + // Passing std::nullopt removes the write-default (Java parity with null). 
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("new_col", int32(), "An integer column", Literal::Int(42)) + .UpdateColumnDefault("new_col", std::nullopt); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("new_col")); + ASSERT_TRUE(field_opt.has_value()); + + const auto& field = field_opt->get(); + // initial-default stays; write-default is cleared. + ASSERT_TRUE(field.initial_default().has_value()); + EXPECT_EQ(field.initial_default()->get(), Literal::Int(42)); + EXPECT_FALSE(field.write_default().has_value()); +} + +TEST_F(UpdateSchemaDefaultValueTest, AddNestedColumnPreservesNestedDefaults) { + // The added column's type gets fresh field ids; defaults on its nested fields must + // survive the reassignment. + auto nested_type = std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/100, "inner", int32()) + .WithInitialDefault(Literal::Int(5)) + .WithWriteDefault(Literal::Int(9))}); + + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("outer", nested_type, "A nested column"); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto outer_opt, result.schema->FindFieldByName("outer")); + ASSERT_TRUE(outer_opt.has_value()); + + const auto& outer_struct = + internal::checked_cast(*outer_opt->get().type()); + ASSERT_EQ(outer_struct.fields().size(), 1); + const SchemaField& inner = outer_struct.fields()[0]; + ASSERT_TRUE(inner.initial_default().has_value()); + EXPECT_EQ(inner.initial_default()->get(), Literal::Int(5)); + ASSERT_TRUE(inner.write_default().has_value()); + EXPECT_EQ(inner.write_default()->get(), Literal::Int(9)); +} + +TEST_F(UpdateSchemaDefaultValueTest, UpdateColumnDefaultCastsToColumnType) { + // Match Java: an int default for a long column is cast, not rejected. 
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->UpdateColumnDefault("x", Literal::Int(5)); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("x")); + ASSERT_TRUE(field_opt.has_value()); + + const auto& field = field_opt->get(); + ASSERT_TRUE(field.write_default().has_value()); + EXPECT_EQ(field.write_default()->get(), Literal::Long(5)); +} + +TEST_F(UpdateSchemaDefaultValueTest, RequireColumnAddedWithDefault) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("new_col", int32(), "An integer column", Literal::Int(42)) + .RequireColumn("new_col"); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto new_field_opt, result.schema->FindFieldByName("new_col")); + ASSERT_TRUE(new_field_opt.has_value()); + EXPECT_FALSE(new_field_opt->get().optional()); +} + +TEST_F(UpdateSchemaDefaultValueTest, UpdateColumnDocPreservesDefaultValues) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("new_col", int32(), "An integer column", Literal::Int(42)) + .UpdateColumnDoc("new_col", "updated doc"); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("new_col")); + ASSERT_TRUE(field_opt.has_value()); + + const auto& field = field_opt->get(); + EXPECT_EQ(field.doc(), "updated doc"); + ASSERT_TRUE(field.initial_default().has_value()); + EXPECT_EQ(field.initial_default()->get(), Literal::Int(42)); + ASSERT_TRUE(field.write_default().has_value()); + EXPECT_EQ(field.write_default()->get(), Literal::Int(42)); +} + +TEST_F(UpdateSchemaDefaultValueTest, UpdateColumnTypePromotesDefaultValues) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("new_col", int32(), "An integer column", Literal::Int(42)) + .UpdateColumn("new_col", int64()); + + ICEBERG_UNWRAP_OR_FAIL(auto 
result, update->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("new_col")); + ASSERT_TRUE(field_opt.has_value()); + + const auto& field = field_opt->get(); + EXPECT_EQ(field.type(), int64()); + ASSERT_TRUE(field.initial_default().has_value()); + EXPECT_EQ(field.initial_default()->get(), Literal::Long(42)); + ASSERT_TRUE(field.write_default().has_value()); + EXPECT_EQ(field.write_default()->get(), Literal::Long(42)); +} + TEST_F(UpdateSchemaTest, AddMultipleColumns) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); update->AddColumn("col1", int32(), "First column") diff --git a/src/iceberg/update/update_schema.cc b/src/iceberg/update/update_schema.cc index 0e7f147b0..ce2be0790 100644 --- a/src/iceberg/update/update_schema.cc +++ b/src/iceberg/update/update_schema.cc @@ -29,6 +29,7 @@ #include #include +#include "iceberg/expression/literal.h" #include "iceberg/json_serde_internal.h" #include "iceberg/name_mapping.h" #include "iceberg/schema.h" @@ -39,6 +40,7 @@ #include "iceberg/type.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/error_collector.h" +#include "iceberg/util/formatter.h" // IWYU pragma: keep #include "iceberg/util/macros.h" #include "iceberg/util/string_util.h" #include "iceberg/util/type_util.h" @@ -207,12 +209,12 @@ class ApplyChangesVisitor { // any child fields that were added. if (update_it != updates_.end()) { - const auto& update_field = update_it->second; - return SchemaField(field_id, update_field->name(), std::move(result_type), - update_field->optional(), update_field->doc()); + // The updated field already carries the new name/doc/nullability/defaults; only + // its id and (recursively rewritten) type need to be applied. 
+ return update_it->second->WithIdAndType(field_id, std::move(result_type)); } else if (result_type != field.type()) { - return SchemaField(field_id, field.name(), std::move(result_type), field.optional(), - field.doc()); + // A nested type was rewritten by recursion; preserve every other attribute. + return field.WithIdAndType(field_id, std::move(result_type)); } else { return field; } @@ -339,39 +341,43 @@ UpdateSchema& UpdateSchema::CaseSensitive(bool case_sensitive) { } UpdateSchema& UpdateSchema::AddColumn(std::string_view name, std::shared_ptr type, - std::string_view doc) { + std::string_view doc, + std::optional default_value) { ICEBERG_BUILDER_CHECK(!name.contains('.'), "Cannot add column with ambiguous name: {}, use " "AddColumn(parent, name, type, doc)", name); - return AddColumnInternal(std::nullopt, name, /*is_optional=*/true, std::move(type), - doc); + return AddColumnInternal(std::nullopt, name, /*is_optional=*/true, std::move(type), doc, + std::move(default_value)); } UpdateSchema& UpdateSchema::AddColumn(std::optional parent, std::string_view name, std::shared_ptr type, - std::string_view doc) { + std::string_view doc, + std::optional default_value) { return AddColumnInternal(std::move(parent), name, /*is_optional=*/true, std::move(type), - doc); + doc, std::move(default_value)); } UpdateSchema& UpdateSchema::AddRequiredColumn(std::string_view name, std::shared_ptr type, - std::string_view doc) { + std::string_view doc, + std::optional default_value) { ICEBERG_BUILDER_CHECK(!name.contains('.'), "Cannot add column with ambiguous name: {}, use " "AddRequiredColumn(parent, name, type, doc)", name); return AddColumnInternal(std::nullopt, name, /*is_optional=*/false, std::move(type), - doc); + doc, std::move(default_value)); } UpdateSchema& UpdateSchema::AddRequiredColumn(std::optional parent, std::string_view name, std::shared_ptr type, - std::string_view doc) { + std::string_view doc, + std::optional default_value) { return 
AddColumnInternal(std::move(parent), name, /*is_optional=*/false, - std::move(type), doc); + std::move(type), doc, std::move(default_value)); } UpdateSchema& UpdateSchema::UpdateColumn(std::string_view name, @@ -393,8 +399,29 @@ UpdateSchema& UpdateSchema::UpdateColumn(std::string_view name, "Cannot change column type: {}: {} -> {}", name, field.type()->ToString(), new_type->ToString()); - updates_[field_id] = std::make_shared( - field.field_id(), field.name(), new_type, field.optional(), field.doc()); + // Promote any default values along with the column type. + auto promote_default = [&](std::optional> value) + -> Result> { + if (!value.has_value()) { + return std::nullopt; + } + ICEBERG_ASSIGN_OR_RAISE(Literal promoted, value->get().CastTo(new_type)); + return std::optional(std::move(promoted)); + }; + ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::optional initial_default, + promote_default(field.initial_default())); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::optional write_default, + promote_default(field.write_default())); + + SchemaField updated(field.field_id(), field.name(), new_type, field.optional(), + field.doc()); + if (initial_default.has_value()) { + updated = updated.WithInitialDefault(std::move(*initial_default)); + } + if (write_default.has_value()) { + updated = updated.WithWriteDefault(std::move(*write_default)); + } + updates_[field_id] = std::make_shared(std::move(updated)); return *this; } @@ -414,9 +441,48 @@ UpdateSchema& UpdateSchema::UpdateColumnDoc(std::string_view name, return *this; } + updates_[field_id] = std::make_shared( + SchemaField(field.field_id(), field.name(), field.type(), field.optional(), + std::string(new_doc)) + .WithInitialDefault(field.initial_default()) + .WithWriteDefault(field.write_default())); + + return *this; +} + +UpdateSchema& UpdateSchema::UpdateColumnDefault(std::string_view name, + std::optional new_default) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field_opt, FindFieldForUpdate(name)); + 
ICEBERG_BUILDER_CHECK(field_opt.has_value(), "Cannot update missing column: {}", name); + + const auto& field = field_opt->get(); + int32_t field_id = field.field_id(); + + ICEBERG_BUILDER_CHECK(!deletes_.contains(field_id), + "Cannot update a column that will be deleted: {}", field.name()); + + // An empty default clears the column's write-default. + if (!new_default.has_value()) { + updates_[field_id] = + std::make_shared(field.WithWriteDefault(std::nullopt)); + return *this; + } + + ICEBERG_BUILDER_CHECK(field.type()->is_primitive(), + "Invalid default value for {}: {} (must be null)", *field.type(), + *new_default); + ICEBERG_BUILDER_ASSIGN_OR_RETURN_WITH_ERROR( + Literal typed_default, + new_default->CastTo(std::static_pointer_cast(field.type())), + "Cannot cast default value to {}: {}", *field.type(), *new_default); + // CastTo reports narrowing by returning sentinel values instead of failing. + ICEBERG_BUILDER_CHECK(!typed_default.IsNull() && !typed_default.IsAboveMax() && + !typed_default.IsBelowMin(), + "Cannot cast default value to {}: {}", *field.type(), + *new_default); + updates_[field_id] = - std::make_shared(field.field_id(), field.name(), field.type(), - field.optional(), std::string(new_doc)); + std::make_shared(field.WithWriteDefault(std::move(typed_default))); return *this; } @@ -438,8 +504,10 @@ UpdateSchema& UpdateSchema::RenameColumn(std::string_view name, update_it != updates_.end() ? 
*update_it->second : field; updates_[field_id] = std::make_shared( - base_field.field_id(), std::string(new_name), base_field.type(), - base_field.optional(), base_field.doc()); + SchemaField(base_field.field_id(), std::string(new_name), base_field.type(), + base_field.optional(), base_field.doc()) + .WithInitialDefault(base_field.initial_default()) + .WithWriteDefault(base_field.write_default())); auto it = std::ranges::find(identifier_field_names_, name); if (it != identifier_field_names_.end()) { @@ -468,9 +536,10 @@ UpdateSchema& UpdateSchema::UpdateColumnRequirementInternal(std::string_view nam return *this; } - // TODO(GuotaoYu): support added column with default value - // bool is_defaulted_add = IsAdded(name) && field.initial_default() != null; - bool is_defaulted_add = false; + // A column added in this update with a default value can be made required: rows + // written before the change read the initial-default instead of null. + bool is_defaulted_add = added_name_to_id_.contains(CaseSensitivityAwareName(name)) && + field.initial_default().has_value(); ICEBERG_BUILDER_CHECK(is_optional || is_defaulted_add || allow_incompatible_changes_, "Cannot change column nullability: {}: optional -> required", @@ -623,7 +692,8 @@ Result UpdateSchema::Apply() { UpdateSchema& UpdateSchema::AddColumnInternal(std::optional parent, std::string_view name, bool is_optional, std::shared_ptr type, - std::string_view doc) { + std::string_view doc, + std::optional default_value) { int32_t parent_id = kTableRootId; std::string full_name; @@ -677,7 +747,7 @@ UpdateSchema& UpdateSchema::AddColumnInternal(std::optional pa } ICEBERG_BUILDER_CHECK( - is_optional || allow_incompatible_changes_, + is_optional || default_value.has_value() || allow_incompatible_changes_, "Incompatible change: cannot add required column without a default value: {}", full_name); @@ -691,11 +761,28 @@ UpdateSchema& UpdateSchema::AddColumnInternal(std::optional pa AssignFreshIdVisitor id_assigner([this]() { 
return AssignNewColumnId(); }); auto type_with_fresh_ids = id_assigner.Visit(type); - auto new_field = std::make_shared(new_id, std::string(name), - std::move(type_with_fresh_ids), - is_optional, std::string(doc)); + SchemaField new_field(new_id, std::string(name), std::move(type_with_fresh_ids), + is_optional, std::string(doc)); + if (default_value.has_value()) { + ICEBERG_BUILDER_CHECK(new_field.type()->is_primitive(), + "Invalid default value for {}: {} (must be null)", + *new_field.type(), *default_value); + ICEBERG_BUILDER_ASSIGN_OR_RETURN_WITH_ERROR( + Literal typed_default, + default_value->CastTo(std::static_pointer_cast(new_field.type())), + "Cannot cast default value to {}: {}", *new_field.type(), *default_value); + // CastTo reports narrowing by returning sentinel values instead of failing. + ICEBERG_BUILDER_CHECK(!typed_default.IsNull() && !typed_default.IsAboveMax() && + !typed_default.IsBelowMin(), + "Cannot cast default value to {}: {}", *new_field.type(), + *default_value); + // The default of a new column applies both to existing rows (initial-default) + // and to writers that do not supply a value (write-default). + new_field = new_field.WithInitialDefault(typed_default) + .WithWriteDefault(std::move(typed_default)); + } - updates_[new_id] = std::move(new_field); + updates_[new_id] = std::make_shared(std::move(new_field)); parent_to_added_ids_[parent_id].push_back(new_id); return *this; diff --git a/src/iceberg/update/update_schema.h b/src/iceberg/update/update_schema.h index 2be3732a0..7b5a09515 100644 --- a/src/iceberg/update/update_schema.h +++ b/src/iceberg/update/update_schema.h @@ -30,6 +30,7 @@ #include #include +#include "iceberg/expression/literal.h" #include "iceberg/iceberg_export.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" @@ -41,10 +42,6 @@ namespace iceberg { /// /// When committing, these changes will be applied to the current table metadata. 
/// Commit conflicts will not be resolved and will result in a CommitFailed error. -/// -/// TODO(Guotao Yu): Add support for V3 default values when adding columns. Currently, all -/// added columns use null as the default value, but Iceberg V3 supports custom -/// default values for new columns. class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { public: static Result> Make( @@ -78,15 +75,19 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { /// If type is a nested type, its field IDs are reassigned when added to the /// existing schema. /// - /// The added column will be optional with a null default value. + /// Without a default value, the added column is optional with a null default. + /// With a default value (v3+), it is used as both the `initial-default` and the + /// `write-default` of the new column. /// /// \param name Name for the new column. /// \param type Type for the new column. /// \param doc Documentation string for the new column. + /// \param default_value Optional default value for the new column (v3+). /// \return Reference to this for method chaining. /// \note InvalidArgument will be reported if name contains ".". UpdateSchema& AddColumn(std::string_view name, std::shared_ptr type, - std::string_view doc = ""); + std::string_view doc = "", + std::optional default_value = std::nullopt); /// \brief Add a new optional column to a nested struct with documentation. /// @@ -102,22 +103,28 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { /// If type is a nested type, its field IDs are reassigned when added to the /// existing schema. /// - /// The added column will be optional with a null default value. + /// Without a default value, the added column is optional with a null default. + /// With a default value (v3+), it is used as both the `initial-default` and the + /// `write-default` of the new column. /// /// \param parent Name of the parent struct to which the column will be added. 
/// \param name Name for the new column. /// \param type Type for the new column. /// \param doc Documentation string for the new column. + /// \param default_value Optional default value for the new column (v3+). /// \return Reference to this for method chaining. /// \note InvalidArgument will be reported if parent doesn't identify a struct. UpdateSchema& AddColumn(std::optional parent, std::string_view name, - std::shared_ptr type, std::string_view doc = ""); + std::shared_ptr type, std::string_view doc = "", + std::optional default_value = std::nullopt); /// \brief Add a new required top-level column with documentation. /// /// Adding a required column without a default is an incompatible change that can /// break reading older data. To suppress exceptions thrown when an incompatible - /// change is detected, call AllowIncompatibleChanges(). + /// change is detected, call AllowIncompatibleChanges() or provide a default value + /// (v3+), which is used as both the `initial-default` and the `write-default` of + /// the new column. /// /// Because "." may be interpreted as a column path separator or may be used in /// field names, it is not allowed in names passed to this method. To add to nested @@ -130,16 +137,20 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { /// \param name Name for the new column. /// \param type Type for the new column. /// \param doc Documentation string for the new column. + /// \param default_value Optional default value for the new column (v3+). /// \return Reference to this for method chaining. /// \note InvalidArgument will be reported if name contains ".". UpdateSchema& AddRequiredColumn(std::string_view name, std::shared_ptr type, - std::string_view doc = ""); + std::string_view doc = "", + std::optional default_value = std::nullopt); /// \brief Add a new required column to a nested struct with documentation. 
/// /// Adding a required column without a default is an incompatible change that can /// break reading older data. To suppress exceptions thrown when an incompatible - /// change is detected, call AllowIncompatibleChanges(). + /// change is detected, call AllowIncompatibleChanges() or provide a default value + /// (v3+), which is used as both the `initial-default` and the `write-default` of + /// the new column. /// /// The parent name is used to find the parent using Schema::FindFieldByName(). If /// the parent name is null or empty, the new column will be added to the root as a @@ -157,11 +168,13 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { /// \param name Name for the new column. /// \param type Type for the new column. /// \param doc Documentation string for the new column. + /// \param default_value Optional default value for the new column (v3+). /// \return Reference to this for method chaining. /// \note InvalidArgument will be reported if parent doesn't identify a struct. UpdateSchema& AddRequiredColumn(std::optional parent, std::string_view name, std::shared_ptr type, - std::string_view doc = ""); + std::string_view doc = "", + std::optional default_value = std::nullopt); /// \brief Rename a column in the schema. /// @@ -210,6 +223,22 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { /// the column will be deleted. UpdateSchema& UpdateColumnDoc(std::string_view name, std::string_view new_doc); + /// \brief Update the `write-default` value for a column (v3+). + /// + /// The name is used to find the column to update using Schema::FindFieldByName(). + /// The column's `initial-default` is not changed: it is fixed when the column is + /// added and applies to rows that predate the column. + /// + /// \param name Name of the column to update the default value for. + /// \param new_default Replacement `write-default` value for the column, or + /// `std::nullopt` to clear it. + /// \return Reference to this for method chaining. 
+ /// \note InvalidArgument will be reported if name doesn't identify a column in the + /// schema or if + /// the column will be deleted. + UpdateSchema& UpdateColumnDefault(std::string_view name, + std::optional new_default); + /// \brief Update a column to be optional. /// /// \param name Name of the column to mark optional. @@ -367,7 +396,8 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate { /// \return Reference to this for method chaining. UpdateSchema& AddColumnInternal(std::optional parent, std::string_view name, bool is_optional, - std::shared_ptr type, std::string_view doc); + std::shared_ptr type, std::string_view doc, + std::optional default_value); /// \brief Internal implementation for updating column requirement (optional/required). /// diff --git a/src/iceberg/util/type_util.cc b/src/iceberg/util/type_util.cc index cb01be08f..1e22073a4 100644 --- a/src/iceberg/util/type_util.cc +++ b/src/iceberg/util/type_util.cc @@ -216,8 +216,7 @@ Result> PruneColumnVisitor::Visit(const SchemaField& field SchemaField PruneColumnVisitor::MakeField(const SchemaField& field, std::shared_ptr type) { - return {field.field_id(), std::string(field.name()), std::move(type), field.optional(), - std::string(field.doc())}; + return field.WithIdAndType(field.field_id(), std::move(type)); } Result> PruneColumnVisitor::Visit( @@ -364,9 +363,7 @@ std::shared_ptr AssignFreshIdVisitor::Visit(const StructType& type) std::vector fresh_fields; for (size_t i = 0; i < type.fields().size(); ++i) { const auto& field = type.fields()[i]; - fresh_fields.emplace_back(fresh_ids[i], std::string(field.name()), - Visit(field.type()), field.optional(), - std::string(field.doc())); + fresh_fields.push_back(field.WithIdAndType(fresh_ids[i], Visit(field.type()))); } return std::make_shared(std::move(fresh_fields)); } @@ -374,10 +371,8 @@ std::shared_ptr AssignFreshIdVisitor::Visit(const StructType& type) std::shared_ptr AssignFreshIdVisitor::Visit(const ListType& type) const { const auto& 
elem_field = type.fields()[0]; int32_t fresh_id = next_id_(); - SchemaField fresh_elem_field(fresh_id, std::string(elem_field.name()), - Visit(elem_field.type()), elem_field.optional(), - std::string(elem_field.doc())); - return std::make_shared(std::move(fresh_elem_field)); + return std::make_shared( + elem_field.WithIdAndType(fresh_id, Visit(elem_field.type()))); } std::shared_ptr AssignFreshIdVisitor::Visit(const MapType& type) const { @@ -387,14 +382,9 @@ std::shared_ptr AssignFreshIdVisitor::Visit(const MapType& type) const int32_t fresh_key_id = next_id_(); int32_t fresh_value_id = next_id_(); - SchemaField fresh_key_field(fresh_key_id, std::string(key_field.name()), - Visit(key_field.type()), key_field.optional(), - std::string(key_field.doc())); - SchemaField fresh_value_field(fresh_value_id, std::string(value_field.name()), - Visit(value_field.type()), value_field.optional(), - std::string(value_field.doc())); - return std::make_shared(std::move(fresh_key_field), - std::move(fresh_value_field)); + return std::make_shared( + key_field.WithIdAndType(fresh_key_id, Visit(key_field.type())), + value_field.WithIdAndType(fresh_value_id, Visit(value_field.type()))); } Result> AssignFreshIds(int32_t schema_id, const Schema& schema,