Skip to content

KAFKA-20269: Refactor target assignment update for delayed assignments#21664

Open
squah-confluent wants to merge 1 commit intoapache:trunkfrom
confluentinc:squah-kip-1263-refactor-target-assignment-update-for-delayed-assignments
Open

KAFKA-20269: Refactor target assignment update for delayed assignments#21664
squah-confluent wants to merge 1 commit intoapache:trunkfrom
confluentinc:squah-kip-1263-refactor-target-assignment-update-for-delayed-assignments

Conversation

@squah-confluent
Copy link
Contributor

@squah-confluent squah-confluent commented Mar 7, 2026

When assignment batching or assignment offload are implemented, the
methods to update the target assignment will no longer always return a
new target assignment, depending on timings and the group coordinator
config.

When assignment batching or assignment offload are implemented, the
methods to update the target assignment will no longer always return a
new target assignment, depending on timings and the group coordinator
config.
@github-actions github-actions bot added triage PRs from the community group-coordinator labels Mar 7, 2026
@squah-confluent
Copy link
Contributor Author

@lucasbru Could you check the streams part of this change?

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two nits, otherwise lgtm

@@ -3937,7 +3956,7 @@ private Assignment updateTargetAssignment(
* @param records The list to accumulate any new records.
* @return The new target assignment for the updated member, or EMPTY if no member specified.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we not want to update the return javadocs? I suppose this goes to the next PR?

returnedStatus.add(
new Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
.setStatusDetail("Assignment delayed due to the configured initial rebalance delay.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can also get here also if the initial assignment is offloaded. Should we reflect this in the status detail?

@dajac dajac removed the triage PRs from the community label Mar 9, 2026
Optional<Assignment> updatedTargetAssignment = Optional.empty();
if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateTargetAssignment(
updatedTargetAssignment = maybeUpdateTargetAssignment(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether we should go a step further here. Have you considered pushing all the logic into maybeUpdateTargetAssignment? For instance, maybeUpdateTargetAssignment could return a record(epoch, assignment) and handle all the conditions that we have here. Thoughts?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants