Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 276 additions & 10 deletions arrow-select/src/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use crate::filter::{
FilterBuilder, FilterPredicate, IndexIterator, IterationStrategy, SlicesIterator,
filter_record_batch,
};
use crate::take::take_record_batch;
use arrow_array::cast::AsArray;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
use arrow_schema::{ArrowError, DataType, SchemaRef};
Expand Down Expand Up @@ -147,6 +151,13 @@ pub struct BatchCoalescer {
biggest_coalesce_batch_size: Option<usize>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FusedFilterPath {
InlineBinaryViewOnly,
MixedPrimitiveInlineBinaryView,
Unsupported,
}

impl BatchCoalescer {
/// Create a new `BatchCoalescer`
///
Expand Down Expand Up @@ -238,6 +249,18 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
match classify_fused_filter_path(&batch) {
FusedFilterPath::InlineBinaryViewOnly => {
return self.push_batch_with_filter_fused_inline_binary_view_only(batch, filter);
}
FusedFilterPath::MixedPrimitiveInlineBinaryView => {
return self.push_batch_with_filter_fused_mixed_primitive_inline_binary_view(
batch, filter,
);
}
FusedFilterPath::Unsupported => {}
}

// TODO: optimize this to avoid materializing (copying the results
// of filter to a new batch)
let filtered_batch = filter_record_batch(&batch, filter)?;
Expand Down Expand Up @@ -566,6 +589,142 @@ impl BatchCoalescer {
}
}

#[inline]
fn classify_fused_filter_path(batch: &RecordBatch) -> FusedFilterPath {
let mut has_inline_binary_view = false;
let mut has_primitive = false;

// The fused path only supports fully inline BinaryView columns, optionally
// mixed with primitive columns. Any non-inline BinaryView still uses the
// existing filter_record_batch path.
for (field, array) in batch.schema().fields().iter().zip(batch.columns()) {
if field.data_type().is_primitive() {
has_primitive = true;
continue;
}

let Some(binary_view) = array.as_binary_view_opt() else {
return FusedFilterPath::Unsupported;
};

if !binary_view.data_buffers().is_empty() {
return FusedFilterPath::Unsupported;
}

has_inline_binary_view = true;
}

match (has_inline_binary_view, has_primitive) {
(true, true) => FusedFilterPath::MixedPrimitiveInlineBinaryView,
(true, false) => FusedFilterPath::InlineBinaryViewOnly,
(false, _) => FusedFilterPath::Unsupported,
}
}

impl BatchCoalescer {
#[inline]
fn push_batch_with_filter_fused_inline_binary_view_only(
&mut self,
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
self.push_batch_with_filter_fused_inline_binary_view(batch, filter)
}

#[inline]
fn push_batch_with_filter_fused_mixed_primitive_inline_binary_view(
&mut self,
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
self.push_batch_with_filter_fused_inline_binary_view(batch, filter)
}

fn push_batch_with_filter_fused_inline_binary_view(
&mut self,
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
if filter.len() > batch.num_rows() {
return Err(ArrowError::InvalidArgumentError(format!(
"Filter predicate of length {} is larger than target array of length {}",
filter.len(),
batch.num_rows()
)));
}

let mut filter_builder = FilterBuilder::new(filter);
if batch.num_columns() > 1 {
filter_builder = filter_builder.optimize();
}
let predicate = filter_builder.build();
let selected_count = predicate.count();

if selected_count == 0 {
return Ok(());
}

if selected_count == batch.num_rows() && filter.len() == batch.num_rows() {
return self.push_batch(batch);
}

if let Some(limit) = self.biggest_coalesce_batch_size {
if selected_count > limit {
let filtered_batch = predicate.filter_record_batch(&batch)?;
return self.push_batch(filtered_batch);
}
}

// For dense inline filters, the existing filter kernel remains faster.
if selected_count.saturating_mul(4) > filter.len() {
let filtered_batch = predicate.filter_record_batch(&batch)?;
return self.push_batch(filtered_batch);
}

let space_in_batch = self.target_batch_size - self.buffered_rows;
if selected_count > space_in_batch {
let filtered_batch = predicate.filter_record_batch(&batch)?;
return self.push_batch(filtered_batch);
}

let (_schema, arrays, _num_rows) = batch.into_parts();

if arrays.len() != self.in_progress_arrays.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Batch has {} columns but BatchCoalescer expects {}",
arrays.len(),
self.in_progress_arrays.len()
)));
}

