feat(parquet): fuse level encoding passes and compact level representation #9653

HippoBaro wants to merge 6 commits into apache:main from faster_sparse_columns_encoding
Conversation
HippoBaro force-pushed from 335fb81 to 44dae05
This is a continuation of the work done in #9447 to improve runtime performance around sparse and/or highly uniform columns. As such, this may be of interest to @alamb and @etseidl. 5a1d3d7 adds three benchmarks that exercise the code path this series optimizes. I created a PR (#9654) to merge those separately if needed, so the benchmark bot can have a baseline to compare against. Thanks!
HippoBaro force-pushed from 44dae05 to 7902e69
parquet/src/encodings/rle.rs
Outdated
```rust
/// to add more repetitions without per-element overhead.
#[inline]
pub fn is_accumulating(&self, value: u64) -> bool {
    self.repeat_count > 8 && self.current_value == value
```
Should this be `>= 8`?

Also, given the discussion in #7739, I think it's time to at least replace the magic 8 with a constant.
> Should this be `>= 8`?

The RLE encoder transitions to accumulation mode after the 8th value has been buffered and `flush_buffered_values()` has committed the RLE decision.

> Also, given the discussion in #7739, I think it's time to at least replace the magic 8 with a constant.

I agree! Happy to add that at the end of this series.
> The RLE encoder transitions to accumulation mode after the 8th value has been buffered and `flush_buffered_values()` has committed the RLE decision.

Here's my understanding: a repeated value is added with `put()`. `repeat_count` is incremented and reaches 8. This does not trigger the early-return branch, so execution continues. `num_buffered_values` is currently 7; the value is added to the `buffered_values` array, and `num_buffered_values` is incremented to 8. This triggers `flush_buffered_values()`, which sees that `repeat_count` is 8, so it simply sets `num_buffered_values` to 0, potentially ends a previous bit-packed run by writing the run length indicator, and returns. We then return from `put()` with `repeat_count` still 8 and `num_buffered_values` = 0, and we're now in accumulating mode. If `is_accumulating()` is called after this `put()` (which seems to always be the case), I think `>= 8` is correct.
Thank you for pushing back. You are absolutely right: `>` looked correct but was spurious; `>=` is exactly right. It's fixed in the latest version, and I added a unit test for good measure.
Also, c891c35 introduces a new constant `BIT_PACK_GROUP_SIZE` as requested, and swaps the leftover literals that referred to the number of bits in a byte for `u8::BITS` from std.
Thanks @HippoBaro, this looks impressive. I'm still looking, but haven't found any obvious problems yet. Gads, every time I delve this deep into parquet I go a little mad 😵‍💫. I think the RLE encoder could use a little refactoring and some comment improvements to make the flow more obvious. Not as part of this PR, though.
etseidl
left a comment
Flushing a few comments. More tomorrow.
```rust
let iter = std::iter::repeat_n(info.max_def_level, len);
def_levels.extend(iter);
```
Could this case (which I think is nullable but with no nulls) also make use of the uniform levels?
parquet/src/column/writer/mod.rs
Outdated
```rust
let mut values_to_write = 0usize;
let max_def = self.descr.max_def_level();
self.def_levels_encoder
    .put_with_observer(levels, |level, count| {
```
❤️ When I added the histograms I wasn't happy with the redundancy here. Nice fix!
```rust
/// Bulk-emit `count` uniform null def/rep levels. If the level Vecs are
/// still empty, stores a compact `uniform_levels` tuple instead of
/// materializing the Vecs. Otherwise falls back to extending them.
fn extend_uniform_null_levels(&mut self, def_val: i16, rep_val: i16, count: usize) {
```
I'll preface with the admission that I'm not all that familiar with this part of the code. So if this is called after some levels have been added to the vecs, it will extend the vecs. What happens in the reverse case: after setting `uniform_levels`, an attempt is made to extend the level vecs?
Yep, that was a footgun! The latest patch introduces a better state machine where the transition from uniform to dense is explicit: appending a run with a different value to a `Uniform` now automatically materializes it into a vec first.
```rust
/// When set, all def/rep levels are a single repeated value and the
/// Vec fields above are empty. Tuple: (def_value, rep_value, count).
/// This avoids materializing large Vecs for entirely-null columns.
uniform_levels: Option<(i16, i16, usize)>,
```
I wonder if the logic around these optionals and `extend_uniform_null_levels` could be made clearer with an enum. The `None` case for `def_levels`/`rep_levels` also seems similar to a uniform value of 0. So maybe it could look something like

```rust
enum LevelData {
    Vec(Vec<i16>),
    Uniform(i16, usize),
}

struct ArrayLevels {
    def_levels: LevelData,
    rep_levels: LevelData,
    ...
}
```
Add `is_accumulating()` and `extend_run()` methods to `RleEncoder` that allow callers to detect when the encoder is in RLE accumulation mode and bulk-extend runs without per-element overhead.

Add `put_with_observer()` to `LevelEncoder` that calls an `FnMut(i16, usize)` observer for each run of identical values during encoding. This allows callers to piggyback counting and histogram updates onto the encoding pass without extra iterations over the level buffer. Refactor `put()` to delegate to it with a no-op observer.

Previously, `write_mini_batch()` made 3 separate passes over each level array: one to count non-null values or row boundaries, one to update the level histogram, and one to RLE-encode. Now all three operations happen in a single pass via the observer closure. Remove the separate `update_definition_level_histogram()` and `update_repetition_level_histogram()` methods from `PageMetrics`. Add `LevelHistogram::update_n()` for batch histogram updates.

The encoding loop now checks if the encoder entered RLE accumulation mode after a call to `RleEncoder::put()`. When it does, it scans ahead for the rest of the run and batches the observer call with the full run length, enabling O(1) histogram and counting updates per RLE run.

Benchmark results (vs baseline):

```
primitive_sparse_99pct_null/default           15.2 ms (was 40.3 ms, −62%)
primitive_sparse_99pct_null/parquet_2         16.1 ms (was 43.5 ms, −63%)
primitive_sparse_99pct_null/zstd_parquet_2    17.0 ms (was 44.4 ms, −62%)
list_primitive_sparse_99pct_null/default      17.4 ms (was 39.9 ms, −56%)
list_primitive_sparse_99pct_null/parquet_2    16.7 ms (was 39.9 ms, −58%)
list_primitive_sparse_99pct_null/zstd_p2      16.8 ms (was 40.7 ms, −59%)
primitive_all_null/default                     8.8 ms (was 38.0 ms, −77%)
primitive_all_null/parquet_2                   8.8 ms (was 36.9 ms, −76%)
primitive_all_null/zstd_parquet_2              8.9 ms (was 36.1 ms, −75%)
```

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
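The observer idea can be sketched as follows (a standalone simplification; `for_each_run` is a hypothetical stand-in, not the actual `LevelEncoder::put_with_observer` signature). The point is that any per-level bookkeeping fused into the scan is invoked once per run, not once per element:

```rust
/// Scan the level slice once and invoke `observer(level, run_len)` once
/// per run of identical values, so histogram and count updates become
/// O(1) per run instead of O(1) per element.
fn for_each_run(levels: &[i16], mut observer: impl FnMut(i16, usize)) {
    let mut i = 0;
    while i < levels.len() {
        let level = levels[i];
        // Scan ahead for the rest of the run.
        let run_len = levels[i..].iter().take_while(|&&l| l == level).count();
        observer(level, run_len);
        i += run_len;
    }
}
```

A caller could, for example, count values at the max definition level and bump a histogram bucket inside a single closure, instead of making two extra passes over the buffer.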
Restructure `write_list()` to accumulate consecutive null and empty rows and flush them in a single `visit_leaves()` call using `extend(repeat_n(...))`, instead of calling `visit_leaves()` per row.

With sparse data (99% nulls), a 4096-row batch previously triggered ~4000 individual tree traversals, each pushing a single value per leaf. Now consecutive null/empty runs are collapsed into one traversal that extends all leaf level buffers in bulk. This follows the same pattern already used by `write_struct()`. The `write_non_null_slice` path is unchanged since each non-null row has different offsets and cannot be batched.

Benchmark results (vs previous commit):

```
list_primitive_sparse_99pct_null/default      10.5 ms (was 17.4 ms, −40%)
list_primitive_sparse_99pct_null/parquet_2    10.5 ms (was 16.7 ms, −37%)
list_primitive_sparse_99pct_null/zstd_p2      10.6 ms (was 16.8 ms, −37%)
```

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
When every element in a list, struct, or fixed-size list array is null, short-circuit level building before the row loop and store a compact `(def_value, rep_value, count)` tuple on `ArrayLevels` instead of materializing `Vec<i16>` buffers. The same fast path applies at the leaf level in `write_levels()` when `logical_nulls` covers every row.

On the write side, `ArrowColumnWriter` detects the `uniform_levels` tuple and calls a dedicated `write_uniform_null_batch()` that encodes def/rep levels via `RleEncoder::put_n()` in O(1) amortized time, bypassing the normal mini-batch chunking and per-element iteration. A new `LevelEncoder::put_n_with_observer()` fuses encoding with histogram and counting updates in a single call. `write_uniform_null_batch()` chunks at the configured page row count limit to respect page boundaries.

Also defers `non_null_indices.reserve()` to branches that actually populate it, avoiding an unnecessary allocation for all-null arrays.

Benchmark results (vs previous commit):

```
primitive_all_null/default           192 µs (was 8.8 ms, −97.8%)
primitive_all_null/parquet_2         193 µs (was 8.8 ms, −97.8%)
primitive_all_null/zstd_parquet_2    250 µs (was 8.9 ms, −97.2%)
```

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
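The page-limit chunking idea can be sketched minimally (the function name and shape here are hypothetical, not the actual writer API): a uniform batch is split into page-sized chunks so page boundaries are respected, while each chunk still encodes in O(1) via a bulk `put_n`-style call rather than per-element iteration.

```rust
/// Split a uniform run of `total_rows` identical def/rep levels into
/// chunks no larger than the configured page row limit. Each returned
/// length corresponds to one bulk encode call.
fn chunk_uniform_batch(total_rows: usize, page_row_limit: usize) -> Vec<usize> {
    assert!(page_row_limit > 0, "page row limit must be positive");
    let mut chunks = Vec::new();
    let mut remaining = total_rows;
    while remaining > 0 {
        let chunk = remaining.min(page_row_limit);
        chunks.push(chunk);
        remaining -= chunk;
    }
    chunks
}
```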
Replace the ad hoc level and non-null index vectors with `LevelData` and `ValueSelection`, so the writer can represent absent, uniform, dense, and sparse cases directly instead of always materializing the worst-case shape. This keeps the common paths cheap, removes the dedicated uniform null fast path by folding it into the generic semantic writer, and preserves the old all-null throughput by keeping page-sized chunking for uniform batches.

Extends the compact Uniform/Dense representations (introduced for all-null columns in the previous commit) to non-null columns, yielding the same allocation, batching, and encoding benefits for the common non-null case.

Benchmark results (vs previous commit):

```
primitive_non_null/default        57.8 ms (was 63.4 ms, −9%)
primitive_non_null/parquet_2      78.0 ms (was 85.1 ms, −8%)
struct_non_null/default           27.3 ms (was 29.9 ms, −9%)
struct_non_null/parquet_2         36.1 ms (was 38.2 ms, −6%)
struct_non_null/zstd_parquet_2    47.3 ms (was 50.9 ms, −7%)
```

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
The literal `8` appeared in two distinct roles throughout `RleEncoder`, `RleDecoder`, and their tests. Replacing each with a named constant makes the intent explicit and prevents the two meanings from being confused.

`BIT_PACK_GROUP_SIZE = 8`: the Parquet RLE/bit-packing hybrid format always bit-packs values in multiples of this count (spec: "we always bit-pack a multiple of 8 values at a time"). Every occurrence related to the staging buffer size, the repeat-count threshold that triggers the RLE decision, and the group-count arithmetic in bit-packed headers now uses this name.

`u8::BITS` (= 8, from std): used wherever a bit count is divided by 8 to obtain a byte count (e.g. `ceil(bit_width, u8::BITS as usize)`). This is a bits-per-byte conversion, a fundamentally different concept from the packing-group size.

No behaviour change.

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
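The two roles of the literal can be shown side by side in a small sketch (the constant name is the one from the commit; `packed_group_bytes` is a hypothetical helper, not the crate's API): the group size governs how many values are packed together, while `u8::BITS` only converts the resulting bit count into bytes.

```rust
/// Parquet always bit-packs values in multiples of this group size.
const BIT_PACK_GROUP_SIZE: usize = 8;

/// Bytes needed to bit-pack one group of values at `bit_width` bits each.
/// Dividing by `u8::BITS` is a bits-to-bytes conversion; the group size
/// is a separate concept that merely happens to share the value 8.
fn packed_group_bytes(bit_width: usize) -> usize {
    (BIT_PACK_GROUP_SIZE * bit_width).div_ceil(u8::BITS as usize)
}
```

Because the group size is a multiple of 8 values, a group at `bit_width` bits always packs into exactly `bit_width` bytes, which is why the two constants are so easy to conflate.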
HippoBaro force-pushed from 7902e69 to c891c35
Thanks for the reviews! I've reworked the branch to address all feedback. Sorry for the delay, it took me a while to experiment. The main structural change is the `LevelData` enum:

```rust
enum LevelData {
    Absent,
    Materialized(Vec<i16>),
    Uniform { value: i16, count: usize },
}
```

The resulting refactor has a larger LoC footprint, but the API is arguably much cleaner and more robust. Also, rebased as per #9656 (review).
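The materialize-on-mismatch transition discussed earlier in the review could look roughly like this (a simplified standalone model; `extend_run` and its exact shape are illustrative, not the crate's actual API):

```rust
/// Simplified model of the compact level representation: appending a run
/// with a different value to `Uniform` materializes it into a Vec first.
enum LevelData {
    Absent,
    Uniform { value: i16, count: usize },
    Materialized(Vec<i16>),
}

impl LevelData {
    fn extend_run(&mut self, value: i16, count: usize) {
        match self {
            // First run: stay compact.
            LevelData::Absent => *self = LevelData::Uniform { value, count },
            // Same value: just bump the count, no allocation.
            LevelData::Uniform { value: v, count: c } if *v == value => *c += count,
            // Different value: explicit transition from uniform to dense.
            LevelData::Uniform { value: v, count: c } => {
                let mut vec = vec![*v; *c];
                vec.extend(std::iter::repeat(value).take(count));
                *self = LevelData::Materialized(vec);
            }
            // Already dense: plain bulk extend.
            LevelData::Materialized(vec) => {
                vec.extend(std::iter::repeat(value).take(count));
            }
        }
    }
}
```

This makes the footgun from the earlier review round unrepresentable: there is no way to extend past a uniform state without materializing it first.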
Thanks @HippoBaro. I'll try to make some time to review the changes. Probably not today, but hopefully tomorrow... 🤞
run benchmark arrow_writer

🤖 Arrow criterion benchmark running (GKE): comparing faster_sparse_columns_encoding (c891c35) to aac969d (merge-base)
# Which issue does this PR close?

None, but relates to #9653.

# Rationale for this change

#9653 introduces optimizations related to non-null uniform workloads. This adds benchmarks so we can quantify them.

# What changes are included in this PR?

Add three new benchmark cases to the arrow_writer benchmark suite for evaluating write performance on struct columns at varying null densities:

* `struct_non_null`: a nullable struct with 0% null rows and non-nullable primitive children;
* `struct_sparse_99pct_null`: a nullable struct with 99% null rows, exercising null batching through one level of struct nesting;
* `struct_all_null`: a nullable struct with 100% null rows, exercising the uniform-null path through struct nesting.

Baseline results (Apple M1 Max):

```
struct_non_null/default               29.9 ms
struct_non_null/parquet_2             38.2 ms
struct_non_null/zstd_parquet_2        50.9 ms
struct_sparse_99pct_null/default       7.2 ms
struct_sparse_99pct_null/parquet_2     7.3 ms
struct_sparse_99pct_null/zstd_p2       8.1 ms
struct_all_null/default               83.3 µs
struct_all_null/parquet_2             82.5 µs
struct_all_null/zstd_parquet_2       106.6 µs
```

# Are these changes tested?

N/A

# Are there any user-facing changes?

None

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
run benchmark arrow_writer

🤖 Arrow criterion benchmark running (GKE): comparing faster_sparse_columns_encoding (6c73ac7) to adf9308 (merge-base)
🤖 Arrow criterion benchmark completed (GKE)
I am surprised by the few regressions above. I can't reproduce them locally. Are these benchmarks known to be noisy?

Yes. They are extremely twitchy. I always take them with a grain of salt or ten. 😅
I've now run multiple passes of the arrow_writer bench on my workstation and there appear to be no regressions due to this PR. And the speed-ups are quite impressive 😄

@kszucs do you have time to look at this PR? It touches on your CDC code.
Which issue does this PR close?
Rationale for this change
See issue for details. The Parquet column writer currently does per-value work during level encoding regardless of data sparsity, even though the output encoding (RLE) is proportional to the number of runs.
What changes are included in this PR?
Three incremental commits, each building on the previous:

1. **Fuse level encoding with counting and histogram updates.** `write_mini_batch()` previously made three separate passes over each level array: count non-nulls, update the level histogram, and RLE-encode. Now all three happen in a single pass via an observer callback on `LevelEncoder`. When the RLE encoder enters accumulation mode, the loop scans ahead for the full run length and batches the observer call. This makes counting and histogram updates O(1) per run.
2. **Batch consecutive null/empty rows in `write_list`.** Consecutive null or empty list entries are now collapsed into a single `visit_leaves()` call that bulk-extends all leaf level buffers, instead of one tree traversal per null row. Mirrors the approach already used by `write_struct()`.
3. **Short-circuit entirely-null columns.** When every element in an array is null, skip `Vec<i16>` level-buffer materialization entirely and store a compact `(def_value, rep_value, count)` tuple. The writer encodes this via `RleEncoder::put_n()` in O(1) amortized time, bypassing the normal mini-batch loop.

Are these changes tested?

All tests pass. I added some benchmarks to exercise the heavy and all-null code paths, alongside the existing 25% sparseness benchmarks.

Non-nullable column benchmarks are within noise, as expected since they have no definition levels to optimize.
Are there any user-facing changes?
None.