Skip to content

KAFKA-10549: Add topic ID support to OffsetForLeader - client (4/N)#21127

Open
TaiJuWu wants to merge 7 commits intoapache:trunkfrom
TaiJuWu:OffsetForLeaders-client
Open

KAFKA-10549: Add topic ID support to OffsetForLeader - client (4/N)#21127
TaiJuWu wants to merge 7 commits intoapache:trunkfrom
TaiJuWu:OffsetForLeaders-client

Conversation

@TaiJuWu
Copy link
Collaborator

@TaiJuWu TaiJuWu commented Dec 11, 2025

Add topic ID support to OffsetForLeader for client side.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker consumer clients labels Dec 11, 2025
@TaiJuWu TaiJuWu changed the title KAFKA-10549: Add topic ID support to OffsetForLeader - client (4/N) (WIP) KAFKA-10549: Add topic ID support to OffsetForLeader - client (4/N) Dec 11, 2025
@TaiJuWu TaiJuWu changed the title (WIP) KAFKA-10549: Add topic ID support to OffsetForLeader - client (4/N) KAFKA-10549: Add topic ID support to OffsetForLeader - client (4/N) Dec 13, 2025
@TaiJuWu TaiJuWu marked this pull request as draft December 13, 2025 06:07
@TaiJuWu TaiJuWu closed this Dec 13, 2025
@TaiJuWu TaiJuWu reopened this Dec 13, 2025
@TaiJuWu TaiJuWu marked this pull request as ready for review December 13, 2025 06:07
@github-actions
Copy link

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Comment on lines +276 to +289
// Convert TopicPartition to TopicIdPartition
Map<TopicIdPartition, FetchPosition> fetchPositionsWithId = new HashMap<>();
Map<String, Uuid> topicIds = metadata.topicIds();
fetchPositions.forEach((tp, position) -> {
Uuid topicId = topicIds.get(tp.topic());
if (topicId != null) {
TopicIdPartition tip = new TopicIdPartition(topicId, tp);
fetchPositionsWithId.put(tip, position);
} else {
// Topic ID not available yet, skip this partition for now
// The metadata will be refreshed and we'll retry
log.debug("Skipping offset validation for partition {} because topic ID is not available in metadata", tp);
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be missing some context, but it looks like this path assumes that if a topic doesn’t have a TopicId in metadata.topicIds(), it will eventually get one and validation will proceed later. In migration / mixed-version environments, I’m wondering if it’s plausible that many topics may not have TopicIds for some time (or possibly indefinitely, depending on broker support / metadata propagation).

If that’s the case, should we handle the “missing TopicId” case explicitly here — e.g., fall back to a name-based request (older protocol version such as v4), or consider sending Uuid.ZERO_UUID for those partitions if that’s the intended compatibility path? Otherwise, my concern is that some TopicPartitions could be silently omitted from the OffsetsForLeaderEpoch request and remain unvalidated.

Does that concern make sense, or is there an earlier step that guarantees TopicIds will be present by the time we reach this code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, I always forget we need to support old broker :(

Comment on lines +788 to +801
// Convert TopicPartition to TopicIdPartition
Map<TopicIdPartition, SubscriptionState.FetchPosition> fetchPositionsWithId = new HashMap<>();
Map<String, Uuid> topicIds = metadata.topicIds();
fetchPositions.forEach((tp, position) -> {
Uuid topicId = topicIds.get(tp.topic());
if (topicId != null) {
TopicIdPartition tip = new TopicIdPartition(topicId, tp.partition(), tp.topic());
fetchPositionsWithId.put(tip, position);
} else {
// Topic ID not available yet, skip this partition for now
// The metadata will be refreshed and we'll retry
log.debug("Skipping offset validation for partition {} because topic ID is not available in metadata", tp);
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

Copy link
Contributor

@chickenchickenlove chickenchickenlove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch! 🙇‍♂️
I’m not as familiar with this part of Kafka as you are and studying this field, and after reading the code you wrote, I wanted to share my thoughts.
I hope this will be helpful for your PR, and if I misunderstood anything and left an incorrect comment, I’d like to apologize in advance.

@github-actions github-actions bot removed needs-attention triage PRs from the community labels Feb 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants