Handle multi-key bundles in side-input runner#38988
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request implements support for multi-key bundles within the side-input runner. By introducing an 'activeKey' tracking mechanism, the changes allow the runner to correctly manage side-input processing across multiple keys within a single bundle, moving away from the previous assumption that bundles were strictly single-key. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces an activeKey boolean flag across SimpleParDoFn, StreamingKeyedWorkItemSideInputParDoFn, and StreamingSideInputDoFnRunner to track key lifecycle state and manage bundle initialization correctly. Feedback on these changes highlights a critical bug in StreamingKeyedWorkItemSideInputParDoFn where onStartKey() is invoked before the current key is written to state, as well as a suggestion to make the activeKey field private in StreamingSideInputDoFnRunner to improve encapsulation.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| public void processElement(Object untypedElem) throws Exception { | ||
| if (helpers.fnRunner == null) { | ||
| boolean initialize = !activeKey || helpers.fnRunner == null; | ||
| if (initialize) { | ||
| // If we need to run reallyStartBundle in here, we need to make sure to switch the state | ||
| // sampler into the start state. | ||
| try (Closeable start = helpers.operationContext.enterStart()) { | ||
| helpers.reallyStartBundle(); | ||
| if (helpers.fnRunner == null) { | ||
| helpers.reallyStartBundle(); | ||
| } | ||
| onStartKey(); | ||
| } | ||
| } |
There was a problem hiding this comment.
In StreamingKeyedWorkItemSideInputParDoFn, onStartKey() reads the current key from state using keyValue().read(). However, when onStartKey() is called during processElement initialization, the new key has not yet been written to keyValue() (which normally happens later in onProcessWindowedValue). This means keyValue().read() will either return null (if the key is being processed for the first time) or the key from a previous bundle, leading to incorrect behavior or missed unblocking of elements/timers for the new key.
To fix this, we should extract the key from untypedElem and write it to keyValue() before calling onStartKey() in processElement.
| public void processElement(Object untypedElem) throws Exception { | |
| if (helpers.fnRunner == null) { | |
| boolean initialize = !activeKey || helpers.fnRunner == null; | |
| if (initialize) { | |
| // If we need to run reallyStartBundle in here, we need to make sure to switch the state | |
| // sampler into the start state. | |
| try (Closeable start = helpers.operationContext.enterStart()) { | |
| helpers.reallyStartBundle(); | |
| if (helpers.fnRunner == null) { | |
| helpers.reallyStartBundle(); | |
| } | |
| onStartKey(); | |
| } | |
| } | |
| public void processElement(Object untypedElem) throws Exception { | |
| boolean initialize = !activeKey || helpers.fnRunner == null; | |
| if (initialize) { | |
| // If we need to run reallyStartBundle in here, we need to make sure to switch the state | |
| // sampler into the start state. | |
| try (Closeable start = helpers.operationContext.enterStart()) { | |
| if (helpers.fnRunner == null) { | |
| helpers.reallyStartBundle(); | |
| } | |
| WindowedValue<KeyedWorkItem<K, InputT>> elem = (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem; | |
| keyValue().write(elem.getValue().key()); | |
| onStartKey(); | |
| } | |
| } |
| implements DoFnRunner<InputT, OutputT> { | ||
| private final DoFnRunner<InputT, OutputT> simpleDoFnRunner; | ||
| private final StreamingSideInputProcessor<InputT, W> sideInputProcessor; | ||
| boolean activeKey = false; |
There was a problem hiding this comment.
The activeKey field has default (package-private) visibility. It should be declared as private to maintain proper encapsulation and consistency with other classes like SimpleParDoFn and StreamingKeyedWorkItemSideInputParDoFn.
| boolean activeKey = false; | |
| private boolean activeKey = false; |
arunpandianp
left a comment
There was a problem hiding this comment.
Moving onStartKey out of startBundle could simplify things. Not sure if there is a reason that prevents us from doing so.
|
|
||
| @Override | ||
| public void processTimers() throws Exception { | ||
|
|
There was a problem hiding this comment.
There could be keys with only timers and no elements, do we need to call onStartKey in processTimers?
| @Override | ||
| public void finishKey(Object key) throws Exception {} | ||
| public void finishKey(Object key) throws Exception { | ||
| this.activeKey = false; |
There was a problem hiding this comment.
IIUC there could be keys with no elements or timers. Should we move the onStartKey call here to handle such keys?
Instead of calling onStartKey in startBundle and processElements, we could call it in processElements, processTimers and finishKey.
| boolean hasState = helpers.hasState(); | ||
|
|
||
| // TODO(relax): We should be able to get this without writing it to state! | ||
| // TODO(relax): We should be able to get this without writing it to state! To make this work, |
There was a problem hiding this comment.
finishKey has the key. If we move onStartKey to finishKey, we should be able to make this optimization.
| @Override | ||
| public void finishKey(Object key) throws Exception {} | ||
| public void finishKey(Object key) throws Exception { | ||
| this.activeKey = false; |
There was a problem hiding this comment.
should we call sideInputFetcher.persist(); in finishKey
|
Assigning reviewers: R: @chamikaramj added as fallback since no labels match configuration Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
R: @arunpandianp