Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl FallbackEncoder {
}

/// Encode `values` to the in-progress page
fn encode<T>(&mut self, values: T, indices: &[usize])
fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
where
T: ArrayAccessor + Copy,
T::Item: AsRef<[u8]>,
Expand All @@ -174,7 +174,7 @@ impl FallbackEncoder {
match &mut self.encoder {
FallbackEncoderImpl::Plain { buffer } => {
for idx in indices {
let value = values.value(*idx);
let value = values.value(idx);
let value = value.as_ref();
buffer.extend_from_slice((value.len() as u32).as_bytes());
buffer.extend_from_slice(value);
Expand All @@ -183,7 +183,7 @@ impl FallbackEncoder {
}
FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
for idx in indices {
let value = values.value(*idx);
let value = values.value(idx);
let value = value.as_ref();
lengths.put(&[value.len() as i32]).unwrap();
buffer.extend_from_slice(value);
Expand All @@ -197,7 +197,7 @@ impl FallbackEncoder {
suffix_lengths,
} => {
for idx in indices {
let value = values.value(*idx);
let value = values.value(idx);
let value = value.as_ref();
let mut prefix_length = 0;

Expand Down Expand Up @@ -343,15 +343,15 @@ struct DictEncoder {

impl DictEncoder {
/// Encode `values` to the in-progress page
fn encode<T>(&mut self, values: T, indices: &[usize])
fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
where
T: ArrayAccessor + Copy,
T::Item: AsRef<[u8]>,
{
self.indices.reserve(indices.len());

for idx in indices {
let value = values.value(*idx);
let value = values.value(idx);
let interned = self.interner.intern(value.as_ref());
self.indices.push(interned);
self.variable_length_bytes += value.as_ref().len() as i64;
Expand Down Expand Up @@ -466,12 +466,25 @@ impl ColumnValueEncoder for ByteArrayEncoder {
})
}

fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
unreachable!("should call write_gather instead")
fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()> {
downcast_op!(
values.data_type(),
values,
encode,
offset..offset + len,
self
);
Ok(())
}

fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
downcast_op!(values.data_type(), values, encode, indices, self);
downcast_op!(
values.data_type(),
values,
encode,
indices.iter().copied(),
self
);
Ok(())
}

Expand Down Expand Up @@ -554,15 +567,16 @@ impl ColumnValueEncoder for ByteArrayEncoder {
/// Encodes the provided `values` and `indices` to `encoder`
///
/// This is a free function so it can be used with `downcast_op!`
fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
fn encode<T, I>(values: T, indices: I, encoder: &mut ByteArrayEncoder)
where
T: ArrayAccessor + Copy,
T::Item: Copy + Ord + AsRef<[u8]>,
I: ExactSizeIterator<Item = usize> + Clone,
{
if encoder.statistics_enabled != EnabledStatistics::None {
if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
} else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
update_geo_stats_accumulator(accumulator.as_mut(), values, indices.clone());
} else if let Some((min, max)) = compute_min_max(values, indices.clone()) {
if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
encoder.min_value = Some(min);
}
Expand All @@ -575,8 +589,7 @@ where

// encode the values into bloom filter if enabled
if let Some(bloom_filter) = &mut encoder.bloom_filter {
let valid = indices.iter().cloned();
for idx in valid {
for idx in indices.clone() {
bloom_filter.insert(values.value(idx).as_ref());
}
}
Expand Down
Loading
Loading