self.in_progress_arrays
.iter_mut()
.zip(arrays)
.for_each(|(in_progress, array)| {
in_progress.set_source(Some(array));
});

let result = (|| {
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.copy_rows_by_filter(&predicate)?;
}

self.buffered_rows += selected_count;
if self.buffered_rows >= self.target_batch_size {
self.finish_buffered_batch()?;
}

Ok(())
})();

for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.set_source(None);
}

result
}
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
macro_rules! instantiate_primitive {
Expand Down Expand Up @@ -611,6 +770,39 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
/// Return an error if the source array is not set
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

/// Copy rows selected by `filter` from the current source array.
fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
match filter.strategy() {
IterationStrategy::None => Ok(()),
IterationStrategy::All => self.copy_rows(0, filter.count()),
IterationStrategy::Slices(slices) => {
for &(start, end) in slices {
self.copy_rows(start, end - start)?;
}
Ok(())
}
IterationStrategy::SlicesIterator => {
for (start, end) in SlicesIterator::new(filter.filter_array()) {
self.copy_rows(start, end - start)?;
}
Ok(())
}
IterationStrategy::Indices(indices) => self.copy_rows_by_indices(indices),
IterationStrategy::IndexIterator => {
let indices = IndexIterator::new(filter.filter_array(), filter.count()).collect();
self.copy_rows_by_indices(&indices)
}
}
}

/// Copy rows at the specified indices from the current source array.
fn copy_rows_by_indices(&mut self, indices: &[usize]) -> Result<(), ArrowError> {
for &idx in indices {
self.copy_rows(idx, 1)?;
}
Ok(())
}

/// Finish the currently in-progress array and return it as an `ArrayRef`
fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
Expand Down Expand Up @@ -1197,6 +1389,78 @@ mod tests {
.run();
}

#[test]
fn test_binary_view_filtered() {
let values: Vec<Option<&[u8]>> = vec![
Some(b"foo"),
None,
Some(b"A longer string that is more than 12 bytes"),
];

let binary_view =
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
let batch =
RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 2 == 0)));

Test::new("coalesce_binary_view_filtered")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(256)
.with_expected_output_sizes(vec![256, 256, 256, 232])
.run();
}

#[test]
fn test_binary_view_filtered_inline() {
let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];

let binary_view =
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
let batch =
RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 3 != 1)));

Test::new("coalesce_binary_view_filtered_inline")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(300)
.with_expected_output_sizes(vec![300, 300, 300, 300, 134])
.run();
}

#[test]
fn test_mixed_inline_binary_view_filtered() {
let int_values = Int32Array::from_iter((0..1000).map(Some));
let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
let binary_values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
let binary_view = BinaryViewArray::from_iter(
std::iter::repeat(binary_values.iter()).flatten().take(1000),
);

let batch = RecordBatch::try_from_iter(vec![
("i", Arc::new(int_values) as ArrayRef),
("f", Arc::new(float_values) as ArrayRef),
("b", Arc::new(binary_view) as ArrayRef),
])
.unwrap();

let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 3 != 1)));

Test::new("coalesce_mixed_inline_binary_view_filtered")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(300)
.with_expected_output_sizes(vec![300, 300, 300, 300, 134])
.run();
}

#[derive(Debug, Clone, PartialEq)]
struct ExpectedLayout {
len: usize,
Expand Down Expand Up @@ -1701,18 +1965,20 @@ mod tests {
let (schema, mut columns, row_count) = batch.into_parts();

for column in columns.iter_mut() {
let Some(string_view) = column.as_string_view_opt() else {
if let Some(string_view) = column.as_string_view_opt() {
// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
}
*column = Arc::new(builder.finish());
continue;
};
}

// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
if let Some(binary_view) = column.as_binary_view_opt() {
*column = Arc::new(BinaryViewArray::from_iter(binary_view.iter()));
}
// Update the column with the new StringViewArray
*column = Arc::new(builder.finish());
}

let options = RecordBatchOptions::new().with_row_count(Some(row_count));
Expand Down
Loading
Loading