Skip to content

immediate retry metadata refresh#1786

Open
ankitk-me wants to merge 12 commits into
aklivity:developfrom
ankitk-me:metadata-retry
Open

immediate retry metadata refresh#1786
ankitk-me wants to merge 12 commits into
aklivity:developfrom
ankitk-me:metadata-retry

Conversation

@ankitk-me
Copy link
Copy Markdown
Contributor

No description provided.

@ankitk-me ankitk-me changed the title initial commit immediate retry metadata refresh May 21, 2026
@ankitk-me ankitk-me requested a review from jfallows May 21, 2026 16:34
@ankitk-me ankitk-me self-assigned this May 21, 2026
ankitk-me and others added 2 commits May 22, 2026 09:35
…ce errors

Extend the set of Kafka error codes that signal a stale metadata condition and
trigger a metadata refresh via metaFlushSignal. Previously only
NOT_LEADER_FOR_PARTITION (6) and LEADER_NOT_AVAILABLE (5) were handled in
fetch, and only those two in produce.

Fetch now handles all ten retriable error codes (3, 5, 6, 8, 9, 56, 74, 75,
78, 100). Produce now handles KAFKA_STORAGE_ERROR (56) in addition to the
existing two. Remove dead doFlush method from KafkaClientProduceFactory.

Fix metaFlushSignal leak in KafkaClientMetaFactory: clear the signal on all
three application-stream close paths (end, abort, reset) so a restarted meta
stream can re-register after reconnect.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment on lines +2274 to +2277
if (metaFlushSignal != null)
{
metaFlushSignal.accept(traceId);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it would be simpler to have clientRoute.metaFlushSignal be a no-op instead of null when not set, then just always call it unconditionally instead of testing for null each time.

factories.put(CLIENT, new KafkaClientFactory(config, context, this::supplyClientRoute));
factories.put(CACHE_SERVER, new KafkaCacheServerFactory(config, context, supplyCache,
this::supplyCacheRoute));
this::supplyCacheRoute, this::supplyClientRoute));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does cache server need client route?

I don't think we can require this as they could be on different workers as they are different bindings in the pipeline.

In general, client-to-client coordination is okay, i.e. in the same kafka client factory, and cache-server-to-cache-server is also okay, i.e. in the same kafka cache server factory, but coordination across factories is outside the abstraction and also not thread safe.

public final Int2ObjectHashMap<Int2IntHashMap> partitions;

public volatile long metaInitialId;
public volatile LongConsumer metaFlushSignal;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer we avoid the term signal here, as it already has a defined meaning.


state = KafkaState.openedInitial(state);

clientRoute.metaFlushSignal = this::doMetaFanoutFlushIfNecessary;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cross-pollination from client to cache should be removed, as mentioned in other feedback.

This is triggering a FLUSH back to the meta client anyway, so the reaction can instead be fully self-contained there with the same effect.

ankitk-me and others added 2 commits May 26, 2026 09:19
… cache server

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@ankitk-me ankitk-me marked this pull request as ready for review May 26, 2026 04:04
ankitk-me and others added 5 commits May 26, 2026 09:44
…sh trigger

ERROR_UNKNOWN_TOPIC_OR_PART (3) can indicate a topic that genuinely does not
exist, not just stale metadata. Keeping it in the retriable set would cause
repeated metadata refreshes with no benefit when the topic is permanently
absent. The remaining 9 codes are unambiguously leadership or epoch errors
where a refresh is the correct response.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants