Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,52 @@ private static class UpdateSubscriptionMetadataResult {
}
}

private record UpdateTargetAssignmentResult<T>(
int targetAssignmentEpoch,
T targetAssignment
) {
private static UpdateTargetAssignmentResult<Assignment> fromLastTargetAssignment(
ConsumerGroup group,
ConsumerGroupMember member
) {
return new UpdateTargetAssignmentResult<>(
group.assignmentEpoch(),
group.targetAssignment(member.memberId(), member.instanceId())
);
}

private static UpdateTargetAssignmentResult<Assignment> fromLastTargetAssignment(
ShareGroup group,
ShareGroupMember member
) {
return new UpdateTargetAssignmentResult<>(
group.assignmentEpoch(),
group.targetAssignment(member.memberId())
);
}

private static UpdateTargetAssignmentResult<TasksTuple> fromLastTargetAssignment(
StreamsGroup group,
Optional<StreamsGroupMember> member
) {
if (member.isPresent()) {
if (member.get().instanceId().isPresent()) {
throw new UnsupportedOperationException("Static members are not supported yet.");
}

return new UpdateTargetAssignmentResult<>(
group.assignmentEpoch(),
group.targetAssignment(member.get().memberId())
);
} else {
return new UpdateTargetAssignmentResult<>(
group.assignmentEpoch(),
TasksTuple.EMPTY
);
}
}
}

public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
Expand Down Expand Up @@ -2067,36 +2113,16 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
// 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
// replaces an existing static member.
// The delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch;
TasksTuple targetAssignment;
if (groupEpoch > group.assignmentEpoch()) {
boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId));
if (initialDelayActive) {
// During initial rebalance delay, return empty assignment to first joining members.
targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
targetAssignment = TasksTuple.EMPTY;

returnedStatus.add(
new Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
.setStatusDetail("Assignment delayed due to the configured initial rebalance delay.")
);
} else {
targetAssignment = updateStreamsTargetAssignment(
group,
groupEpoch,
Optional.of(updatedMember),
updatedConfiguredTopology,
metadataImage,
records,
currentAssignmentConfigs
);
targetAssignmentEpoch = groupEpoch;
}
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId());
}
UpdateTargetAssignmentResult<TasksTuple> updateTargetAssignmentResult = maybeUpdateStreamsTargetAssignment(
group,
groupEpoch,
Optional.of(updatedMember),
updatedConfiguredTopology,
metadataImage,
records,
Optional.of(returnedStatus),
currentAssignmentConfigs
);

// 5. Reconcile the member's assignment with the target assignment if the member is not
// fully reconciled yet.
Expand All @@ -2106,8 +2132,8 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
group::currentActiveTaskProcessId,
group::currentStandbyTaskProcessIds,
group::currentWarmupTaskProcessIds,
targetAssignmentEpoch,
targetAssignment,
updateTargetAssignmentResult.targetAssignmentEpoch(),
updateTargetAssignmentResult.targetAssignment(),
ownedActiveTasks,
ownedStandbyTasks,
ownedWarmupTasks,
Expand Down Expand Up @@ -2396,32 +2422,23 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>

// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
// the existing and the new target assignment is persisted to the partition.
final int targetAssignmentEpoch;
final Assignment targetAssignment;

if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateTargetAssignment(
group,
groupEpoch,
member,
updatedMember,
subscriptionType,
records
);
targetAssignmentEpoch = groupEpoch;
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
}
UpdateTargetAssignmentResult<Assignment> updateTargetAssignmentResult = maybeUpdateTargetAssignment(
group,
groupEpoch,
member,
updatedMember,
subscriptionType,
records
);

// 3. Reconcile the member's assignment with the target assignment if the member is not
// fully reconciled yet.
updatedMember = maybeReconcile(
groupId,
updatedMember,
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
updateTargetAssignmentResult.targetAssignmentEpoch(),
updateTargetAssignmentResult.targetAssignment(),
group.resolvedRegularExpressions(),
// Force consistency with the subscription when the subscription has changed.
hasSubscriptionChanged,
Expand Down Expand Up @@ -2611,31 +2628,22 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro

// 2. Update the target assignment if the group epoch is larger than the target assignment epoch.
// The delta between the existing and the new target assignment is persisted to the partition.
final int targetAssignmentEpoch;
final Assignment targetAssignment;

if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateTargetAssignment(
group,
groupEpoch,
member,
updatedMember,
subscriptionType,
records
);
targetAssignmentEpoch = groupEpoch;
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
}
UpdateTargetAssignmentResult<Assignment> updateTargetAssignmentResult = maybeUpdateTargetAssignment(
group,
groupEpoch,
member,
updatedMember,
subscriptionType,
records
);

// 3. Reconcile the member's assignment with the target assignment if the member is not fully reconciled yet.
updatedMember = maybeReconcile(
groupId,
updatedMember,
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
updateTargetAssignmentResult.targetAssignmentEpoch(),
updateTargetAssignmentResult.targetAssignment(),
group.resolvedRegularExpressions(),
// Force consistency with the subscription when the subscription has changed.
bumpGroupEpoch,
Expand Down Expand Up @@ -2773,30 +2781,21 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In

// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
// the existing and the new target assignment is persisted to the partition.
final int targetAssignmentEpoch;
final Assignment targetAssignment;

if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateTargetAssignment(
group,
groupEpoch,
updatedMember,
subscriptionType,
records
);
targetAssignmentEpoch = groupEpoch;
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId());
}
UpdateTargetAssignmentResult<Assignment> updateTargetAssignmentResult = maybeUpdateTargetAssignment(
group,
groupEpoch,
updatedMember,
subscriptionType,
records
);

// 3. Reconcile the member's assignment with the target assignment if the member is not
// fully reconciled yet.
updatedMember = maybeReconcile(
groupId,
updatedMember,
targetAssignmentEpoch,
targetAssignment,
updateTargetAssignmentResult.targetAssignmentEpoch(),
updateTargetAssignmentResult.targetAssignment(),
// Force consistency with the subscription when the subscription has changed.
bumpGroupEpoch,
records
Expand Down Expand Up @@ -3806,14 +3805,19 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
private Assignment updateTargetAssignment(
private UpdateTargetAssignmentResult<Assignment> maybeUpdateTargetAssignment(
ConsumerGroup group,
int groupEpoch,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
SubscriptionType subscriptionType,
List<CoordinatorRecord> records
) {
if (groupEpoch <= group.assignmentEpoch()) {
// The assignment is up to date.
return UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
}

String preferredServerAssignor = group.computePreferredServerAssignor(
member,
updatedMember
Expand Down Expand Up @@ -3854,9 +3858,9 @@ private Assignment updateTargetAssignment(

MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId());
if (newMemberAssignment != null) {
return new Assignment(newMemberAssignment.partitions());
return new UpdateTargetAssignmentResult<>(groupEpoch, new Assignment(newMemberAssignment.partitions()));
} else {
return Assignment.EMPTY;
return new UpdateTargetAssignmentResult<>(groupEpoch, Assignment.EMPTY);
}
} catch (PartitionAssignorException ex) {
String msg = String.format("Failed to compute a new target assignment for epoch %d: %s",
Expand All @@ -3876,13 +3880,18 @@ private Assignment updateTargetAssignment(
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
private Assignment updateTargetAssignment(
private UpdateTargetAssignmentResult<Assignment> maybeUpdateTargetAssignment(
ShareGroup group,
int groupEpoch,
ShareGroupMember updatedMember,
SubscriptionType subscriptionType,
List<CoordinatorRecord> records
) {
if (groupEpoch <= group.assignmentEpoch()) {
// The assignment is up to date.
return UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
}

try {
Map<Uuid, Set<Integer>> initializedTopicPartitions = shareGroupStatePartitionMetadata.containsKey(group.groupId()) ?
stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics()) :
Expand Down Expand Up @@ -3915,9 +3924,9 @@ private Assignment updateTargetAssignment(

MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId());
if (newMemberAssignment != null) {
return new Assignment(newMemberAssignment.partitions());
return new UpdateTargetAssignmentResult<>(groupEpoch, new Assignment(newMemberAssignment.partitions()));
} else {
return Assignment.EMPTY;
return new UpdateTargetAssignmentResult<>(groupEpoch, Assignment.EMPTY);
}
} catch (PartitionAssignorException ex) {
String msg = String.format("Failed to compute a new target assignment for epoch %d: %s",
Expand All @@ -3935,17 +3944,39 @@ private Assignment updateTargetAssignment(
* @param updatedMember The updated member (optional).
* @param metadataImage The metadata image.
* @param records The list to accumulate any new records.
* @param returnedStatus A mutable collection of status to be returned in the response.
* @return The new target assignment for the updated member, or EMPTY if no member specified.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we not want to update the return javadocs? I suppose this goes to the next PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review.

After applying #21664 (comment) I'm not sure it makes sense to update the return javadocs.

*/
private TasksTuple updateStreamsTargetAssignment(
private UpdateTargetAssignmentResult<TasksTuple> maybeUpdateStreamsTargetAssignment(
StreamsGroup group,
int groupEpoch,
Optional<StreamsGroupMember> updatedMember,
ConfiguredTopology configuredTopology,
CoordinatorMetadataImage metadataImage,
List<CoordinatorRecord> records,
Optional<List<Status>> returnedStatus,
Map<String, String> assignmentConfigs
) {
boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(group.groupId()));
if (initialDelayActive) {
returnedStatus.ifPresent(statusList -> statusList.add(
new Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
.setStatusDetail("Assignment delayed due to the configured initial rebalance delay.")
));

return new UpdateTargetAssignmentResult<>(
Math.max(1, group.assignmentEpoch()),
updatedMember.map(member -> group.targetAssignment(member.memberId()))
.orElse(TasksTuple.EMPTY)
);
}

if (groupEpoch <= group.assignmentEpoch()) {
// The assignment is up to date.
return UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
}

TaskAssignor assignor = streamsGroupAssignor(group.groupId());
try {
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder assignmentResultBuilder =
Expand Down Expand Up @@ -3980,8 +4011,11 @@ private TasksTuple updateStreamsTargetAssignment(

records.addAll(assignmentResult.records());

return updatedMember.map(member -> assignmentResult.targetAssignment().get(member.memberId()))
.orElse(TasksTuple.EMPTY);
return new UpdateTargetAssignmentResult<>(
groupEpoch,
updatedMember.map(member -> assignmentResult.targetAssignment().get(member.memberId()))
.orElse(TasksTuple.EMPTY)
);
} catch (TaskAssignorException ex) {
String msg = String.format("Failed to compute a new target assignment for epoch %d: %s",
groupEpoch, ex.getMessage());
Expand Down Expand Up @@ -4018,13 +4052,14 @@ private CoordinatorResult<Void, CoordinatorRecord> computeDelayedTargetAssignmen
}

List<CoordinatorRecord> records = new ArrayList<>();
updateStreamsTargetAssignment(
maybeUpdateStreamsTargetAssignment(
group,
group.groupEpoch(),
Optional.empty(),
group.configuredTopology().get(),
metadataImage,
records,
Optional.empty(),
group.lastAssignmentConfigs()
);

Expand Down