Skip to content

KAFKA-10549: Add topic ID support to OffsetForLeader - ServerAndProtocol (3/N)#21126

Open
TaiJuWu wants to merge 4 commits intoapache:trunkfrom
TaiJuWu:OffsetForLeaders-protocolAndServer
Open

KAFKA-10549: Add topic ID support to OffsetForLeader - ServerAndProtocol (3/N)#21126
TaiJuWu wants to merge 4 commits intoapache:trunkfrom
TaiJuWu:OffsetForLeaders-protocolAndServer

Conversation

@TaiJuWu
Copy link
Copy Markdown
Collaborator

@TaiJuWu TaiJuWu commented Dec 11, 2025

Add topic ID support to OffsetForLeader - ServerAndProtocol

@github-actions github-actions bot added triage PRs from the community core Kafka Broker clients labels Dec 11, 2025
@github-actions
Copy link
Copy Markdown

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Copy link
Copy Markdown
Contributor

@chickenchickenlove chickenchickenlove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your work on this! 🙇‍♂️
I’m not as familiar with this part of Kafka as you are, but I left a few comments based on my reading.
Apologies in advance if I’ve misunderstood anything or if any of these comments are noisy.

Comment on lines +69 to +71

short latestVersion = canUseTopicId ? (short) 5 : (short) 4;
return new Builder((short) 3, latestVersion, data);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ApiKeys.OFFSET_FOR_LEADER_EPOCH could be updated to treat v5 as the unstable latest version. That way, we can reference the version registered in the enum instead of hard-coding magic numbers like 4/5 here.
For example, we can use

if (canUseTopicId) {
  return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);
} else {
  return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(true), data);
}

If bumping latestVersion is too impactful, would it make sense to follow any existing pattern for retrieving the unstable latest version and use that instead?


if (version >= 5) {
for (OffsetForLeaderEpochRequestData.OffsetForLeaderTopic topic : data.topics()) {
if (topic.topicId() == null || topic.topicId() == Uuid.ZERO_UUID) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parsed UUID object in the API response might not share the same reference as ZERO_UUID. I wonder if comparing them using equals() instead of == was intended here. What do you think?
For example,

if (topic.topicId() == null || topic.topicId().equals(Uuid.ZERO_UUID)) { ... }

else authHelper.partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, knownTopics) { offsetForLeaderTopic =>
// Resolve topic name from topicId if needed for authorization
if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
metadataCache.getTopicName(offsetForLeaderTopic.topicId).get()
Copy link
Copy Markdown
Contributor

@chickenchickenlove chickenchickenlove Dec 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider race condition in terms of view update here?
Because we check for existence first and retrieve the value later.

    // Separate topics with unknown topic IDs when using version 5+
    val (knownTopics, unknownTopicIdTopics) = if (OffsetsForLeaderEpochRequest.useTopicIds(request.header.apiVersion)) {
      topics.partition { offsetForLeaderTopic =>
        metadataCache.getTopicName(offsetForLeaderTopic.topicId).isPresent
      }
    } else {
      (topics, Seq.empty[OffsetForLeaderTopic])
    }

Consider the following sequence:

  1. Thread A: Executes the partitioning logic. metadataCache.getTopicName(id).isPresent() returns true for UUID-123, so it's added to knownTopics.
  2. Thread B: Handles a metadata update (e.g., topic deletion via Admin API). It removes UUID-123 from metadataCache.
  3. Thread A: Proceeds to the authorization block and calls metadataCache.getTopicName(id).get().

As a result of this operation, NoSuchElementException will be thrown because the topic no longer exists in the cache, leading to an uncaught exception.

What do you think?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right so I use a addition variable to store such information, and other comments are addressed, PTAL.

@github-actions github-actions bot removed needs-attention triage PRs from the community labels Dec 31, 2025
data.setTopics(epochsByPartition);
return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), data);

short latestVersion = canUseTopicId ? ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion() : (short) 4;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TaiJuWu
Thank you for the update! 🙇‍♂️

However, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion() seems to return 4.
In that case, wouldn’t ApiMessageType.OFFSET_FOR_LEADER_EPOCH also need to be updated?

// As-Is
OFFSET_FOR_LEADER_EPOCH("OffsetForLeaderEpoch", ... , (short) 4, ..., false),

// To-Be
OFFSET_FOR_LEADER_EPOCH("OffsetForLeaderEpoch", ... , (short) 5, ..., true),

Also, if we make this update, I think the logic could be simplified as follows:

short latestVersion = ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(canUseTopic);

I checked and found about 5 references to ApiMessageType.OFFSET_FOR_LEADER_EPOCH in the IDE.
While bumping the version in ApiMessageType.OFFSET_FOR_LEADER_EPOCH would align the codebase to a single source of truth, I realize it widens the scope of this PR.

If you feel this change is too broad for the current PR, please feel free to leave it for a separate task.

What do you think?

Copy link
Copy Markdown
Collaborator Author

@TaiJuWu TaiJuWu Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OFFSET_FOR_LEADER_EPOCH is generated automatically so it will be automatically update when we update json file.
If you clean build, I think it will be updated.

short latestVersion = ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(canUseTopic); seems not correct because the parameter is UnstableVersion but it is different things with topicId.

If there is any mistake, please correct me.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, thanks for correcting me!

Copy link
Copy Markdown
Contributor

@chickenchickenlove chickenchickenlove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your hard work!
Looks good to me 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

clients core Kafka Broker

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants