Commit 7b77c63

rosa and claude committed
Fix race condition between job enqueue and concurrency unblock
This addresses #456. There is a race condition in the concurrency control mechanism where a job that finishes and tries to unblock the next blocked execution can miss a `BlockedExecution` that is being created concurrently. This causes the blocked job to remain stuck until the `ConcurrencyMaintenance` periodic task runs (potentially minutes later).

It happens as follows:

1. Job A is running (semaphore value=0)
2. Job B enqueue starts: reads semaphore (value=0, no row lock) → decides to block
3. Job A finishes: `Semaphore.signal` → `UPDATE` value to 1 (succeeds immediately since no lock held)
4. Job A: `BlockedExecution.release_one` → `SELECT` finds nothing (Job B's `BlockedExecution` not committed yet)
5. Job B enqueue commits: `BlockedExecution` now exists but nobody will unblock it

The root cause is that `Semaphore::Proxy#wait` doesn't lock the semaphore row when checking the semaphore. This allows the concurrent `signal` to complete before the enqueue transaction commits, creating a window where the `BlockedExecution` is invisible.

To fix, we lock the semaphore row with `FOR UPDATE` during the wait check so that the enqueue transaction holds the lock from the check through `BlockedExecution` creation and commit. This forces a concurrent signal `UPDATE` to wait, guaranteeing the `BlockedExecution` is visible when `release_one` runs.

This shouldn't introduce any deadlocks, as these two paths introduce no new circular dependencies:

- Enqueue path: locks `Semaphore` row → `INSERT`s `BlockedExecution` (no lock on existing rows)
- `release_one` path: locks `BlockedExecution` row (`SKIP LOCKED`) → locks `Semaphore` row (via wait in release)

Co-Authored-By: Claude Opus 4.6 <[email protected]>
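The interleaving described above can be sketched without a database. The following is a minimal in-memory model, not Solid Queue code: a `Mutex` stands in for the semaphore row lock, a plain array stands in for committed `BlockedExecution` rows, and the names `simulate`, `lock_on_wait`, `decided`, and `progress` are hypothetical. The enqueue thread holds its "transaction" open until the finishing job has either completed or is waiting on the row lock, which forces the problematic ordering deterministically.

```ruby
# In-memory model of the race (assumption: a Mutex approximates the row lock,
# and appending to `blocked` approximates committing a BlockedExecution).
def simulate(lock_on_wait:)
  row_lock  = Mutex.new      # stands in for the semaphore row lock
  semaphore = { value: 0 }   # job A is running, so the value is 0
  blocked   = []             # committed BlockedExecution rows
  decided   = Queue.new      # enqueue -> finisher: "semaphore has been read"
  progress  = Queue.new      # finisher -> enqueue: "waiting on lock" or "done"
  released  = nil

  enqueue = Thread.new do
    row_lock.lock if lock_on_wait         # the fix: SELECT ... FOR UPDATE
    must_block = semaphore[:value].zero?  # step 2: read value, decide to block
    decided << true
    progress.pop                          # keep the transaction open for now
    blocked << :job_b if must_block       # step 5: commit the BlockedExecution
    row_lock.unlock if lock_on_wait
  end

  finisher = Thread.new do
    decided.pop
    unless row_lock.try_lock              # signal's UPDATE needs the row lock
      progress << :waiting_on_lock
      row_lock.lock                       # blocks until the enqueue "commits"
    end
    semaphore[:value] += 1                # step 3: signal
    row_lock.unlock
    released = blocked.shift              # step 4: release_one
    progress << :done
  end

  [enqueue, finisher].each(&:join)
  { released: released, stuck: blocked.dup }
end

without_lock = simulate(lock_on_wait: false)
with_lock    = simulate(lock_on_wait: true)
puts "without lock: released=#{without_lock[:released].inspect} stuck=#{without_lock[:stuck].inspect}"
puts "with lock:    released=#{with_lock[:released].inspect} stuck=#{with_lock[:stuck].inspect}"
```

Without the lock, the finisher's `release_one` step runs before the blocked row exists, so job B is left stuck. With the lock, the finisher cannot signal until the enqueue side has committed, so the row is found. The model ignores everything else the real paths do (expiry, decrementing on release, `SKIP LOCKED`).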
1 parent 552f7d0 commit 7b77c63

2 files changed, +127 -1 lines changed

app/models/solid_queue/semaphore.rb

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ def initialize(job)
       end

       def wait
-        if semaphore = Semaphore.find_by(key: key)
+        if semaphore = Semaphore.lock.find_by(key: key)
           semaphore.value > 0 && attempt_decrement
         else
           attempt_creation
Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
+# frozen_string_literal: true
+
+require "test_helper"
+
+class SolidQueue::SemaphoreTest < ActiveSupport::TestCase
+  self.use_transactional_tests = false
+
+  class NonOverlappingJob < ApplicationJob
+    limits_concurrency key: ->(job_result, **) { job_result }
+
+    def perform(job_result)
+    end
+  end
+
+  setup do
+    @result = JobResult.create!(queue_name: "default")
+  end
+
+  test "wait acquires a row lock that blocks concurrent signal" do
+    skip_on_sqlite
+
+    # Enqueue first job to create semaphore with value=0
+    NonOverlappingJob.perform_later(@result)
+    concurrency_key = SolidQueue::Job.last.concurrency_key
+    assert_equal 0, SolidQueue::Semaphore.find_by(key: concurrency_key).value
+
+    lock_held = Concurrent::Event.new
+
+    # Background thread: holds a FOR UPDATE lock on the semaphore row
+    locker = Thread.new do
+      SolidQueue::Record.connection_pool.with_connection do
+        SolidQueue::Record.transaction do
+          SolidQueue::Semaphore.where(key: concurrency_key).lock.first
+          lock_held.set
+          sleep 1
+        end
+      end
+    end
+
+    lock_held.wait(5)
+    sleep 0.1
+
+    # Main thread: this UPDATE should block until the locker's transaction commits
+    start = monotonic_now
+    SolidQueue::Semaphore.where(key: concurrency_key).update_all("value = value + 1")
+    elapsed = monotonic_now - start
+
+    locker.join(5)
+
+    assert elapsed >= 0.5, "UPDATE should have been blocked by FOR UPDATE lock (took #{elapsed.round(3)}s)"
+    assert_equal 1, SolidQueue::Semaphore.find_by(key: concurrency_key).value
+  end
+
+  test "blocked execution created during enqueue is visible to release_one after signal" do
+    skip_on_sqlite
+
+    # Enqueue first job to create semaphore with value=0
+    NonOverlappingJob.perform_later(@result)
+    job_a = SolidQueue::Job.last
+    concurrency_key = job_a.concurrency_key
+    assert_equal 0, SolidQueue::Semaphore.find_by(key: concurrency_key).value
+
+    lock_held = Concurrent::Event.new
+
+    # Background thread: simulates the enqueue path for a second job.
+    # Locks the semaphore row (as our fix does), creates a BlockedExecution,
+    # then holds the transaction open to simulate the window where the race occurs.
+    enqueue_thread = Thread.new do
+      SolidQueue::Record.connection_pool.with_connection do
+        SolidQueue::Record.transaction do
+          # Lock the semaphore (same as Semaphore::Proxy#wait with our fix)
+          SolidQueue::Semaphore.where(key: concurrency_key).lock.first
+
+          # Create job and blocked execution bypassing after_create callbacks
+          # to avoid re-entering Semaphore.wait
+          uuid = SecureRandom.uuid
+          SolidQueue::Job.insert({
+            queue_name: "default",
+            class_name: "SolidQueue::SemaphoreTest::NonOverlappingJob",
+            concurrency_key: concurrency_key,
+            active_job_id: uuid,
+            arguments: "{}",
+            scheduled_at: Time.current
+          })
+          job_b_id = SolidQueue::Job.where(active_job_id: uuid).pick(:id)
+
+          SolidQueue::BlockedExecution.insert({
+            job_id: job_b_id,
+            queue_name: "default",
+            concurrency_key: concurrency_key,
+            expires_at: SolidQueue.default_concurrency_control_period.from_now,
+            priority: 0
+          })
+
+          lock_held.set
+
+          # Hold the transaction open so the signal path must wait
+          sleep 1
+        end
+      end
+    end
+
+    lock_held.wait(5)
+    sleep 0.1
+
+    # Main thread: simulates job_a finishing (signal + release_one).
+    # The signal UPDATE will block until the enqueue transaction commits,
+    # guaranteeing the BlockedExecution is visible to release_one.
+    assert SolidQueue::Semaphore.signal(job_a)
+    assert SolidQueue::BlockedExecution.release_one(concurrency_key),
+      "release_one should find the BlockedExecution created by the concurrent enqueue"
+
+    enqueue_thread.join(5)
+
+    assert_equal 0, SolidQueue::BlockedExecution.where(concurrency_key: concurrency_key).count
+  end
+
+  private
+    def skip_on_sqlite
+      skip "Row-level locking not supported on SQLite" if SolidQueue::Record.connection.adapter_name.downcase.include?("sqlite")
+    end
+
+    def monotonic_now
+      Process.clock_gettime(Process::CLOCK_MONOTONIC)
+    end
+end
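The first test infers that the `UPDATE` was blocked by timing it against a monotonic clock while another thread holds the lock. The same measurement pattern can be tried in isolation with plain Ruby threads. This is only a sketch: a `Mutex` replaces the database row lock, and `blocked_for` is a hypothetical helper name, not part of the test suite.

```ruby
# Returns how long the calling thread waited to acquire a lock that another
# thread holds for `hold_seconds` (assumption: Mutex stands in for FOR UPDATE).
def blocked_for(hold_seconds)
  lock = Mutex.new
  held = Queue.new

  holder = Thread.new do
    lock.synchronize do
      held << true        # tell the main thread the lock is now held
      sleep hold_seconds  # keep the "transaction" open
    end
  end

  held.pop                # wait until the holder really owns the lock
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  lock.synchronize { }    # blocks until the holder releases the lock
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

  holder.join
  elapsed
end

elapsed = blocked_for(0.3)
puts "waited #{elapsed.round(3)}s for the lock"
```

As in the test, the assertion threshold should sit well below the hold time (the test checks `>= 0.5` against a one-second hold) so that scheduling jitter does not cause spurious failures.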
