KAFKA-10549: Add topic ID support to OffsetForLeader - client (4/N)#21127
KAFKA-10549: Add topic ID support to OffsetForLeader - client (4/N)#21127TaiJuWu wants to merge 7 commits intoapache:trunkfrom
Conversation
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
| // Convert TopicPartition to TopicIdPartition | ||
| Map<TopicIdPartition, FetchPosition> fetchPositionsWithId = new HashMap<>(); | ||
| Map<String, Uuid> topicIds = metadata.topicIds(); | ||
| fetchPositions.forEach((tp, position) -> { | ||
| Uuid topicId = topicIds.get(tp.topic()); | ||
| if (topicId != null) { | ||
| TopicIdPartition tip = new TopicIdPartition(topicId, tp); | ||
| fetchPositionsWithId.put(tip, position); | ||
| } else { | ||
| // Topic ID not available yet, skip this partition for now | ||
| // The metadata will be refreshed and we'll retry | ||
| log.debug("Skipping offset validation for partition {} because topic ID is not available in metadata", tp); | ||
| } | ||
| }); |
There was a problem hiding this comment.
I may be missing some context, but it looks like this path assumes that if a topic doesn’t have a TopicId in metadata.topicIds(), it will eventually get one and validation will proceed later. In migration / mixed-version environments, I’m wondering if it’s plausible that many topics may not have TopicIds for some time (or possibly indefinitely, depending on broker support / metadata propagation).
If that’s the case, should we handle the “missing TopicId” case explicitly here — e.g., fall back to a name-based request (older protocol version such as v4), or consider sending Uuid.ZERO_UUID for those partitions if that’s the intended compatibility path? Otherwise, my concern is that some TopicPartitions could be silently omitted from the OffsetsForLeaderEpoch request and remain unvalidated.
Does that concern make sense, or is there an earlier step that guarantees TopicIds will be present by the time we reach this code?
There was a problem hiding this comment.
Nice catch, I always forget we need to support old broker :(
| // Convert TopicPartition to TopicIdPartition | ||
| Map<TopicIdPartition, SubscriptionState.FetchPosition> fetchPositionsWithId = new HashMap<>(); | ||
| Map<String, Uuid> topicIds = metadata.topicIds(); | ||
| fetchPositions.forEach((tp, position) -> { | ||
| Uuid topicId = topicIds.get(tp.topic()); | ||
| if (topicId != null) { | ||
| TopicIdPartition tip = new TopicIdPartition(topicId, tp.partition(), tp.topic()); | ||
| fetchPositionsWithId.put(tip, position); | ||
| } else { | ||
| // Topic ID not available yet, skip this partition for now | ||
| // The metadata will be refreshed and we'll retry | ||
| log.debug("Skipping offset validation for partition {} because topic ID is not available in metadata", tp); | ||
| } | ||
| }); |
chickenchickenlove
left a comment
There was a problem hiding this comment.
Thanks for the patch! 🙇♂️
I’m not as familiar with this part of Kafka as you are and studying this field, and after reading the code you wrote, I wanted to share my thoughts.
I hope this will be helpful for your PR, and if I misunderstood anything and left an incorrect comment, I’d like to apologize in advance.
Add topic ID support to OffsetForLeader for client side.