
[POC] route Engine implementation through a PlanExecutor #2534

Draft
kyli87 wants to merge 3 commits into delta-io:main from kyli87:kyli/plan-migration-v0

Conversation

Collaborator

@kyli87 kyli87 commented May 7, 2026

What changes are proposed in this pull request?

This PR demonstrates how we can introduce a migration shim for incrementally moving Engine functionality to declarative plan execution. The high-level idea is:

  1. Introduce a PlanExecutor trait for executing plans
  2. Introduce a PlanBasedEngine which impls Engine by wrapping a PlanExecutor + DefaultEngine handlers.
  3. For each engine handler API, either:
    a. Construct a plan and call into the plan executor. Then map the result back to the expected type
    b. Simply defer to the default engine implementation.

This PR also demonstrates a NaivePlanExecutor which uses the default storage handler to implement I/O. So the class structure becomes:

  • PlanBasedEngine (impls Engine) -> NaivePlanExecutor (impls PlanExecutor) -> DefaultStorageHandler
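
The layering described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Plan`, `PlanResult`, `Engine`, `DefaultStorage`, `DefaultHandlers`, `evaluate`, and `sample_engine` are all simplified, hypothetical stand-ins for the kernel's real types, and errors are plain strings.

```rust
use std::collections::HashMap;

// All types below are simplified, hypothetical stand-ins for the kernel's
// real Engine, plan, and storage types.
type DeltaResult<T> = Result<T, String>;

/// A declarative description of work to perform.
enum Plan {
    ReadBytes { path: String },
}

/// What a plan produces.
enum PlanResult {
    Bytes(Vec<u8>),
}

/// (1) Anything that can execute a plan.
trait PlanExecutor {
    fn execute_plan(&self, plan: Plan) -> DeltaResult<PlanResult>;
}

/// Stand-in for the default storage handler: an in-memory file map.
struct DefaultStorage {
    files: HashMap<String, Vec<u8>>,
}

/// Executes plans directly against the default storage handler.
struct NaivePlanExecutor {
    storage: DefaultStorage,
}

impl PlanExecutor for NaivePlanExecutor {
    fn execute_plan(&self, plan: Plan) -> DeltaResult<PlanResult> {
        match plan {
            Plan::ReadBytes { path } => self
                .storage
                .files
                .get(&path)
                .cloned()
                .map(PlanResult::Bytes)
                .ok_or_else(|| format!("file not found: {path}")),
        }
    }
}

/// Stand-in for the handful of Engine APIs this sketch covers.
trait Engine {
    fn read_file(&self, path: &str) -> DeltaResult<Vec<u8>>;
    fn evaluate(&self, a: i64, b: i64) -> DeltaResult<i64>;
}

/// Stand-in for the DefaultEngine handlers that are not migrated yet.
struct DefaultHandlers;

impl DefaultHandlers {
    fn evaluate(&self, a: i64, b: i64) -> DeltaResult<i64> {
        a.checked_add(b).ok_or_else(|| "overflow".to_string())
    }
}

/// (2) Implements Engine by wrapping a PlanExecutor plus default handlers.
struct PlanBasedEngine<E: PlanExecutor> {
    executor: E,
    defaults: DefaultHandlers,
}

impl<E: PlanExecutor> Engine for PlanBasedEngine<E> {
    // (3a) Build a plan, execute it, and map the result back.
    fn read_file(&self, path: &str) -> DeltaResult<Vec<u8>> {
        let plan = Plan::ReadBytes { path: path.to_string() };
        match self.executor.execute_plan(plan)? {
            PlanResult::Bytes(bytes) => Ok(bytes),
        }
    }

    // (3b) Not migrated: defer to the default implementation.
    fn evaluate(&self, a: i64, b: i64) -> DeltaResult<i64> {
        self.defaults.evaluate(a, b)
    }
}

/// Builds an engine over one in-memory file, for demonstration only.
fn sample_engine() -> PlanBasedEngine<NaivePlanExecutor> {
    let mut files = HashMap::new();
    files.insert("00.json".to_string(), b"{}".to_vec());
    PlanBasedEngine {
        executor: NaivePlanExecutor { storage: DefaultStorage { files } },
        defaults: DefaultHandlers,
    }
}
```

Callers only ever see the `Engine` trait, so an individual API can move from (3b) to (3a) without any change on the caller's side.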

The benefits of this are that:

  • We can incrementally move Engine APIs towards plan execution, or even decide to leave them as default engine implementations if UnifiedKernel doesn't need them replaced.
  • We expose the plan executor for kernel to leverage if it desires (distributed log replay or other data-flow-intensive use cases).
  • For FFI, we only need to worry about implementing the PlanExecutor interface instead of all of the engine APIs.

How was this change tested?

Basic unit tests - but this is just a POC


github-actions Bot commented May 7, 2026

PR title does not match the required pattern. Please ensure you follow the conventional commits spec.

Your title should start with feat:, fix:, chore:, docs:, perf:, refactor:, test:, or ci:, and if it's a breaking change that should be suffixed with a ! (like feat!:), and then a 1-72 character brief description of your change.

Title: [POC] route Engine implementation through a PlanExecutor


codecov Bot commented May 7, 2026

Codecov Report

❌ Patch coverage is 76.92308% with 81 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.49%. Comparing base (890b5d4) to head (3cabf70).
⚠️ Report is 2 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| kernel/src/engine/plan/engine.rs | 0.00% | 36 Missing ⚠️ |
| kernel/src/engine/plan/storage.rs | 86.58% | 20 Missing and 11 partials ⚠️ |
| kernel/src/engine/plan/naive.rs | 91.04% | 0 Missing and 6 partials ⚠️ |
| kernel/src/plan/result.rs | 0.00% | 6 Missing ⚠️ |
| kernel/src/lib.rs | 0.00% | 2 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2534      +/-   ##
==========================================
- Coverage   88.56%   88.49%   -0.07%     
==========================================
  Files         179      184       +5     
  Lines       59766    60132     +366     
  Branches    59766    60132     +366     
==========================================
+ Hits        52930    53214     +284     
- Misses       4818     4882      +64     
- Partials     2018     2036      +18     



/// Convert a slice of [`FileMeta`] into a single [`ArrowEngineData`] batch matching
/// [`FILE_META_SCHEMA`].
fn file_metas_to_engine_data(metas: &[FileMeta]) -> DeltaResult<Box<dyn EngineData>> {
Collaborator Author

There's an inefficiency here of having to go from file meta -> engine data -> file meta for the sake of preserving the existing StorageHandler interface. Do we have better ideas?

Collaborator

That does seem really annoying. Even if we're not worried about the performance impact, the round trip increases code complexity and expands the bug surface.

On the other hand, every new type we're able to return increases FFI complexity quite a bit, because an extern source of FileMeta would have to represent the stream in some portable way, which likely leads to a per-call overhead. Tho EngineData just shifts that overhead to the row visitor that eventually extracts the FileMeta, which could be even worse.

If we have a relatively small set of things we'd ever need to return, we could consider adding a plan result variant for each unique Box<dyn Iterator<Item=Foo>>? Basically generalizing the PlanResult::ByteStream case. That would give a modicum of compile-time safety, tho each plan node type would have to document clearly what result variant(s) it can return, and consumers of results would still have to code against the possibility of getting the wrong variant.

If we worry there would be too many enum variants, we could also imagine some kind of generic PlanResult::Any(Box<dyn Any>) that can then be downcast to a specific Box<T> for plundering. One T of particular interest would be Box<dyn Iterator<Item=Foo>> (outer box for plundering, inner box for holding a dyn iterator). Allows a result to be almost anything, which is both a plus and a minus. Documentation becomes even more critical than before, because there's no longer a closed enum to give compile-time hints of what to expect.
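
To make the `Box<dyn Any>` idea concrete, here is a rough sketch of the downcast ("plundering") step. It assumes a hypothetical `PlanResult::Any` variant and uses stand-in types (`FileMeta`, `FileMetaIter`, string errors, and the helpers `wrap_file_metas` and `into_file_metas`), none of which come from the PR itself.

```rust
use std::any::Any;

// Hypothetical stand-in for the kernel's FileMeta.
#[derive(Debug, PartialEq)]
struct FileMeta {
    location: String,
    size: u64,
}

// Inner box: holds the dyn iterator.
type FileMetaIter = Box<dyn Iterator<Item = Result<FileMeta, String>>>;

// Hypothetical result type with an open-ended `Any` variant.
enum PlanResult {
    Unit,
    Any(Box<dyn Any>),
}

/// Producer side: the outer box erases the payload type.
fn wrap_file_metas(metas: Vec<FileMeta>) -> PlanResult {
    let iter: FileMetaIter = Box::new(metas.into_iter().map(Ok::<FileMeta, String>));
    PlanResult::Any(Box::new(iter))
}

/// Consumer side: downcast the outer box, then unwrap it to get the iterator.
/// Nothing at compile time guarantees the payload type, so a mismatch must be
/// handled as a runtime error.
fn into_file_metas(result: PlanResult) -> Result<FileMetaIter, String> {
    match result {
        PlanResult::Any(payload) => payload
            .downcast::<FileMetaIter>()
            .map(|outer| *outer)
            .map_err(|_| "plan returned an unexpected payload type".to_string()),
        PlanResult::Unit => Err("plan returned no payload".to_string()),
    }
}
```

The error branch of `downcast` is exactly the case that only documentation can prevent, which is the trade-off against a closed enum.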

Collaborator Author

Notes from our discussion: the plan result could be something like:

PlanResult {
  Scalar(Scalar)
  Bounded(EngineData)
  Unbounded(iter of EngineData)
  FileMeta
  FileMetaIter
  Bytes
  BytesIter
}

strongly typed results principles:

  • main benefit is that if engine produces the type naturally and kernel wants that same type, then it makes sense to add some concrete type to the PlanResult (instead of arrow)
  • if engine would have produced some other type and then have to transform it anyway, might as well just put it into arrow

Other notes:

  • for multi-node plans, we may want file listing to return generic relation data. we can introduce typed nodes for this in the future
  • how to optimize case where kernel "reads some data as engine data then immediately wants to eval an expression":
    • option 1: rewrite kernel code to use proper plans
    • option 2: implement custom engine data type (complexity around how to implement visitors on this engine data)
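
As a rough illustration of the strongly typed shape in these notes, the enum could look something like the sketch below. The payload types (`Scalar`, `EngineData`, `FileMeta`, `Bytes`), the string error type, and the helpers `variant_name` and `into_file_meta` are hypothetical stand-ins chosen so the example compiles on its own; the variant names follow the notes.

```rust
// Hypothetical stand-ins for the kernel's real payload types.
#[derive(Debug, PartialEq)]
struct Scalar(i64);
#[derive(Debug, PartialEq)]
struct EngineData(Vec<i64>);
#[derive(Debug, PartialEq)]
struct FileMeta {
    location: String,
    size: u64,
}
type Bytes = Vec<u8>;

type DeltaResult<T> = Result<T, String>;
type DeltaResultIterator<T> = Box<dyn Iterator<Item = DeltaResult<T>>>;

enum PlanResult {
    Scalar(Scalar),
    Bounded(EngineData),
    Unbounded(DeltaResultIterator<EngineData>),
    FileMeta(FileMeta),
    FileMetaIter(DeltaResultIterator<FileMeta>),
    Bytes(Bytes),
    BytesIter(DeltaResultIterator<Bytes>),
}

impl PlanResult {
    /// Name of the variant, used only for error messages.
    fn variant_name(&self) -> &'static str {
        match self {
            PlanResult::Scalar(_) => "Scalar",
            PlanResult::Bounded(_) => "Bounded",
            PlanResult::Unbounded(_) => "Unbounded",
            PlanResult::FileMeta(_) => "FileMeta",
            PlanResult::FileMetaIter(_) => "FileMetaIter",
            PlanResult::Bytes(_) => "Bytes",
            PlanResult::BytesIter(_) => "BytesIter",
        }
    }

    /// A consumer that expects exactly one FileMeta (a head-style lookup,
    /// say) gets it directly, with no batch or row-visitor round trip.
    /// Getting the wrong variant is still a runtime error.
    fn into_file_meta(self) -> DeltaResult<FileMeta> {
        match self {
            PlanResult::FileMeta(meta) => Ok(meta),
            other => Err(format!("expected FileMeta, got {}", other.variant_name())),
        }
    }
}
```

The closed enum documents what a plan can return, but each consumer still has to handle the wrong-variant case explicitly.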

}

#[allow(clippy::panic)]
fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> {
Collaborator Author

This is only used by the UC committer - wasn't sure if we wanted to add a dedicated plan node just for a copy op. So left it unimplemented for now.


fn execute_read_bytes(&self, files: Vec<FileSlice>) -> DeltaResult<PlanResult> {
let iter = self.storage.read_files(files)?;
Ok(PlanResult::ByteStream(iter))
Collaborator Author

Here I chose to introduce a separate Result variant for Bytes instead of having to map bytes to EngineData and then back again. We should more generally discuss how we want to handle generic return types from plan execution. Should it always be EngineData?

Comment thread kernel/src/plan/mod.rs
/// Returns [`PlanResult::Unit`] on success. If `overwrite` is false and the file already
/// exists, the executor must return
/// [`Error::FileAlreadyExists`](crate::Error::FileAlreadyExists).
WriteBytes {
Collaborator Author

I "think" write bytes is only used for writing CRC files. Could we have just used the JSON handler instead, and avoided implementing a Write node in the plan algebra?

Collaborator

@scovich scovich left a comment


Flushing comments


i64::try_from(m.size)
.ok()
.map(Some)
.unwrap_or(Some(i64::MAX))
Collaborator

Should we default to 0 instead of MAX? Or even just None, given that this is a nullable column?

Collaborator

BTW: Int64Array has both From and FromIterator for both i64 and Option<i64>.
AFAICT the current code has an unnecessary Option layer (None never appears).

Collaborator

Update: all three columns are non-nullable. So we need to decide about 0 vs. MAX and can definitely skip the spurious Option stuff.
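
For a non-nullable column, the conversion without the extra Option layer might look like the sketch below. It assumes the size is a `u64`, uses a plain `Vec<i64>` in place of the Arrow array, and takes the fallback as a parameter since the 0 vs. MAX question is still open; `sizes_to_i64_column` is a hypothetical helper name.

```rust
use std::convert::TryFrom;

/// Converts sizes into a non-nullable i64 column, substituting `fallback`
/// for any size that does not fit in an i64. No Option layer is needed
/// because no value is ever null.
fn sizes_to_i64_column(sizes: &[u64], fallback: i64) -> Vec<i64> {
    sizes
        .iter()
        .map(|&size| i64::try_from(size).unwrap_or(fallback))
        .collect()
}
```

Passing `i64::MAX` reproduces the current saturating behavior, and passing `0` gives the alternative raised above.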

PlanResult::Data(iter) => {
let meta_iter = iter.flat_map(|batch_result| match batch_result {
Ok(batch) => match engine_data_to_file_metas(batch.as_ref()) {
Ok(metas) => metas.into_iter().map(Ok).collect::<Vec<_>>().into_iter(),
Collaborator

Why do we need to collect the iterator as vec, only to immediately turn it back to an iterator?
(seems unnecessarily expensive, is there a better way to deal with whatever type mismatch?)
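
One possible way to avoid the intermediate collect, assuming the mismatch is just that the Ok and Err arms yield different iterator types, is to box each arm as a trait object. `Batch`, `FileMeta`, the string error type, and `flatten_metas` are hypothetical stand-ins; only the shape of the `flat_map` mirrors the snippet under review.

```rust
type DeltaResult<T> = Result<T, String>;

// Hypothetical stand-ins for an engine data batch and the kernel's FileMeta.
struct Batch(Vec<u64>);

#[derive(Debug, PartialEq)]
struct FileMeta {
    size: u64,
}

/// Stand-in for the real batch-to-FileMeta conversion.
fn engine_data_to_file_metas(batch: &Batch) -> DeltaResult<Vec<FileMeta>> {
    Ok(batch.0.iter().map(|&size| FileMeta { size }).collect())
}

/// Flattens batches into FileMeta results. Both match arms are boxed to the
/// same trait-object type, so the Ok arm can hand over its iterator directly
/// instead of collecting into a second Vec first.
fn flatten_metas(
    batches: impl Iterator<Item = DeltaResult<Batch>>,
) -> impl Iterator<Item = DeltaResult<FileMeta>> {
    batches.flat_map(|batch_result| -> Box<dyn Iterator<Item = DeltaResult<FileMeta>>> {
        match batch_result.and_then(|batch| engine_data_to_file_metas(&batch)) {
            Ok(metas) => Box::new(metas.into_iter().map(Ok::<FileMeta, String>)),
            Err(e) => Box::new(std::iter::once(Err::<FileMeta, String>(e))),
        }
    })
}
```

This trades the extra allocation and copy of a Vec per batch for one small box per batch; an either-style enum over the two iterator types would avoid the box as well.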

};
let result = self.executor.execute_plan(plan)?;
match result {
PlanResult::Unit => Ok(()),
Collaborator

Interesting... no PlanResult::Err case, because we return DeltaResult<PlanResult> in order to leverage the ? operator?

aside: If the whole Rust Try v2 thing ever stabilizes we might be able to make PlanResult ?-aware and use it directly. Or maybe not, because what type would ? unwrap to when we don't know which of the other N-1 enum variants we might be dealing with?

Comment on lines +103 to +110
let batch = iter
.next()
.ok_or_else(|| Error::generic("HeadFile returned no data batches"))??;
let metas = engine_data_to_file_metas(batch.as_ref())?;
metas
.into_iter()
.next()
.ok_or_else(|| Error::generic("HeadFile returned an empty batch with no rows"))
Collaborator

Another case where EngineData as carrier makes a poor UX...

Comment thread kernel/src/plan/result.rs
Comment on lines +17 to +19
Data(Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>>>),
/// A stream of raw byte buffers, one per file or file slice.
ByteStream(Box<dyn Iterator<Item = DeltaResult<Bytes>>>),
Collaborator

aside: Should we consider capturing a type alias?

type DeltaResultIterator<T> = Box<dyn Iterator<Item = DeltaResult<T>>>;

I believe the pattern already showed up quite a bit in kernel and now we have two more here?
