Pq replay unacked #18766

Open
yaauie wants to merge 7 commits into elastic:main from yaauie:pq-replay-unacked

@yaauie (Member) commented Feb 11, 2026

Release notes

[rn:skip]

What does this PR do?

Adds Batch#unread, which lets a worker relinquish ownership of a batch so that its events can be picked up by another worker. This is a prerequisite for in-place crashed pipeline recovery (#18534).

The PQ Page has long used a BitSet to keep track of which events on the page have been acked, but for reads it has only kept track of a "high water mark" (the first unread event). This was based on the assumption that once an event had been read from a page, the only possible outcomes were the event being acknowledged XOR the pipeline crashing.

If we want events to be emitted again without re-opening the queue, we need to port the same BitSet logic to reads, marking events as they are read and unmarking them when they are unread.
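As a rough illustration of the mark/unmark idea, here is a minimal sketch. The class `ReadTrackingPageSketch` and all of its members are hypothetical stand-ins and not the actual `Page` code in this PR; in particular, leaving already-acked events marked as read on unread is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical stand-in for a PQ page; NOT the real org.logstash.ackedqueue.Page.
final class ReadTrackingPageSketch {
    private final int elementCount;
    private final BitSet readBits = new BitSet();  // bit i set => event i has been read and not un-read
    private final BitSet ackedBits = new BitSet(); // bit i set => event i has been acknowledged

    ReadTrackingPageSketch(final int elementCount) {
        this.elementCount = elementCount;
    }

    // Marks up to `limit` unread events as read and returns their offsets.
    List<Integer> read(final int limit) {
        final List<Integer> offsets = new ArrayList<>();
        int i = readBits.nextClearBit(0);
        while (i < elementCount && offsets.size() < limit) {
            readBits.set(i);
            offsets.add(i);
            i = readBits.nextClearBit(i + 1);
        }
        return offsets;
    }

    // Unmarks events so that a later read can emit them again.
    // Assumption: events that were already acked stay marked as read.
    void unread(final int firstOffset, final int count) {
        for (int i = firstOffset; i < firstOffset + count; i++) {
            if (!ackedBits.get(i)) {
                readBits.clear(i);
            }
        }
    }

    void ack(final int firstOffset, final int count) {
        ackedBits.set(firstOffset, firstOffset + count);
    }

    int unreadCount() {
        return elementCount - readBits.cardinality();
    }

    public static void main(final String[] args) {
        final ReadTrackingPageSketch page = new ReadTrackingPageSketch(10);
        System.out.println(page.read(4));       // [0, 1, 2, 3]
        page.unread(0, 4);                      // the worker gives its batch back
        System.out.println(page.unreadCount()); // 10
        System.out.println(page.read(4));       // [0, 1, 2, 3] again
    }
}
```

With only a high water mark, the second `read(4)` would have started at offset 4; the per-event bits are what allow offsets 0 to 3 to be emitted again.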

Many parts of the PQ's implementation assume that a batch will always contain a contiguous sequence of events. The ability to un-read events means that two consecutive calls to Page#read are no longer guaranteed to emit a single contiguous sequence (e.g., if an un-read has occurred between calls to Page#read, the second set of events can include events from before the first). We refactor Queue#readPageBatch to ensure that only one call to Page#read is made.
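To make the contiguity concern concrete, here is a small sketch of a read that returns only a single contiguous run. The helper `readContiguous`, its `int[]` return shape, and the choice to stop at the first already-read event are all assumptions for illustration; the description above does not spell out how Page#read selects its events.

```java
import java.util.BitSet;

// Hypothetical helper; the names and the int[] return shape are illustrative only.
final class ContiguousReadSketch {
    // Marks the first contiguous run of unread events (at most `limit`) as read.
    // Returns {firstOffset, count}; count is 0 when nothing is left to read.
    static int[] readContiguous(final BitSet readBits, final int elementCount, final int limit) {
        final int first = readBits.nextClearBit(0);
        if (first >= elementCount || limit <= 0) {
            return new int[] {first, 0};
        }
        final int nextRead = readBits.nextSetBit(first); // -1 if no later event is marked read
        final int runEnd = (nextRead < 0) ? elementCount : Math.min(nextRead, elementCount);
        final int end = Math.min(runEnd, first + limit);
        readBits.set(first, end);
        return new int[] {first, end - first};
    }

    public static void main(final String[] args) {
        final BitSet readBits = new BitSet();
        readContiguous(readBits, 10, 3);             // worker A takes events 0..2
        readContiguous(readBits, 10, 3);             // worker B takes events 3..5
        readBits.clear(0, 3);                        // worker A un-reads its batch
        final int[] next = readContiguous(readBits, 10, 6);
        System.out.println(next[0] + "+" + next[1]); // 0+3: stops before event 3, which B still holds
    }
}
```

A read that simply collected the next six unread events here would return 0, 1, 2, 6, 7, 8, which is exactly the kind of non-contiguous batch the rest of the PQ does not expect.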

Review Note:

This PR is best reviewed one commit at a time. It contains a number of small zero-net-change refactors to clean up the existing code before meaningfully changing the behavior.

Why is it important/What is the impact to the user?

This is part of the work needed to make a pipeline recoverable in the event of a worker crash without closing the queue (and therefore without needing to close the pipeline's inputs). It allows a crashing worker to return its batch of events to the queue so that when a new generation of workers is started, the events from that batch can be picked up and reprocessed.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works


mergify bot (Contributor) commented Feb 11, 2026

This pull request does not have a backport label. Could you fix it @yaauie? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit.
  • If no backport is necessary, please add the backport-skip label

}
notEmpty.signalAll();
}
this.unreadCount -= unreadCount;

Suggested change
- this.unreadCount -= unreadCount;
+ this.unreadCount += unreadCount;

I believe this should be +=, otherwise unreadCount will be incorrect:

> test_dir = "/tmp/logstash-pq-test-#{Time.now.to_i}"
=> "/tmp/logstash-pq-test-1770976075"
> Dir.mkdir(test_dir)
=> 0
> settings = Java::org.logstash.ackedqueue.SettingsImpl.fileSettingsBuilder(test_dir).elementClass(Java::org.logstash.Event.java_class).capacity(10000).build()
=> #<Java::OrgLogstashAckedqueue::SettingsImpl:0x56357e52>
> queue = Java::org.logstash.ackedqueue.Queue.new(settings); queue.open
[2026-02-13T09:48:18,226][INFO ][org.logstash.ackedqueue.QueueUpgrade] No PQ version file found, upgrading to PQ v2.
=> nil
> 100.times { |i| queue.write(Java::org.logstash.Event.new()) }; queue.unreadCount
=> 100
> batch = queue.readBatch(10, 100); batch.unread
=> nil
> queue.getUnreadCount
=> 80 # should be 100 ⚠️ 

I don't think it would cause data loss but it could exert backpressure unnecessarily.
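For reference, the bookkeeping I would expect looks roughly like the sketch below. `UnreadCounterSketch` is a hypothetical class that models only the counter, not the real Queue: reads shrink the unread count and un-reads grow it back.

```java
// Hypothetical counter; models only the unreadCount bookkeeping, not the real Queue.
final class UnreadCounterSketch {
    private long unreadCount;

    void write(final int eventCount) {
        unreadCount += eventCount;
    }

    void read(final int eventCount) {
        unreadCount -= eventCount;
    }

    // Returning events to the queue makes them unread again, so the counter grows.
    void unread(final int eventCount) {
        unreadCount += eventCount;
    }

    long getUnreadCount() {
        return unreadCount;
    }

    public static void main(final String[] args) {
        final UnreadCounterSketch queue = new UnreadCounterSketch();
        queue.write(100);
        queue.read(10);   // 90 unread
        queue.unread(10); // back to 100; with -= this would drop to 80
        System.out.println(queue.getUnreadCount()); // 100
    }
}
```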

}
}

public void unread(final long firstUnreadSeqNum, final int unreadCount) throws IOException {
@jsvd (Member) commented Feb 13, 2026

Oh, English is fun... let's talk about unread and unread...

the way I understand this is:

  1. the word "unread" (un-reed) in the method name would be an infinitive form denoting the action of unreading
  2. the second argument int unreadCount would be (un-reedCount) in the sense that we're asking to unread this amount of events
  3. finally, in this.unreadCount, the "unread" is an adjective and thus read un-red (what a tongue-twister).

So the words used for unreadCount (the argument) and unreadCount (the this. variable) are homographs, and the former ends up "soft-shadowing" the latter...

All this to say.. can we rename the 2nd argument to just eventCount, and reflect the change to the rest of the method? 😅

Suggested change
- public void unread(final long firstUnreadSeqNum, final int unreadCount) throws IOException {
+ public void unread(final long firstUnreadSeqNum, final int eventCount) throws IOException {

}
}

public void unread(final long firstUnreadSeqNum, final int unreadCount) throws IOException {

Should this be idempotent?

> 100.times { |i| queue.write(Java::org.logstash.Event.new()) }; queue.unreadCount
=> 100
> batch = queue.readBatch(10, 100); 10.times { batch.unread }
=> 10
> queue.getUnreadCount
=> -10

I don't think we want unread to be called more than once, but I wonder if there's any harm in this extra safety net.
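One possible shape for that safety net, as a sketch only: `SketchQueue` and `SketchBatch` below are hypothetical stand-ins rather than the real Queue and Batch, and the queue part is deliberately not thread-safe. The batch remembers whether it has already been returned and ignores repeat calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for Queue and Batch; only the idempotency guard matters here.
final class IdempotentUnreadSketch {
    static final class SketchQueue {
        private long unreadCount;

        void write(final int eventCount) {
            unreadCount += eventCount;
        }

        SketchBatch readBatch(final int limit) {
            final int size = (int) Math.min(limit, unreadCount);
            unreadCount -= size;
            return new SketchBatch(this, size);
        }

        void unread(final int eventCount) {
            unreadCount += eventCount;
        }

        long getUnreadCount() {
            return unreadCount;
        }
    }

    static final class SketchBatch {
        private final SketchQueue queue;
        private final int size;
        private final AtomicBoolean released = new AtomicBoolean(false);

        SketchBatch(final SketchQueue queue, final int size) {
            this.queue = queue;
            this.size = size;
        }

        // Only the first call returns the events; later calls are no-ops.
        void unread() {
            if (released.compareAndSet(false, true)) {
                queue.unread(size);
            }
        }
    }

    public static void main(final String[] args) {
        final SketchQueue queue = new SketchQueue();
        queue.write(100);
        final SketchBatch batch = queue.readBatch(10);
        for (int i = 0; i < 10; i++) {
            batch.unread();
        }
        System.out.println(queue.getUnreadCount()); // 100
    }
}
```

The same flag could also guard against a batch being acked after it has been un-read, but that is beyond what this comment is asking for.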

@elasticmachine

💚 Build Succeeded