KAFKA-10549: Add topic ID support to OffsetForLeader - ServerAndProtocol (3/N)#21126
KAFKA-10549: Add topic ID support to OffsetForLeader - ServerAndProtocol (3/N)#21126TaiJuWu wants to merge 4 commits intoapache:trunkfrom
Conversation
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
chickenchickenlove
left a comment
There was a problem hiding this comment.
Thanks for your work on this! 🙇♂️
I’m not as familiar with this part of Kafka as you are, but I left a few comments based on my reading.
Apologies in advance if I’ve misunderstood anything or if any of these comments are noisy.
|
|
||
| short latestVersion = canUseTopicId ? (short) 5 : (short) 4; | ||
| return new Builder((short) 3, latestVersion, data); |
There was a problem hiding this comment.
ApiKeys.OFFSET_FOR_LEADER_EPOCH could be updated to treat v5 as the unstable latest version. That way, we can reference the version registered in the enum instead of hard-coding magic numbers like 4/5 here.
For example, we can use
if (canUseTopicId) {
return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
} else {
return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(true), data);
}If bumping latestVersion is too impactful, would it make sense to follow any existing pattern for retrieving the unstable latest version and use that instead?
|
|
||
| if (version >= 5) { | ||
| for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : data.topics()) { | ||
| if (topic.topicId() == null || topic.topicId() == Uuid.ZERO_UUID) { |
There was a problem hiding this comment.
The parsed UUID object in the API response might not share the same reference as ZERO_UUID. I wonder if comparing them using equals() instead of == was intended here. What do you think?
For example,
if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) { ... }| else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, knownTopics) { offsetForLeaderTopic => | ||
| // Resolve topic name from topicId if needed for authorization | ||
| if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) { | ||
| metadataCache.getTopicName(offsetForLeaderTopic.topicId).get() |
There was a problem hiding this comment.
Should we consider race condition in terms of view update here?
Because we check for existence first and retrieve the value later.
// Separate topics with unknown topic IDs when using version 5+
val (knownTopics, unknownTopicIdTopics) = if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
topics.partition { offsetForLeaderTopic =>
metadataCache.getTopicName(offsetForLeaderTopic.topicId).isPresent
}
} else {
(topics, Seq.empty[OffsetForLeaderTopic])
}Consider the following sequence:
- Thread A: Executes the partitioning logic.
metadataCache.getTopicName(id).isPresent()returns true for UUID-123, so it's added to knownTopics. - Thread B: Handles a metadata update (e.g., topic deletion via Admin API). It removes UUID-123 from
metadataCache. - Thread A: Proceeds to the authorization block and calls
metadataCache.getTopicName(id).get().
As a result of this operation, NoSuchElementException will be thrown because the topic no longer exists in the cache, leading to an uncaught exception.
What do you think?
There was a problem hiding this comment.
You are right so I use a addition variable to store such information, and other comments are addressed, PTAL.
| data.setTopics(epochsByPartition); | ||
| return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data); | ||
|
|
||
| short latestVersion = canUseTopicId ? ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion() : (short) 4; |
There was a problem hiding this comment.
@TaiJuWu
Thank you for the update! 🙇♂️
However, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion() seems to return 4.
In that case, wouldn’t ApiMessageType.OFFSET_FOR_LEADER_EPOCH also need to be updated?
// As-Is
OFFSET_FOR_LEADER_EPOCH("OffsetForLeaderEpoch", ... , (short) 4, ..., false),
// To-Be
OFFSET_FOR_LEADER_EPOCH("OffsetForLeaderEpoch", ... , (short) 5, ..., true),Also, if we make this update, I think the logic could be simplified as follows:
short latestVersion = ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(canUseTopic);I checked and found about 5 references to ApiMessageType.OFFSET_FOR_LEADER_EPOCH in the IDE.
While bumping the version in ApiMessageType.OFFSET_FOR_LEADER_EPOCH would align the codebase to a single source of truth, I realize it widens the scope of this PR.
If you feel this change is too broad for the current PR, please feel free to leave it for a separate task.
What do you think?
There was a problem hiding this comment.
The OFFSET_FOR_LEADER_EPOCH is generated automatically so it will be automatically update when we update json file.
If you clean build, I think it will be updated.
short latestVersion = ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(canUseTopic); seems not correct because the parameter is UnstableVersion but it is different things with topicId.
If there is any mistake, please correct me.
There was a problem hiding this comment.
My bad, thanks for correcting me!
Add topic ID support to OffsetForLeader - ServerAndProtocol