[POC] route Engine implementation through a PlanExecutor#2534
Conversation
|
PR title does not match the required pattern. Please ensure you follow the conventional commits spec. Your title should start with Title: |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2534 +/- ##
==========================================
- Coverage 88.56% 88.49% -0.07%
==========================================
Files 179 184 +5
Lines 59766 60132 +366
Branches 59766 60132 +366
==========================================
+ Hits 52930 53214 +284
- Misses 4818 4882 +64
- Partials 2018 2036 +18 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
|
||
| /// Convert a slice of [`FileMeta`] into a single [`ArrowEngineData`] batch matching | ||
| /// [`FILE_META_SCHEMA`]. | ||
| fn file_metas_to_engine_data(metas: &[FileMeta]) -> DeltaResult<Box<dyn EngineData>> { |
There was a problem hiding this comment.
There's an inefficiency here of having to go from file meta -> engine data -> file meta for the sake of preserving the existing StorageHandler interface. Do we have better ideas?
There was a problem hiding this comment.
That does seem really annoying. Even if we're not worried about the performance impact, the round trip increases code complexity and expands the bug surface.
On the other hand, every new type we're able to return increases FFI complexity quite a bit, because an extern source of FileMeta would have to represent the stream in some portable way, which likely leads to a per-call overhead. Tho EngineData just shifts that overhead to the row visitor that eventually extracts the FileMeta, which could be even worse.
If we have a relatively small set of things we'd ever need to return, we could consider adding a plan result variant for each unique Box<dyn Iterator<Item=Foo>>? Basically generalizing the PlanResult::ByteStream case. That would give a modicum of compile-time safety, tho each plan node type would have to document clearly what result variant(s) it can return, and consumers of results would still have to code against the possibility of getting the wrong variant.
If we worry there would be too many enum variants, we could also imagine some kind of generic PlanResult::Any(Box<dyn Any>) that can then be downcast to a specific Box<T> for plundering. One T of particular interest would be Box<dyn Iterator<Item=Foo>> (outer box for plundering, inner box for holding a dyn iterator). Allows a result to be almost anything, which is both a plus and a minus. Documentation becomes even more critical than before, because there's no longer a closed enum to give compile-time hints of what to expect.
There was a problem hiding this comment.
Notes from our discussion, plan result could be something like:
PlanResult {
Scalar(Scalar)
Bounded(EngineData)
Unbounded(iter of EngineData)
FileMeta
FileMetaIter
Bytes
BytesIter
}
strongly typed results principles:
- main benefit is that if engine produces the type naturally and kernel wants that same type, then it makes sense to add some concrete type to the PlanResult (instead of arrow)
- if engine would have produced some other type and then have to transform it anyway, might as well just put it into arrow
Other notes:
- for multi-node plans, we may want file listing to return generic relation data. we can introduce typed nodes for this in the future
- how to optimize case where kernel "reads some data as engine data then immediately wants to eval an expression":
- option 1: rewrite kernel code to use proper plans
- option 2: implement custom engine data type (complexity around how to implement visitors on ths engine data)
| } | ||
|
|
||
| #[allow(clippy::panic)] | ||
| fn copy_atomic(&self, _src: &Url, _dest: &Url) -> DeltaResult<()> { |
There was a problem hiding this comment.
This is only used by the UC committer - wasn't sure if we wanted to add a dedicated plan node just for a copy op. So left it unimplemented for now.
|
|
||
| fn execute_read_bytes(&self, files: Vec<FileSlice>) -> DeltaResult<PlanResult> { | ||
| let iter = self.storage.read_files(files)?; | ||
| Ok(PlanResult::ByteStream(iter)) |
There was a problem hiding this comment.
Here I chose to introduce a separate Result variant for Bytes instead of having to map bytes to EngineData and then back again. We should more generally discuss how we want to handle generic return types from plan execution. Should it always be EngineData?
| /// Returns [`PlanResult::Unit`] on success. If `overwrite` is false and the file already | ||
| /// exists, the executor must return | ||
| /// [`Error::FileAlreadyExists`](crate::Error::FileAlreadyExists). | ||
| WriteBytes { |
There was a problem hiding this comment.
I "think" write bytes is only used for writing CRC files - could we have instead just used JSON handler instead and avoid implementing a Write node in the plan algebra?
|
|
||
| /// Convert a slice of [`FileMeta`] into a single [`ArrowEngineData`] batch matching | ||
| /// [`FILE_META_SCHEMA`]. | ||
| fn file_metas_to_engine_data(metas: &[FileMeta]) -> DeltaResult<Box<dyn EngineData>> { |
There was a problem hiding this comment.
That does seem really annoying. Even if we're not worried about the performance impact, the round trip increases code complexity and expands the bug surface.
On the other hand, every new type we're able to return increases FFI complexity quite a bit, because an extern source of FileMeta would have to represent the stream in some portable way, which likely leads to a per-call overhead. Tho EngineData just shifts that overhead to the row visitor that eventually extracts the FileMeta, which could be even worse.
If we have a relatively small set of things we'd ever need to return, we could consider adding a plan result variant for each unique Box<dyn Iterator<Item=Foo>>? Basically generalizing the PlanResult::ByteStream case. That would give a modicum of compile-time safety, tho each plan node type would have to document clearly what result variant(s) it can return, and consumers of results would still have to code against the possibility of getting the wrong variant.
If we worry there would be too many enum variants, we could also imagine some kind of generic PlanResult::Any(Box<dyn Any>) that can then be downcast to a specific Box<T> for plundering. One T of particular interest would be Box<dyn Iterator<Item=Foo>> (outer box for plundering, inner box for holding a dyn iterator). Allows a result to be almost anything, which is both a plus and a minus. Documentation becomes even more critical than before, because there's no longer a closed enum to give compile-time hints of what to expect.
| i64::try_from(m.size) | ||
| .ok() | ||
| .map(Some) | ||
| .unwrap_or(Some(i64::MAX)) |
There was a problem hiding this comment.
Should we default to 0 instead of MAX? Or even just None, given that this is a nullable column?
There was a problem hiding this comment.
BTW: Int64Array has both From and FromIterator for both i64 and Option.
AFAICT the current code has an unnecessary Option layer (None never appears).
There was a problem hiding this comment.
Update: all three columns are non-nullable. So we need to decide about 0 vs. MAX and can definitely skip the spurious Option stuff.
| PlanResult::Data(iter) => { | ||
| let meta_iter = iter.flat_map(|batch_result| match batch_result { | ||
| Ok(batch) => match engine_data_to_file_metas(batch.as_ref()) { | ||
| Ok(metas) => metas.into_iter().map(Ok).collect::<Vec<_>>().into_iter(), |
There was a problem hiding this comment.
Why do we need to collect the iterator as vec, only to immediately turn it back to an iterator?
(seems unnecessarily expensive, is there a better way to deal with whatever type mismatch?)
| }; | ||
| let result = self.executor.execute_plan(plan)?; | ||
| match result { | ||
| PlanResult::Unit => Ok(()), |
There was a problem hiding this comment.
Interesting... no PlanResult::Err case, because we return DeltaResult<PlanResult> in order to leverage ??
aside: If the whole rust Try v2 thing ever stabilizes we might be able to make PlanResult ? aware and use it directly. Or maybe not, because what type would ? unwrap as when we don't know which of the other N-1 enum variants we might be dealing with?
| let batch = iter | ||
| .next() | ||
| .ok_or_else(|| Error::generic("HeadFile returned no data batches"))??; | ||
| let metas = engine_data_to_file_metas(batch.as_ref())?; | ||
| metas | ||
| .into_iter() | ||
| .next() | ||
| .ok_or_else(|| Error::generic("HeadFile returned an empty batch with no rows")) |
There was a problem hiding this comment.
Another case where EngineData as carrier makes a poor UX...
| Data(Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>>>), | ||
| /// A stream of raw byte buffers, one per file or file slice. | ||
| ByteStream(Box<dyn Iterator<Item = DeltaResult<Bytes>>>), |
There was a problem hiding this comment.
aside: Should we consider capturing a type alias?
type DeltaResultIterator<T> = Box<dyn Iterator<Item = DeltaResult<T>>>;I believe the pattern already showed up quite a bit in kernel and now we have two more here?
What changes are proposed in this pull request?
This PR demonstrates how we can introduce a migration shim for slowly moving Engine functionality to declarative plan execution. The high level idea is that:
PlanExecutortrait for executing plansPlanBasedEnginewhich implsEngineby wrapping a PlanExecutor + DefaultEngine handlers.a. Construct a plan and call into the plan executor. Then map the result back to the expected type
b. Simply defer to the default engine implementation.
This PR also demonstrates a
NaivePlanExecutorwhich uses the default storage handler to implement I/O. So the class structure becomes:PlanBasedEngine (impls Engine) -> NaivePlanExecutor (impls PlanExecutor) -> DefaultStorageHandlerThe benefits of this are that:
How was this change tested?
Basic unit tests - but this is just a POC