immediate retry metadata refresh#1786
Conversation
…ce errors Extend the set of Kafka error codes that signal a stale metadata condition and trigger a metadata refresh via metaFlushSignal. Previously only NOT_LEADER_FOR_PARTITION (6) and LEADER_NOT_AVAILABLE (5) were handled in fetch, and only those two in produce. Fetch now handles all ten retriable error codes (3, 5, 6, 8, 9, 56, 74, 75, 78, 100). Produce now handles KAFKA_STORAGE_ERROR (56) in addition to the existing two. Remove dead doFlush method from KafkaClientProduceFactory. Fix metaFlushSignal leak in KafkaClientMetaFactory: clear the signal on all three application-stream close paths (end, abort, reset) so a restarted meta stream can re-register after reconnect. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| if (metaFlushSignal != null) | ||
| { | ||
| metaFlushSignal.accept(traceId); | ||
| } |
There was a problem hiding this comment.
Perhaps it would be simpler to have clientRoute.metaFlushSignal be a no-op instead of null when not set, then just always call it unconditionally instead of testing for null each time.
| factories.put(CLIENT, new KafkaClientFactory(config, context, this::supplyClientRoute)); | ||
| factories.put(CACHE_SERVER, new KafkaCacheServerFactory(config, context, supplyCache, | ||
| this::supplyCacheRoute)); | ||
| this::supplyCacheRoute, this::supplyClientRoute)); |
There was a problem hiding this comment.
Why does cache server need client route?
I don't think we can require this as they could be on different workers as they are different bindings in the pipeline.
In general, client-to-client coordination is okay, i.e. in the same kafka client factory, and cache-server-to-cache-server is also okay, i.e. in the same kafka cache server factory, but coordination across factories is outside the abstraction and also not thread safe.
| public final Int2ObjectHashMap<Int2IntHashMap> partitions; | ||
|
|
||
| public volatile long metaInitialId; | ||
| public volatile LongConsumer metaFlushSignal; |
There was a problem hiding this comment.
I'd prefer we avoid the term signal here, as it already has a defined meaning.
|
|
||
| state = KafkaState.openedInitial(state); | ||
|
|
||
| clientRoute.metaFlushSignal = this::doMetaFanoutFlushIfNecessary; |
There was a problem hiding this comment.
This cross-pollination from client to cache should be removed, as mentioned in other feedback.
This is triggering a FLUSH back to the meta client anyway, so the reaction can instead be fully self-contained there with the same effect.
… cache server Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…sh trigger ERROR_UNKNOWN_TOPIC_OR_PART (3) can indicate a topic that genuinely does not exist, not just stale metadata. Keeping it in the retriable set would cause repeated metadata refreshes with no benefit when the topic is permanently absent. The remaining 9 codes are unambiguously leadership or epoch errors where a refresh is the correct response. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
No description provided.