This document traces data through the system in detail, showing exactly how components interact.
Let's trace `SELECT name, age FROM users WHERE age > 25 AND city = 'NYC'` through every layer.
Before diving into details, here's the complete pipeline for this query:
    SELECT name, age FROM users WHERE age > 25 AND city = 'NYC'

    PIPELINE STRUCTURE (3 steps: filter columns first, then projection columns)

    ┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
    │ STEP 0 (ROOT)   │      │ STEP 1          │      │ STEP 2          │
    ├─────────────────┤      ├─────────────────┤      ├─────────────────┤
    │ Column: age     │      │ Column: city    │      │ Column: name    │
    │ Filter: > 25    │      │ Filter: = 'NYC' │      │ Filter: (none)  │
    │                 │      │                 │      │                 │
    │ Scans pages,    │      │ Loads city for  │      │ Loads name for  │
    │ creates batches │      │ surviving rows  │      │ surviving rows  │
    └────────┬────────┘      └────────┬────────┘      └────────┬────────┘
             │                        │                        │
             └──▶ Channel ────────────┴──▶ Channel ────────────┴──▶ Output

    DATA VOLUME AT EACH STAGE

    Source                       187,500 rows
      ──▶ age > 25 filter        (83% filtered)  ──▶ 31,247 rows
      ──▶ city = 'NYC' filter    (74% filtered)  ──▶  8,234 rows
      ──▶ add name                               ──▶  8,234 rows (Output)

    LATE MATERIALIZATION BENEFIT

    Without late materialization: Load ALL 4 columns × 187,500 rows  = 750k column values
    With late materialization:    age: 187k + city: 31k + name: 8k   = 226k column values

    Savings: 70% less data loaded from storage
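The savings figure can be reproduced with a few lines of arithmetic. The sketch below is illustrative only: the function names are hypothetical, and it counts column values (one value per row per loaded column), the same unit the diagram uses.

```rust
/// Column values read when every column is loaded for every row.
fn eager_values(total_rows: u64, num_columns: u64) -> u64 {
    total_rows * num_columns
}

/// Column values read when each step loads exactly one column, and only for
/// the rows that reached that step. `rows_per_step[i]` is the number of rows
/// entering step i.
fn late_values(rows_per_step: &[u64]) -> u64 {
    rows_per_step.iter().sum()
}

/// Percentage of column values that late materialization avoids loading.
fn savings_percent(eager: u64, late: u64) -> f64 {
    100.0 * (1.0 - late as f64 / eager as f64)
}
```

With the numbers from the diagram, `eager_values(187_500, 4)` is 750,000 and `late_values(&[187_500, 31_247, 8_234])` is 226,981, which gives a saving just under 70%.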
The pipeline processes data in morsels (page groups of ~50k rows). Multiple morsels flow through the pipeline simultaneously across worker threads:
    TABLE: users (187,500 rows = 4 morsels)

      Morsel 0: rows 0-50k
      Morsel 1: rows 50k-100k
      Morsel 2: rows 100k-150k
      Morsel 3: rows 150k-187.5k

    EXECUTION TIMELINE (4 workers)

    Time ────────────────────────────────────────────────────────────▶

    Worker 0   [ M0:S0 (age) ]  [ M0:S1 (city) ]  [ M0:S2 (name) ]
    Worker 1   [ M1:S0 (age) ]  [ M1:S1 (city) ]  [ M1:S2 (name) ]
    Worker 2   [ M2:S0 (age) ]  [ M2:S1 (city) ]  [ M2:S2 (name) ]
    Worker 3   [ M3:S0 (age) ]  [ M3:S1 (city) ]  [ M3:S2 (name) ]   (smaller morsel)

    Legend: M0:S0 = Morsel 0, Step 0

    Key observations:
    • Multiple morsels execute concurrently (M0, M1, M2, M3 all in flight)
    • Same morsel's steps execute sequentially (M0:S0 → M0:S1 → M0:S2)
    • Workers grab whatever step is ready (work-stealing)
    • Channels buffer batches between steps
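As a rough illustration of how a table splits into morsels, the following sketch computes the row range of each morsel. `morsel_ranges` is a hypothetical helper, not a function from the codebase; in the real system the boundaries come from the page descriptors.

```rust
use std::ops::Range;

/// Splits `total_rows` into consecutive row ranges of at most `morsel_rows`
/// rows each. The last range is shorter when the total is not a multiple
/// of the morsel size.
fn morsel_ranges(total_rows: usize, morsel_rows: usize) -> Vec<Range<usize>> {
    assert!(morsel_rows > 0, "morsel size must be non-zero");
    (0..total_rows)
        .step_by(morsel_rows)
        .map(|start| start..(start + morsel_rows).min(total_rows))
        .collect()
}
```

For 187,500 rows and 50,000-row morsels this yields four ranges, the last covering rows 150,000 to 187,499 (37,500 rows).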
Workers grab morsels using lock-free atomic compare-and-swap:
    JOB STATE

    next_free_slot: AtomicUsize

    Page descriptors (morsels):

      slot:   0      1      2      3        4      5      6      7
      rows:   50k    50k    50k    37.5k    ...    ...    ...    ...
              ▲
              └── next_free_slot = 0

    WORK-STEALING SEQUENCE

    T=0: next_free_slot = 0

      Worker 0: CAS(0 → 1) ✓ takes morsel 0
      Worker 1: CAS(0 → 1) ✗ retry
      Worker 2: CAS(0 → 1) ✗ retry
      Worker 3: CAS(0 → 1) ✗ retry

    T=1: next_free_slot = 1

      Worker 1: CAS(1 → 2) ✓ takes morsel 1
      Worker 2: CAS(1 → 2) ✗ retry
      Worker 3: CAS(1 → 2) ✗ retry

    T=2: next_free_slot = 2

      Worker 2: CAS(2 → 3) ✓ takes morsel 2
      Worker 3: CAS(2 → 3) ✗  CAS(3 → 4) ✓ takes morsel 3

    T=3: next_free_slot = 4

      All workers now processing their morsels...

    fn claim_next_slot(&self) -> Option<usize> {
        loop {
            let current = self.next_free_slot.load(Ordering::Acquire);
            if current >= self.total_morsels {
                return None; // All work claimed
            }
            if self.next_free_slot
                .compare_exchange(current, current + 1, ...)
                .is_ok()
            {
                return Some(current); // Successfully claimed
            }
            // CAS failed, another worker got it, retry
        }
    }

    Benefits:
    • Zero lock contention (atomic operations only)
    • Fast workers automatically get more work
    • No coordinator bottleneck
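The claim loop above elides the memory orderings passed to `compare_exchange`. A self-contained sketch follows, assuming `AcqRel` on success and `Acquire` on failure; that pairing is an assumption for illustration, not something the source states. `JobState::new` and `claim_all` are likewise hypothetical helpers added so the example can run on its own.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

struct JobState {
    next_free_slot: AtomicUsize,
    total_morsels: usize,
}

impl JobState {
    fn new(total_morsels: usize) -> Self {
        JobState { next_free_slot: AtomicUsize::new(0), total_morsels }
    }

    /// Claims the next unclaimed slot, or returns None once all are taken.
    fn claim_next_slot(&self) -> Option<usize> {
        let mut current = self.next_free_slot.load(Ordering::Acquire);
        loop {
            if current >= self.total_morsels {
                return None; // all work claimed
            }
            // Orderings are an assumption; the original snippet elides them.
            match self.next_free_slot.compare_exchange(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(current), // this worker owns `current`
                Err(observed) => current = observed, // lost the race, retry
            }
        }
    }
}

/// Spawns `workers` threads that claim slots until none remain and returns
/// the slots each worker obtained.
fn claim_all(total_morsels: usize, workers: usize) -> Vec<Vec<usize>> {
    let state = Arc::new(JobState::new(total_morsels));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let state = Arc::clone(&state);
            thread::spawn(move || {
                let mut mine = Vec::new();
                while let Some(slot) = state.claim_next_slot() {
                    mine.push(slot);
                }
                mine
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Whichever worker wins each race, every slot is handed out exactly once. Which worker gets which slot is not deterministic.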
Channels decouple steps and buffer batches when producers outpace consumers:
    CHANNEL BUFFERING BETWEEN STEPS

    Step 0 (fast)            Channel (unbounded)            Step 1 (slow)

    Worker produces          [B4][B3][B2][B1][B0] ──▶       Worker consumes
    batch every 2ms          buffered batches               batch every 5ms

    Morsel 0                                                Process
    age > 25 filter  ─────── send() never blocks ─────────▶ city = 'NYC' filter
                             (unbounded channel)

    MULTI-PRODUCER TO SINGLE STEP

    Multiple workers push to same channel:

    Worker 0 (M0) ──┐
    Worker 1 (M1) ──┼──▶ Channel 0 [B0][B1][B2] ──▶ Any available worker runs Step 1
    Worker 2 (M2) ──┤
    Worker 3 (M3) ──┘

    Channel guarantees:
    • FIFO order per producer (M0's batches stay in order)
    • No ordering between producers (M1's batch may arrive before M0's)
    • Thread-safe (crossbeam mpmc channels)

    TERMINATION PROTOCOL

    How does Step 1 know all morsels from Step 0 are done?

    Step 0 (root):
      for each morsel:
        batch = process(morsel)
        channel.send(batch)

      // After ALL morsels processed:
      channel.send(ColumnarBatch { num_rows: 0 })   ◀── TERMINATION SIGNAL

    Step 1 (non-root):
      loop {
        batch = channel.recv()
        if batch.num_rows == 0 {
          // Propagate termination downstream
          next_channel.send(ColumnarBatch { num_rows: 0 })
          break
        }
        process(batch)
      }

    Empty batch (num_rows: 0) cascades through pipeline to signal completion
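The termination protocol can be sketched end to end with two threads. This example swaps in `std::sync::mpsc` channels for the crossbeam channels the document describes, so that it compiles without external crates. `Batch` is a hypothetical stand-in for `ColumnarBatch`, and the even-row filter is arbitrary. One detail is a choice made in this sketch rather than something the source specifies: data batches that become empty after filtering are dropped instead of sent, because an empty batch would otherwise be indistinguishable from the termination signal.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical stand-in for ColumnarBatch: only carries row ids.
#[derive(Debug, Clone, PartialEq)]
struct Batch {
    row_ids: Vec<u64>,
}

impl Batch {
    fn num_rows(&self) -> usize {
        self.row_ids.len()
    }
    /// The empty batch that signals end-of-data.
    fn terminator() -> Self {
        Batch { row_ids: Vec::new() }
    }
}

/// Root step: emits one batch per morsel, then the termination signal.
fn run_root(morsels: Vec<Vec<u64>>, out: Sender<Batch>) {
    for rows in morsels {
        if !rows.is_empty() {
            out.send(Batch { row_ids: rows }).unwrap();
        }
    }
    out.send(Batch::terminator()).unwrap();
}

/// Non-root step: filters each batch and forwards the termination signal.
fn run_non_root(input: Receiver<Batch>, out: Sender<Batch>, keep: fn(u64) -> bool) {
    while let Ok(batch) = input.recv() {
        if batch.num_rows() == 0 {
            out.send(Batch::terminator()).unwrap(); // propagate downstream
            break;
        }
        let kept: Vec<u64> = batch.row_ids.into_iter().filter(|&r| keep(r)).collect();
        if !kept.is_empty() {
            out.send(Batch { row_ids: kept }).unwrap();
        }
    }
}

/// Wires root -> step 1 -> collector and returns the surviving batches.
fn run_pipeline(morsels: Vec<Vec<u64>>) -> Vec<Batch> {
    let (ch0_tx, ch0_rx) = channel();
    let (ch1_tx, ch1_rx) = channel();
    let root = thread::spawn(move || run_root(morsels, ch0_tx));
    let step1 = thread::spawn(move || run_non_root(ch0_rx, ch1_tx, |r| r % 2 == 0));
    let mut results = Vec::new();
    while let Ok(batch) = ch1_rx.recv() {
        if batch.num_rows() == 0 {
            break; // termination signal reached the output
        }
        results.push(batch);
    }
    root.join().unwrap();
    step1.join().unwrap();
    results
}
```

With a single producer per channel, as here, the terminator is guaranteed to arrive after every data batch. With several producers feeding one channel, that ordering has to be ensured some other way.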
    INPUT: "SELECT name, age FROM users WHERE age > 25 AND city = 'NYC'"

    STEP 1: Parser (sql/parser.rs)

    sqlparser::parse_sql(sql_string)
        │
        ▼
    Statement::Query(Query {
        body: Select {
            projection: [
                SelectItem::UnnamedExpr(Expr::Identifier("name")),
                SelectItem::UnnamedExpr(Expr::Identifier("age")),
            ],
            from: [TableWithJoins {
                relation: TableFactor::Table { name: "users" }
            }],
            selection: Some(Expr::BinaryOp {
                left: Expr::BinaryOp {
                    left: Expr::Identifier("age"),
                    op: Gt,
                    right: Expr::Value(Number("25"))
                },
                op: And,
                right: Expr::BinaryOp {
                    left: Expr::Identifier("city"),
                    op: Eq,
                    right: Expr::Value(SingleQuotedString("NYC"))
                }
            })
        }
    })
        │
        ▼
    STEP 2: Planner (sql/planner.rs)

    The planner does several key things:

    a) SCHEMA LOOKUP

       page_directory.get_table_catalog("users")

       Returns: TableCatalog {
           columns: [
               { name: "id",   ordinal: 0, type: Int64 },
               { name: "name", ordinal: 1, type: String },
               { name: "age",  ordinal: 2, type: Int64 },
               { name: "city", ordinal: 3, type: String },
           ]
       }

    b) PREDICATE ANALYSIS

       Extract which columns have filters:

         age  → has filter (> 25)
         city → has filter (= 'NYC')
         name → no filter (projection only)

       Order columns: filters first (selectivity), then projections

    c) BUILD QUERY PLAN

       QueryPlan {
           plan_type: Select,
           table: "users",
           columns: [
               ColumnSpec {
                   name: "age",
                   ordinal: 2,
                   filters: [FilterExpr::Leaf {
                       column_ordinal: 2,
                       op: GreaterThan,
                       value: Value::Int64(25)
                   }],
               },
               ColumnSpec {
                   name: "city",
                   ordinal: 3,
                   filters: [FilterExpr::Leaf {
                       column_ordinal: 3,
                       op: Equal,
                       value: Value::String("NYC")
                   }],
               },
               ColumnSpec {
                   name: "name",
                   ordinal: 1,
                   filters: [], // no filter, just projection
               },
           ],
           projections: ["name", "age"],
       }
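The "filters first, then projections" ordering can be expressed as a stable sort. The sketch below is a simplification under stated assumptions: this `ColumnSpec` stores filters as plain strings rather than `FilterExpr` values, `order_columns` is a hypothetical name, and no selectivity estimate is applied, so filtered columns simply keep their relative input order.

```rust
/// Simplified column spec; filters are plain strings for illustration only.
#[derive(Debug, Clone, PartialEq)]
struct ColumnSpec {
    name: String,
    ordinal: usize,
    filters: Vec<String>,
}

/// Places columns that carry filters ahead of projection-only columns.
/// `sort_by_key` is stable and `false < true`, so columns with filters
/// (`is_empty() == false`) come first and ties keep their input order.
fn order_columns(mut columns: Vec<ColumnSpec>) -> Vec<ColumnSpec> {
    columns.sort_by_key(|c| c.filters.is_empty());
    columns
}
```

Given the referenced columns in schema order (`name`, `age`, `city`), this produces `age`, `city`, `name`, matching the plan above.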
        │
        ▼
    STEP 3: Pipeline Builder (pipeline/builder.rs)

    build_pipeline(query_plan, page_handler) → Job

    a) CREATE CHANNELS

       (entry_tx, entry_rx) = unbounded::<ColumnarBatch>()  // input to root
       (ch0_tx, ch0_rx)     = unbounded::<ColumnarBatch>()  // step 0 → step 1
       (ch1_tx, ch1_rx)     = unbounded::<ColumnarBatch>()  // step 1 → step 2
       (ch2_tx, ch2_rx)     = unbounded::<ColumnarBatch>()  // step 2 → output

    b) WIRE STEPS

       Step 0 (age, root):
         previous_receiver: entry_rx  (unused for root)
         current_producer:  ch0_tx
         is_root:           true
         filters:           [age > 25]

       Step 1 (city):
         previous_receiver: ch0_rx  ◀── receives from step 0
         current_producer:  ch1_tx
         is_root:           false
         filters:           [city = 'NYC']

       Step 2 (name):
         previous_receiver: ch1_rx  ◀── receives from step 1
         current_producer:  ch2_tx
         is_root:           false
         filters:           [] (none)

       Output receiver: ch2_rx

    c) RESULTING JOB STRUCTURE

       Job
         steps: [Step 0, Step 1, Step 2]
         next_free_slot: AtomicUsize(0)  // for work-stealing
         cost: 3                         // number of steps
         output_receiver: ch2_rx

       Step 0 (age, is_root) ──ch0──▶ Step 1 (city) ──ch1──▶ Step 2 (name) ──ch2──▶ output
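The wiring pattern generalizes to any number of steps: each step receives from the channel its predecessor sends on. The sketch below uses `std::sync::mpsc` in place of crossbeam's `unbounded` so it stands alone. `Step`, `Job`, and `build_job` are simplified hypothetical shapes, not the actual types in `pipeline/builder.rs`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified step: knows its column, where it reads, and where it writes.
struct Step<T> {
    column: String,
    is_root: bool,
    previous_receiver: Receiver<T>,
    current_producer: Sender<T>,
}

/// Simplified job: the chained steps plus both ends of the chain.
struct Job<T> {
    steps: Vec<Step<T>>,
    entry_sender: Sender<T>,
    output_receiver: Receiver<T>,
    cost: usize,
}

/// Creates one channel per step and chains them: step i reads from the
/// receiver of channel i-1 (or the entry channel for the root) and writes
/// to channel i. The last receiver becomes the job's output.
fn build_job<T>(columns: &[&str]) -> Job<T> {
    let (entry_sender, mut upstream_rx) = channel::<T>();
    let mut steps = Vec::with_capacity(columns.len());
    for (i, col) in columns.iter().enumerate() {
        let (tx, rx) = channel::<T>();
        steps.push(Step {
            column: (*col).to_string(),
            is_root: i == 0,
            previous_receiver: upstream_rx,
            current_producer: tx,
        });
        upstream_rx = rx; // the next step reads what this step produces
    }
    Job { cost: steps.len(), steps, entry_sender, output_receiver: upstream_rx }
}
```

Calling `build_job::<u32>(&["age", "city", "name"])` gives three steps in which a value sent on step 0's producer arrives at step 1's receiver, and a value sent on step 2's producer arrives at the output receiver.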
    EXECUTOR SUBMITS JOB

    executor.submit(job)
      ├── Add job to job_board (SkipSet, ordered by cost)
      └── Send job reference to main worker channel

    Workers compete to execute steps:

    Main Worker 1            Main Worker 2            Main Worker 3

    recv(job)
    job.get_next()
      CAS(0→1) ✓ got slot 0
    steps[0].execute()       job.get_next()
    (running...)               CAS(1→2) ✓ got slot 1
                             steps[1].execute()       job.get_next()
                             (blocked on ch0...)        CAS(2→3) ✓ got slot 2
                                                      steps[2].execute()
                                                      (blocked on ch1...)

    All 3 steps executing in parallel on different workers!
    STEP 0: execute_root() - Scans "age" column

    1. GET PAGE CHAIN FROM METADATA

       page_handler.page_directory.get_column_chain("users", "age")

       Returns: ColumnChain {
           pages: [
               PageDescriptor { id: "users.age.0", offset: 0,      entry_count: 50000 },
               PageDescriptor { id: "users.age.1", offset: 65536,  entry_count: 50000 },
               PageDescriptor { id: "users.age.2", offset: 131072, entry_count: 50000 },
               PageDescriptor { id: "users.age.3", offset: 196608, entry_count: 37500 },
           ]
       }

       Total rows: 187,500
        │
        ▼
    2. PROCESS EACH PAGE (MORSEL)

       for descriptor in column_chain.pages {

       MORSEL 0: PageDescriptor { id: "users.age.0", ... }

       2a. FETCH PAGE VIA PAGE HANDLER

           page = page_handler.get_page("users.age.0")

           PageHandler.get_page(page_id):

               // Check uncompressed cache first
               if let Some(page) = uncompressed_cache.get(page_id) {
                   return page;  // HOT PATH - already in memory
               }

               // Check compressed cache
               if let Some(compressed) = compressed_cache.get(page_id) {
                   let page = compressor.decompress(compressed);
                   uncompressed_cache.put(page_id, page.clone());
                   return page;  // WARM PATH - decompress from memory
               }

               // Fetch from disk
               let descriptor = page_locator.locate(page_id);
               let bytes = page_io.read(
                   descriptor.disk_path,
                   descriptor.offset,
                   descriptor.actual_len
               );
               compressed_cache.put(page_id, bytes.clone());
               let page = compressor.decompress(bytes);
               uncompressed_cache.put(page_id, page.clone());
               return page;  // COLD PATH - read from disk

           Returns: ColumnarPage {
               data: ColumnData::Int64(vec![23, 45, 19, 67, 31, 28, ...]),
               null_bitmap: Bitmap { bits: [0xFFFFFFFF...], len: 50000 },
               num_rows: 50000,
           }
       2b. CREATE COLUMNAR BATCH

           let mut batch = ColumnarBatch::new();
           batch.num_rows = 50000;
           batch.columns.insert(2, page);         // ordinal 2 = "age"
           batch.row_ids = (0..50000).collect();  // [0, 1, 2, ..., 49999]

           Batch state:
             columns:  { 2: ColumnarPage(age data) }
             row_ids:  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ...]
             num_rows: 50000

       2c. EVALUATE FILTER: age > 25

           bitmap = evaluate_filter_expr_with_batch(&filter, &batch)

           Physical evaluation (physical_evaluator.rs):

               let age_col = batch.columns.get(&2).unwrap();
               let int_data = age_col.data.as_int64();

               let mut bitmap = Bitmap::new(batch.num_rows);

               for (idx, &value) in int_data.iter().enumerate() {
                   if value > 25 {
                       bitmap.set(idx, true);
                   }
               }

               // Vectorized: actually processes 64 values per loop
               // with bitwise operations

           Result bitmap (50000 bits packed into 782 u64s):

             ages:   [ 23,  45,  19,  67,  31,  28,  15,  52,  42,  18, ...]
             bitmap: [  0,   1,   0,   1,   1,   1,   0,   1,   1,   0, ...]
                      ≤25  >25  ≤25  >25  >25  >25  ≤25  >25  >25  ≤25

           bitmap.count_ones() = 31,247 (out of 50,000)
       2d. FILTER BATCH BY BITMAP

           filtered = batch.filter_by_bitmap(&bitmap)

           Gather operation:

               let mut new_batch = ColumnarBatch::new();
               let mut new_row_ids = Vec::new();
               let mut new_int_data = Vec::new();

               for idx in bitmap.iter_ones() {
                   new_row_ids.push(batch.row_ids[idx]);
                   new_int_data.push(batch.columns[&2].int_data[idx]);
               }

               new_batch.row_ids = new_row_ids;
               new_batch.columns.insert(2, ColumnarPage::from(new_int_data));
               new_batch.num_rows = bitmap.count_ones();

           Filtered batch state:
             columns:  { 2: ColumnarPage([45, 67, 31, 28, 52, 42, ...]) }
             row_ids:  [1, 3, 4, 5, 7, 8, ...]  // original positions
             num_rows: 31247

       2e. PUSH TO DOWNSTREAM CHANNEL

           current_producer.send(filtered)

           Step 0 ──▶ Channel 0 ──▶ Step 1 (waiting on recv())

             ColumnarBatch {
                 columns: {2: [45,67,31,...]}
                 row_ids: [1,3,4,5,7,8,...]
                 num_rows: 31247
             }

       [Repeat for morsels 1, 2, 3...]

       AFTER ALL MORSELS: Send termination signal

       current_producer.send(ColumnarBatch { num_rows: 0, ... })
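Steps 2c and 2d (evaluate a predicate into a packed bitmap, then gather the surviving values and row ids) can be sketched without the surrounding batch types. This is a minimal illustration: the `Bitmap` below is a hypothetical reimplementation rather than the project's type, and the comparison runs one value at a time instead of in the vectorized form the document mentions.

```rust
/// Bit-packed row mask: bit i of the mask lives in words[i / 64].
struct Bitmap {
    words: Vec<u64>,
    len: usize,
}

impl Bitmap {
    fn new(len: usize) -> Self {
        Bitmap { words: vec![0; (len + 63) / 64], len }
    }
    fn len(&self) -> usize {
        self.len
    }
    fn set(&mut self, idx: usize) {
        self.words[idx / 64] |= 1u64 << (idx % 64);
    }
    fn count_ones(&self) -> usize {
        self.words.iter().map(|w| w.count_ones() as usize).sum()
    }
    /// Yields the indices of set bits in ascending order.
    fn iter_ones(&self) -> impl Iterator<Item = usize> + '_ {
        self.words.iter().enumerate().flat_map(|(word_index, &word)| {
            let mut remaining = word;
            std::iter::from_fn(move || {
                if remaining == 0 {
                    return None;
                }
                let bit = remaining.trailing_zeros() as usize;
                remaining &= remaining - 1; // clear the lowest set bit
                Some(word_index * 64 + bit)
            })
        })
    }
}

/// Step 2c: mark every position whose value exceeds `threshold`.
fn filter_greater_than(values: &[i64], threshold: i64) -> Bitmap {
    let mut bitmap = Bitmap::new(values.len());
    for (idx, &value) in values.iter().enumerate() {
        if value > threshold {
            bitmap.set(idx);
        }
    }
    bitmap
}

/// Step 2d: keep only the values and row ids whose bit is set.
fn gather(values: &[i64], row_ids: &[u64], bitmap: &Bitmap) -> (Vec<i64>, Vec<u64>) {
    let mut kept_values = Vec::with_capacity(bitmap.count_ones());
    let mut kept_ids = Vec::with_capacity(bitmap.count_ones());
    for idx in bitmap.iter_ones() {
        kept_values.push(values[idx]);
        kept_ids.push(row_ids[idx]);
    }
    (kept_values, kept_ids)
}
```

Applied to the ten sample ages above with row ids 0 through 9, the filter sets six bits and the gather returns ages `[45, 67, 31, 28, 52, 42]` with row ids `[1, 3, 4, 5, 7, 8]`.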
    STEP 1: execute_non_root() - Filters on "city" column

    while let Ok(batch) = previous_receiver.recv() {

    RECEIVED BATCH FROM STEP 0:

      columns:  { 2: ColumnarPage(age data) }    // only "age" column present
      row_ids:  [1, 3, 4, 5, 7, 8, 12, 15, ...]  // survivors from age filter
      num_rows: 31247
        │
        ▼
    1. MATERIALIZE "city" COLUMN FOR THESE ROWS

       This step needs column 3 (city), but batch only has column 2 (age).
       Must load city data for ONLY the row_ids in this batch.

       Late materialization process:

           // Group row_ids by page group for efficient I/O
           let rows_per_page = 50000;

           row_ids: [1, 3, 4, 5, 7, 8, 12, 15, ...]
                    └────── all in page group 0 (row_id / 50000 = 0)

           // Fetch page for city column
           city_page = page_handler.get_page("users.city.0")

           // Extract only the needed values
           let mut city_data = BytesColumn::new();
           for &row_id in &batch.row_ids {
               let offset = row_id % rows_per_page;
               let city = city_page.data.get_string(offset);
               city_data.push(city);
           }

           batch.columns.insert(3, ColumnarPage::from_strings(city_data));

       Batch state after materialization:
         columns: {
             2: ColumnarPage(ages:   [45, 67, 31, 28, 52, 42, ...]),
             3: ColumnarPage(cities: ["NYC", "LA", "NYC", "CHI", "NYC", ...]),
         }
         row_ids:  [1, 3, 4, 5, 7, 8, ...]
         num_rows: 31247
        │
        ▼
    2. EVALUATE FILTER: city = 'NYC'

       bitmap = evaluate_filter_expr_with_batch(&filter, &batch)

       String comparison:

         cities: ["NYC", "LA", "NYC", "CHI", "NYC", "SF", ...]
         bitmap: [    1,    0,     1,     0,     1,    0, ...]

       bitmap.count_ones() = 8,234 (out of 31,247)
        │
        ▼
    3. FILTER BATCH AND PUSH DOWNSTREAM

       filtered = batch.filter_by_bitmap(&bitmap)
       current_producer.send(filtered)

       Pushed to Step 2:
         columns: {
             2: ColumnarPage(ages:   [45, 31, 52, ...]),  // only NYC rows
             3: ColumnarPage(cities: ["NYC", "NYC", "NYC", ...]),
         }
         row_ids:  [1, 4, 7, ...]  // original row positions
         num_rows: 8234
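The row-id arithmetic behind late materialization is small enough to show in isolation. In the sketch below, pages are plain in-memory vectors standing in for what `page_handler.get_page` would return. `locate`, `pages_needed`, and `materialize_strings` are hypothetical names. Unlike the walkthrough, which only touches page group 0, the sketch also handles row ids that fall in later pages.

```rust
use std::collections::BTreeSet;

/// Maps a global row id to (page index, offset within that page).
fn locate(row_id: u64, rows_per_page: u64) -> (usize, usize) {
    ((row_id / rows_per_page) as usize, (row_id % rows_per_page) as usize)
}

/// Distinct pages a batch touches, in ascending order, so that each page
/// is fetched once no matter how many surviving rows it contains.
fn pages_needed(row_ids: &[u64], rows_per_page: u64) -> Vec<usize> {
    let pages: BTreeSet<usize> = row_ids
        .iter()
        .map(|&row_id| locate(row_id, rows_per_page).0)
        .collect();
    pages.into_iter().collect()
}

/// Loads the column value for each surviving row id, preserving batch order.
/// `pages[i]` stands in for the decoded contents of page i of the column.
fn materialize_strings(
    row_ids: &[u64],
    pages: &[Vec<String>],
    rows_per_page: u64,
) -> Vec<String> {
    row_ids
        .iter()
        .map(|&row_id| {
            let (page_index, offset) = locate(row_id, rows_per_page);
            pages[page_index][offset].clone()
        })
        .collect()
}
```

With 50,000 rows per page, row id 49,999 maps to page 0 at offset 49,999, and row id 50,000 maps to page 1 at offset 0.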
    STEP 2: execute_non_root() - Materializes "name" column (final projection)

    1. RECEIVE BATCH FROM STEP 1

       columns:  { 2: ages, 3: cities }   // has age and city
       row_ids:  [1, 4, 7, 23, 45, ...]   // survivors of both filters
       num_rows: 8234
        │
        ▼
    2. MATERIALIZE "name" COLUMN (ordinal 1)

       // No filter on this step, just add the column
       name_page = page_handler.get_page("users.name.0")

       for &row_id in &batch.row_ids {
           let offset = row_id % rows_per_page;
           let name = name_page.data.get_string(offset);
           name_data.push(name);
       }

       batch.columns.insert(1, ColumnarPage::from_strings(name_data));
        │
        ▼
    3. NO FILTER - PASS THROUGH TO OUTPUT

       // Step 2 has no filters, so bitmap is all 1s
       // Just push the enriched batch to output

       current_producer.send(batch)

       Final batch:
         columns: {
             1: ColumnarPage(names:  ["Alice", "Bob", "Carol", ...]),
             2: ColumnarPage(ages:   [45, 31, 52, ...]),
             3: ColumnarPage(cities: ["NYC", "NYC", "NYC", ...]),
         }
         row_ids:  [1, 4, 7, ...]
         num_rows: 8234
    OUTPUT RECEIVER COLLECTS RESULTS

    let mut results = Vec::new();

    while let Ok(batch) = output_receiver.recv() {
        if batch.num_rows == 0 {
            break;  // termination signal
        }
        results.push(batch);
    }

    // Apply final projection: SELECT name, age
    // (borrow the columns; indexing alone cannot move them out of the map)
    let projected = results.iter().map(|batch| {
        ProjectedBatch {
            name: &batch.columns[&1],  // ordinal 1
            age:  &batch.columns[&2],  // ordinal 2
        }
    });

    FINAL RESULT:

      | name  | age |
      |-------|-----|
      | Alice | 45  |
      | Bob   | 31  |
      | Carol | 52  |
      | Dave  | 38  |
      | ...   | ... |
      (8234 total rows)
    SQL Query
        │
        ▼
    Parser ──▶ Planner ──▶ Pipeline Builder
                                │
                                ▼
    PIPELINE EXECUTION

      Worker Threads

        Worker 1              Worker 2              Worker 3
           │                     │                     │
           ▼                     ▼                     ▼
        Step 0 (root)         Step 1                Step 2
           │ ColumnarBatch       │ ColumnarBatch       │ ColumnarBatch
           │ (morsel 0)          │ (morsel 0)          │ (morsel 0)
           ▼                     ▼                     ▼
        Channel 0             Channel 1             Channel 2
                                                       │
                                                       ▼
                                                Output Receiver

      PAGE HANDLER

        get_page(id) ──▶ [Uncompressed Cache] ──▶ [Compressed Cache] ──▶ [Disk]
                          HIT: O(1)                HIT: decompress        MISS: I/O

        │
        ▼
    Query Results
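The page handler's hot, warm, and cold lookup order can be sketched with three maps. Everything here is a stand-in for illustration: `HashMap`s replace the real caches, the `disk` map replaces `page_io.read`, and `decompress` is an identity placeholder, not a real codec. The returned `Path` tag exists only so the example can show which tier served the request.

```rust
use std::collections::HashMap;

/// Which tier served a request (illustrative only).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Path {
    Hot,  // uncompressed cache hit
    Warm, // compressed cache hit, decompressed in memory
    Cold, // read from "disk"
}

struct PageHandler {
    uncompressed_cache: HashMap<String, Vec<u8>>,
    compressed_cache: HashMap<String, Vec<u8>>,
    disk: HashMap<String, Vec<u8>>, // stand-in for on-disk pages
}

/// Placeholder codec: returns the bytes unchanged.
fn decompress(bytes: &[u8]) -> Vec<u8> {
    bytes.to_vec()
}

impl PageHandler {
    fn new(disk: HashMap<String, Vec<u8>>) -> Self {
        PageHandler {
            uncompressed_cache: HashMap::new(),
            compressed_cache: HashMap::new(),
            disk,
        }
    }

    /// Looks up a page tier by tier, filling the faster tiers on the way
    /// back. Returns None if the page does not exist anywhere.
    fn get_page(&mut self, page_id: &str) -> Option<(Vec<u8>, Path)> {
        if let Some(page) = self.uncompressed_cache.get(page_id) {
            return Some((page.clone(), Path::Hot));
        }
        if let Some(compressed) = self.compressed_cache.get(page_id) {
            let page = decompress(compressed);
            self.uncompressed_cache.insert(page_id.to_string(), page.clone());
            return Some((page, Path::Warm));
        }
        let bytes = self.disk.get(page_id)?.clone();
        self.compressed_cache.insert(page_id.to_string(), bytes.clone());
        let page = decompress(&bytes);
        self.uncompressed_cache.insert(page_id.to_string(), page.clone());
        Some((page, Path::Cold))
    }
}
```

The first request for a page takes the cold path and populates both caches, so a repeat request is served hot. If only the uncompressed entry is evicted, the next request takes the warm path.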
| Property | How It Works |
|---|---|
| Push-based | Each step pushes batches downstream via channel.send(), no pulling |
| Morsel-driven | Each page group (50k rows) becomes one morsel/batch flowing through |
| Late materialization | Columns loaded only when needed by a step (not all columns upfront) |
| Parallel steps | Workers execute different steps simultaneously via atomic CAS |
| Vectorized filtering | Filter results are bit-packed, so each 64-bit bitmap operation covers 64 rows |
| Pipeline parallelism | Multiple morsels in-flight across steps at the same time |
| Termination | Empty batch (num_rows=0) propagates end-of-data through all steps |