
Handle multi-key bundles in side-input runner#38988

Open
reuvenlax wants to merge 3 commits into apache:master from reuvenlax:side_input_finish_key

Conversation

@reuvenlax


@gemini-code-assist


Summary of Changes


This pull request implements support for multi-key bundles within the side-input runner. By introducing an 'activeKey' tracking mechanism, the changes allow the runner to correctly manage side-input processing across multiple keys within a single bundle, moving away from the previous assumption that bundles were strictly single-key.

Highlights

  • Key Tracking State: Introduced an 'activeKey' boolean flag in SimpleParDoFn, StreamingKeyedWorkItemSideInputParDoFn, and StreamingSideInputDoFnRunner to track whether a key is currently being processed.
  • Bundle Initialization Logic: Refactored 'processElement' and 'startBundle' methods to conditionally initialize and trigger side-input processing based on the 'activeKey' state, allowing for multi-key bundle support.
  • Lifecycle Management: Updated 'finishKey' and 'finishBundle' methods to reset the 'activeKey' flag, ensuring proper cleanup between keys and bundles.
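
The lifecycle described in the highlights can be illustrated with a toy model. This is a minimal sketch, not the Beam implementation: the class `ActiveKeyRunnerSketch`, its string keys, and the `events` log are hypothetical names introduced only for illustration. It assumes that the first element after a key boundary triggers the per-key start work, and that `finishKey` and `finishBundle` both clear the flag.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a runner that tracks whether a key is active. Hypothetical, not Beam code. */
class ActiveKeyRunnerSketch {
  private boolean activeKey = false;
  private boolean bundleStarted = false;
  // Records the order of lifecycle calls so the behavior can be inspected.
  final List<String> events = new ArrayList<>();

  void processElement(String key, String element) {
    if (!bundleStarted) {
      // Bundle-level setup runs once per bundle.
      bundleStarted = true;
      events.add("startBundle");
    }
    if (!activeKey) {
      // Per-key setup runs once per key, even when a bundle holds several keys.
      activeKey = true;
      events.add("startKey:" + key);
    }
    events.add("element:" + element);
  }

  void finishKey(String key) {
    // Clearing the flag lets the next key in the same bundle start cleanly.
    activeKey = false;
    events.add("finishKey:" + key);
  }

  void finishBundle() {
    activeKey = false;
    bundleStarted = false;
    events.add("finishBundle");
  }
}
```

Feeding two keys through one bundle in this model produces a single `startBundle` event and one `startKey` event per key, which is the multi-key behavior the summary describes.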

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces an activeKey boolean flag across SimpleParDoFn, StreamingKeyedWorkItemSideInputParDoFn, and StreamingSideInputDoFnRunner to track key lifecycle state and manage bundle initialization correctly. Feedback on these changes highlights a critical bug in StreamingKeyedWorkItemSideInputParDoFn where onStartKey() is invoked before the current key is written to state, as well as a suggestion to make the activeKey field private in StreamingSideInputDoFnRunner to improve encapsulation.


Comment on lines 154 to 165
  public void processElement(Object untypedElem) throws Exception {
-   if (helpers.fnRunner == null) {
+   boolean initialize = !activeKey || helpers.fnRunner == null;
+   if (initialize) {
      // If we need to run reallyStartBundle in here, we need to make sure to switch the state
      // sampler into the start state.
      try (Closeable start = helpers.operationContext.enterStart()) {
-       helpers.reallyStartBundle();
+       if (helpers.fnRunner == null) {
+         helpers.reallyStartBundle();
+       }
+       onStartKey();
      }
    }


high

In StreamingKeyedWorkItemSideInputParDoFn, onStartKey() reads the current key from state using keyValue().read(). However, when onStartKey() is called during processElement initialization, the new key has not yet been written to keyValue() (which normally happens later in onProcessWindowedValue). This means keyValue().read() will either return null (if the key is being processed for the first time) or the key from a previous bundle, leading to incorrect behavior or missed unblocking of elements/timers for the new key.

To fix this, we should extract the key from untypedElem and write it to keyValue() before calling onStartKey() in processElement.

Suggested change

  public void processElement(Object untypedElem) throws Exception {
    boolean initialize = !activeKey || helpers.fnRunner == null;
    if (initialize) {
      // If we need to run reallyStartBundle in here, we need to make sure to switch the state
      // sampler into the start state.
      try (Closeable start = helpers.operationContext.enterStart()) {
        if (helpers.fnRunner == null) {
          helpers.reallyStartBundle();
        }
        WindowedValue<KeyedWorkItem<K, InputT>> elem = (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem;
        keyValue().write(elem.getValue().key());
        onStartKey();
      }
    }
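
The ordering problem raised in this comment can be reproduced in isolation. The sketch below is a toy model under stated assumptions: `KeyCellSketch` is a hypothetical stand-in for the state cell behind `keyValue()`, not the Beam state API, and `StartKeyOrderingSketch` is an illustrative name. It only shows that a start hook which reads the cell before the new key is written observes either nothing or the previous key.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a state cell holding the current key. Not the Beam API. */
class KeyCellSketch {
  private String value; // null until the first write

  String read() {
    return value;
  }

  void write(String v) {
    value = v;
  }
}

class StartKeyOrderingSketch {
  final KeyCellSketch keyCell = new KeyCellSketch();
  // Keys as seen by the start hook, in call order.
  final List<String> startedKeys = new ArrayList<>();

  // Mirrors the reported problem: the hook runs before the new key is written.
  void processReadFirst(String key) {
    onStartKey();
    keyCell.write(key);
  }

  // Mirrors the suggested ordering: write the key, then run the hook.
  void processWriteFirst(String key) {
    keyCell.write(key);
    onStartKey();
  }

  private void onStartKey() {
    // String.valueOf turns a missing key into the text "null" for easy inspection.
    startedKeys.add(String.valueOf(keyCell.read()));
  }
}
```

In this model, calling `processReadFirst` for keys `a` and then `b` records a missing key followed by `a`, while `processWriteFirst` records `a` followed by `b`.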

      implements DoFnRunner<InputT, OutputT> {
    private final DoFnRunner<InputT, OutputT> simpleDoFnRunner;
    private final StreamingSideInputProcessor<InputT, W> sideInputProcessor;
+   boolean activeKey = false;


medium

The activeKey field has default (package-private) visibility. It should be declared as private to maintain proper encapsulation and consistency with other classes like SimpleParDoFn and StreamingKeyedWorkItemSideInputParDoFn.

Suggested change

- boolean activeKey = false;
+ private boolean activeKey = false;

@arunpandianp arunpandianp left a comment


Moving onStartKey out of startBundle could simplify things. Not sure if there is a reason that prevents us from doing so.


@Override
public void processTimers() throws Exception {


There could be keys with only timers and no elements. Do we need to call onStartKey in processTimers?

  @Override
- public void finishKey(Object key) throws Exception {}
+ public void finishKey(Object key) throws Exception {
+   this.activeKey = false;


IIUC there could be keys with no elements or timers. Should we move the onStartKey call here to handle such keys?

Instead of calling onStartKey in startBundle and processElement, we could call it in processElement, processTimers, and finishKey.
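
A rough sketch of that arrangement follows, as a toy model rather than a patch. The class `LazyStartKeySketch`, the helper `ensureKeyStarted`, and the `events` log are hypothetical names. The sketch assumes each entry point starts the key lazily if it is not already active, so that keys with only timers, or with neither elements nor timers, still get their start work exactly once.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: every entry point lazily starts the key. Hypothetical, not Beam code. */
class LazyStartKeySketch {
  private boolean activeKey = false;
  // Records the order of lifecycle calls so the behavior can be inspected.
  final List<String> events = new ArrayList<>();

  void processElement(Object element) {
    ensureKeyStarted();
    events.add("element:" + element);
  }

  void processTimers() {
    // Covers keys that arrive with timers but no elements.
    ensureKeyStarted();
    events.add("timers");
  }

  void finishKey(Object key) {
    // Covers keys that had neither elements nor timers.
    ensureKeyStarted();
    events.add("finishKey:" + key);
    activeKey = false;
  }

  private void ensureKeyStarted() {
    if (!activeKey) {
      activeKey = true;
      events.add("startKey");
    }
  }
}
```

With this shape, startBundle no longer needs to know anything about keys, at the cost of a flag check on every element and timer call.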

  boolean hasState = helpers.hasState();

- // TODO(relax): We should be able to get this without writing it to state!
+ // TODO(relax): We should be able to get this without writing it to state! To make this work,


finishKey has the key. If we move onStartKey to finishKey, we should be able to make this optimization.
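
To make the idea concrete, here is a minimal sketch, assuming the per-key work can be deferred until finishKey. The class `FinishKeyStartSketch`, the `stateWrites` counter, and the `unblockedFor` list are hypothetical and exist only to show that the key can be passed as an argument instead of being written to and read back from state.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: per-key work receives the key from finishKey. Hypothetical, not Beam code. */
class FinishKeyStartSketch {
  // Counts writes of the key to a stand-in state cell; this variant never needs one.
  int stateWrites = 0;
  // Keys for which the per-key work ran, in call order.
  final List<String> unblockedFor = new ArrayList<>();

  void processElement(Object element) {
    // No key bookkeeping here: the element path does not touch state for the key.
  }

  void finishKey(Object key) {
    // The key arrives as a parameter, so it can be handed straight to the per-key hook.
    onStartKey(key);
  }

  private void onStartKey(Object key) {
    unblockedFor.add(String.valueOf(key));
  }
}
```

Whether deferring the work to finishKey is safe depends on what onStartKey has to do before elements are processed, which this sketch does not model.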

  @Override
- public void finishKey(Object key) throws Exception {}
+ public void finishKey(Object key) throws Exception {
+   this.activeKey = false;


Should we call sideInputFetcher.persist() in finishKey?

@github-actions


Assigning reviewers:

R: @chamikaramj added as fallback since no labels match configuration

