Conversation
This pull request does not have a backport label. Could you fix it @yaauie? 🙏
Force-pushed from `8a4ed50` to `657fcfe`.
```java
        }
        notEmpty.signalAll();
    }
    this.unreadCount -= unreadCount;
```
```diff
- this.unreadCount -= unreadCount;
+ this.unreadCount += unreadCount;
```

I believe this should be `+=`, otherwise `unreadCount` will be incorrect:
```
> test_dir = "/tmp/logstash-pq-test-#{Time.now.to_i}"
=> "/tmp/logstash-pq-test-1770976075"
> Dir.mkdir(test_dir)
=> 0
> settings = Java::org.logstash.ackedqueue.SettingsImpl.fileSettingsBuilder(test_dir).elementClass(Java::org.logstash.Event.java_class).capacity(10000).build()
=> #<Java::OrgLogstashAckedqueue::SettingsImpl:0x56357e52>
> queue = Java::org.logstash.ackedqueue.Queue.new(settings); queue.open
[2026-02-13T09:48:18,226][INFO ][org.logstash.ackedqueue.QueueUpgrade] No PQ version file found, upgrading to PQ v2.
=> nil
> 100.times { |i| queue.write(Java::org.logstash.Event.new()) }; queue.unreadCount
=> 100
> batch = queue.readBatch(10, 100); batch.unread
=> nil
> queue.getUnreadCount
=> 80 # should be 100
```

⚠️ I don't think it would cause data loss, but it could exert backpressure unnecessarily.
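A minimal bookkeeping sketch (hypothetical class and field names, not Logstash's actual `Page`) showing why the counter must be restored with `+=` rather than decremented again:

```java
import java.util.BitSet;

// Hypothetical sketch of per-page unread bookkeeping; names are illustrative only.
final class PageUnreadSketch {
    private final BitSet readBits = new BitSet(); // set bit = event has been read
    private long unreadCount;                     // events written but not yet read
    private int firstUnread;                      // high-water mark: next seqNum to read

    void write(int n) { unreadCount += n; }

    // Reading marks events as read and decrements the counter.
    void read(int n) {
        int take = (int) Math.min(n, unreadCount);
        readBits.set(firstUnread, firstUnread + take);
        firstUnread += take;
        unreadCount -= take;
    }

    // Un-reading clears the marks and must ADD the count back;
    // `-=` here yields 80 instead of 100 in the repro above.
    void unread(int firstSeq, int n) {
        readBits.clear(firstSeq, firstSeq + n);
        firstUnread = Math.min(firstUnread, firstSeq);
        unreadCount += n;
    }

    long unreadCount() { return unreadCount; }
}
```

With a batch of 10 taken from 100 events, un-reading it should bring the counter from 90 back to 100; subtracting instead leaves it at 80.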
```java
public void unread(final long firstUnreadSeqNum, final int unreadCount) throws IOException {
```
Oh, English is fun... let's talk about unread and unread...

The way I understand this is:

- the word "unread" (un-REED) in the method name is an infinitive, denoting the action of unreading
- the second argument `int unreadCount` would be (un-REED-count), in the sense that we're asking to unread this many events
- finally, in `this.unreadCount`, the "unread" is an adjective and thus read un-RED (what a tongue-twister)

So the words used for `unreadCount` (the argument) and `unreadCount` (the `this.` variable) are homographs, and the former ends up "soft-shadowing" the latter...

All this to say... can we rename the 2nd argument to just `eventCount`, and reflect the change across the rest of the method? 😅
```diff
- public void unread(final long firstUnreadSeqNum, final int unreadCount) throws IOException {
+ public void unread(final long firstUnreadSeqNum, final int eventCount) throws IOException {
```
```java
public void unread(final long firstUnreadSeqNum, final int unreadCount) throws IOException {
```
Should this be idempotent?
```
> 100.times { |i| queue.write(Java::org.logstash.Event.new()) }; queue.unreadCount
=> 100
> batch = queue.readBatch(10, 100); 10.times { batch.unread }
=> 10
> queue.getUnreadCount
=> -10
```
I don't think we want unread to be called more than once, but I wonder if there's any harm in this extra safety net.
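One possible shape for that safety net, sketched with hypothetical names (this is not Logstash's actual `Batch` class): latch the first call so repeated `unread`s become no-ops and can't drive the queue's counter negative.

```java
// Hypothetical idempotency guard; class and field names are illustrative only.
final class BatchUnreadGuard {
    private final int eventCount;
    private long queueUnreadCount;  // stand-in for the queue's shared counter
    private boolean alreadyUnread;  // latch: only the first unread() takes effect

    BatchUnreadGuard(int eventCount, long queueUnreadCount) {
        this.eventCount = eventCount;
        this.queueUnreadCount = queueUnreadCount;
    }

    // Returns the number of events restored to the queue; 0 after the first call.
    int unread() {
        if (alreadyUnread) {
            return 0; // idempotent: later calls change nothing
        }
        alreadyUnread = true;
        queueUnreadCount += eventCount;
        return eventCount;
    }

    long queueUnreadCount() { return queueUnreadCount; }
}
```

With this guard, the repro above (`10.times { batch.unread }` after a 10-event batch) would leave the count at 100 instead of -10.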
💚 Build Succeeded
Release notes

`[rn:skip]`

What does this PR do?

Adds `Batch#unread`, a way for a worker to relinquish ownership of a batch so that its events can be picked up by another worker. This is a prerequisite for in-place crashed-pipeline recovery (#18534).

The PQ `Page` has long kept track of which events on the page had been acked using a `BitSet`, but it only kept the "high-water mark" for the first unread event, based on the assumption that once an event has been read from a page the only outcomes were the events being acknowledged XOR the pipeline crashing. If we want events to be emitted again without re-opening the queue, we need to port the same `BitSet` logic to reads, marking events as they are read and unmarking them when they are unread.

Many places in the PQ's implementation assume that a batch always contains a contiguous sequence of events, and the ability to un-read events means that two consecutive calls to `Page#read` are no longer guaranteed to emit a single contiguous sequence (e.g., if an un-read has occurred between calls to `Page#read`, the second set of events can include events from before the first). We refactor `Queue#readPageBatch` to ensure only one call to `Page#read` is made.

Review Note: This PR is best reviewed one commit at a time. It contains a number of small zero-net-change refactors that clean up the existing code before meaningfully changing the behavior.
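The contiguity point in the PR body can be seen with plain `java.util.BitSet` operations (the bit positions here are illustrative values, not taken from the PR):

```java
import java.util.BitSet;

public class ContiguityDemo {
    public static void main(String[] args) {
        BitSet readBits = new BitSet();
        readBits.set(0, 10);   // events 0..9 have been read
        readBits.clear(3, 6);  // events 3..5 are un-read mid-stream

        // The next readable run is now [3, 6); the run after it starts at 10,
        // so two consecutive reads would NOT yield one contiguous sequence.
        int start = readBits.nextClearBit(0);     // 3
        int end = readBits.nextSetBit(start);     // 6
        int nextRun = readBits.nextClearBit(end); // 10
        System.out.println(start + " " + end + " " + nextRun);
    }
}
```

This gap between runs is why collapsing `Queue#readPageBatch` down to a single `Page#read` call (as the PR describes) keeps each batch contiguous.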
Why is it important/What is the impact to the user?
This is part of the work needed to make a pipeline recoverable in the event of a worker crash without closing the queue (and therefore without needing to close the pipeline's inputs). It allows a crashing worker to return its batch of events to the queue so that when a new generation of workers is started, the events from that batch can be picked up and reprocessed.
Checklist

- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding changes to the default configuration files (and/or docker env variables)