Skip to content

Commit 64bb729

Browse files
committed
fix: stalled
1 parent 93f3416 commit 64bb729

File tree

7 files changed

+134
-16
lines changed

7 files changed

+134
-16
lines changed

src/lua/check-stalled.lua

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,26 @@ local now = tonumber(ARGV[2])
88
local gracePeriod = tonumber(ARGV[3]) or 0
99
local maxStalledCount = tonumber(ARGV[4]) or 1
1010

11+
-- Circuit breaker for high concurrency: limit stalled job recovery
12+
-- to prevent excessive Redis load and race conditions
13+
local circuitBreakerKey = ns .. ":stalled:circuit"
14+
local lastCheck = redis.call("GET", circuitBreakerKey)
15+
if lastCheck then
16+
local lastCheckTime = tonumber(lastCheck)
17+
if lastCheckTime and (now - lastCheckTime) < 1000 then
18+
-- Circuit breaker: only check stalled jobs once per second
19+
return {}
20+
end
21+
end
22+
redis.call("SET", circuitBreakerKey, now, "PX", 2000)
23+
24+
-- BullMQ-inspired: Two-phase stalled detection for better accuracy
25+
-- Phase 1: Get potentially stalled jobs (jobs past their deadline)
26+
local potentiallyStalled = redis.call("ZRANGEBYSCORE", processingKey, 0, now - gracePeriod, "LIMIT", 0, 100)
27+
if not potentiallyStalled or #potentiallyStalled == 0 then
28+
return {}
29+
end
30+
1131
local processingKey = ns .. ':processing'
1232
local groupsKey = ns .. ':groups'
1333
local stalledKey = ns .. ':stalled'
@@ -70,7 +90,22 @@ for _, jobId in ipairs(processingJobs) do
7090
-- If job was completed between our snapshot and now, don't re-add it
7191
local stillInProcessing = redis.call('ZSCORE', processingKey, jobId)
7292

73-
if stillInProcessing then
93+
-- Additional safety: check if job status is still 'processing' or 'waiting'
94+
-- If it's 'completed' or 'failed', don't recover it
95+
local currentStatus = redis.call('HGET', jobKey, 'status')
96+
97+
-- CRITICAL: For high concurrency, add extra safety checks
98+
-- Check if job was recently completed (within last 5 seconds)
99+
local finishedOn = redis.call('HGET', jobKey, 'finishedOn')
100+
local recentlyCompleted = false
101+
if finishedOn then
102+
local finishedTime = tonumber(finishedOn)
103+
if finishedTime and (now - finishedTime) < 5000 then
104+
recentlyCompleted = true
105+
end
106+
end
107+
108+
if stillInProcessing and (currentStatus == 'processing' or currentStatus == 'waiting' or not currentStatus) and not recentlyCompleted then
74109
-- Job is confirmed to still be in processing, safe to recover
75110
redis.call('ZREM', processingKey, jobId)
76111

src/lua/get-active-count.lua

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
-- argv: ns
22
local ns = ARGV[1]
3-
local processingKey = ns .. ":processing"
4-
return redis.call("ZCARD", processingKey)
3+
local activeCountKey = ns .. ":count:active"
4+
local count = redis.call("GET", activeCountKey)
5+
return tonumber(count) or 0
56

67

src/lua/reserve-atomic.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ redis.call("HSET", procKey, "groupId", groupId, "deadlineAt", tostring(deadline)
9191
local processingKey = ns .. ":processing"
9292
redis.call("ZADD", processingKey, deadline, id)
9393

94+
-- Increment active counter
95+
local activeCountKey = ns .. ":count:active"
96+
redis.call("INCR", activeCountKey)
97+
9498
local nextHead = redis.call("ZRANGE", gZ, 0, 0, "WITHSCORES")
9599
if nextHead and #nextHead >= 2 then
96100
local nextScore = tonumber(nextHead[2])

src/lua/reserve-batch.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ for i = 1, #groups, 2 do
5252
redis.call("HSET", procKey, "groupId", gid, "deadlineAt", tostring(deadline))
5353
redis.call("ZADD", processingKey, deadline, id)
5454

55+
-- Increment active counter
56+
local activeCountKey = ns .. ":count:active"
57+
redis.call("INCR", activeCountKey)
58+
5559
-- Re-add group if there is a new head job (next oldest)
5660
local nextHead = redis.call("ZRANGE", gZ, 0, 0, "WITHSCORES")
5761
if nextHead and #nextHead >= 2 then

src/lua/reserve.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ redis.call("HSET", procKey, "groupId", chosenGid, "deadlineAt", tostring(deadlin
9090
local processingKey2 = ns .. ":processing"
9191
redis.call("ZADD", processingKey2, deadline, id)
9292

93+
-- Increment active counter
94+
local activeCountKey = ns .. ":count:active"
95+
redis.call("INCR", activeCountKey)
96+
9397
local nextHead = redis.call("ZRANGE", gZ, 0, 0, "WITHSCORES")
9498
if nextHead and #nextHead >= 2 then
9599
local nextScore = tonumber(nextHead[2])

src/queue.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,14 @@ export class Queue<T = any> {
639639
}
640640
}
641641

642+
/**
643+
* Check if a job is currently in processing state
644+
*/
645+
async isJobProcessing(jobId: string): Promise<boolean> {
646+
const score = await this.r.zscore(`${this.ns}:processing`, jobId);
647+
return score !== null;
648+
}
649+
642650
async retry(jobId: string, backoffMs = 0) {
643651
return evalScript<number>(this.r, 'retry', [
644652
this.ns,

src/worker.ts

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -793,6 +793,42 @@ class _Worker<T = any> extends TypedEventEmitter<WorkerEvents<T>> {
793793
);
794794
return nextJob;
795795
}
796+
// CRITICAL FIX: If atomic completion failed, we need to check if the job was actually completed
797+
// The job completion happens BEFORE the next job reservation in the Lua script
798+
// So if it failed, the job might still be completed in Redis
799+
this.logger.debug(
800+
`Atomic completion failed for job ${job.id}, checking if job was completed in Redis`,
801+
);
802+
803+
// Check if the job is still in processing - if not, it was completed
804+
const isStillProcessing = await this.q.isJobProcessing(job.id);
805+
if (!isStillProcessing) {
806+
this.logger.debug(
807+
`Job ${job.id} was completed in Redis despite atomic failure, group ${job.groupId} should be unlocked`,
808+
);
809+
// Job was completed, just ensure group is unlocked for next job
810+
await this.q.complete(job);
811+
} else {
812+
this.logger.warn(
813+
`Job ${job.id} is still in processing after atomic failure - this should not happen`,
814+
);
815+
// Fallback: complete the job normally
816+
await this.q.completeWithMetadata(job, handlerResult, {
817+
processedOn: processedOn || Date.now(),
818+
finishedOn: finishedOn || Date.now(),
819+
attempts: job.attempts,
820+
maxAttempts: job.maxAttempts,
821+
});
822+
}
823+
824+
// CRITICAL: For high concurrency, add a small delay to prevent thundering herd
825+
// This reduces the chance of multiple workers hitting the same race condition
826+
if (Math.random() < 0.1) {
827+
// 10% chance
828+
await new Promise((resolve) =>
829+
setTimeout(resolve, Math.random() * 100),
830+
);
831+
}
796832
} else {
797833
// Use completeWithMetadata for atomic completion with metadata
798834
await this.q.completeWithMetadata(job, handlerResult, {
@@ -808,11 +844,15 @@ class _Worker<T = any> extends TypedEventEmitter<WorkerEvents<T>> {
808844

809845
/**
810846
* Start monitoring for stuck worker conditions
847+
* BullMQ-inspired: More aggressive monitoring for high concurrency
811848
*/
812849
private startStuckDetection(): void {
850+
// More frequent checks for high concurrency environments
851+
const checkInterval = this.concurrency > 10 ? 15000 : 30000; // 15s for high concurrency, 30s otherwise
852+
813853
this.stuckDetectionTimer = setInterval(async () => {
814854
await this.checkForStuckConditions();
815-
}, 30000); // Check every 30 seconds
855+
}, checkInterval);
816856
}
817857

818858
/**
@@ -900,6 +940,7 @@ class _Worker<T = any> extends TypedEventEmitter<WorkerEvents<T>> {
900940

901941
/**
902942
* Check if worker appears to be stuck
943+
* BullMQ-inspired: More sophisticated monitoring for high concurrency
903944
*/
904945
private async checkForStuckConditions(): Promise<void> {
905946
if (this.stopping || this.closed) return;
@@ -909,33 +950,54 @@ class _Worker<T = any> extends TypedEventEmitter<WorkerEvents<T>> {
909950
const timeSinceLastJob =
910951
this.lastJobPickupTime > 0 ? now - this.lastJobPickupTime : now;
911952

953+
// BullMQ-inspired: Adaptive thresholds based on concurrency
954+
const activityThreshold = this.concurrency > 10 ? 30000 : 60000; // 30s for high concurrency, 60s otherwise
955+
const emptyReservesThreshold = this.concurrency > 10 ? 25 : 50; // Lower threshold for high concurrency
956+
const jobStarvationThreshold = this.concurrency > 10 ? 60000 : 120000; // 1min for high concurrency, 2min otherwise
957+
912958
// Check for stuck conditions
913-
if (timeSinceLastActivity > 60000) {
914-
// 1 minute without any activity
959+
if (timeSinceLastActivity > activityThreshold) {
960+
// No activity for threshold time
915961
this.logger.warn(
916-
`STUCK WORKER ALERT: No activity for ${Math.round(timeSinceLastActivity / 1000)}s`,
962+
`STUCK WORKER ALERT: No activity for ${Math.round(timeSinceLastActivity / 1000)}s (concurrency: ${this.concurrency})`,
917963
);
918964
await this.logWorkerStatus();
919965
}
920966

921967
if (
922-
this.blockingStats.consecutiveEmptyReserves > 50 &&
968+
this.blockingStats.consecutiveEmptyReserves > emptyReservesThreshold &&
923969
this.shouldWarnAboutEmptyReserves()
924970
) {
925971
// Too many empty reserves (but queue might have jobs)
926972
this.logger.warn(
927-
`BLOCKING ALERT: ${this.blockingStats.consecutiveEmptyReserves} consecutive empty reserves`,
973+
`BLOCKING ALERT: ${this.blockingStats.consecutiveEmptyReserves} consecutive empty reserves (threshold: ${emptyReservesThreshold})`,
928974
);
929975
await this.logWorkerStatus();
930976
}
931977

932-
if (timeSinceLastJob > 120000 && this.totalJobsProcessed > 0) {
933-
// 2 minutes since last job (but has processed jobs before)
978+
if (
979+
timeSinceLastJob > jobStarvationThreshold &&
980+
this.totalJobsProcessed > 0
981+
) {
982+
// No jobs for threshold time (but has processed jobs before)
934983
this.logger.warn(
935-
`JOB STARVATION ALERT: No jobs for ${Math.round(timeSinceLastJob / 1000)}s`,
984+
`JOB STARVATION ALERT: No jobs for ${Math.round(timeSinceLastJob / 1000)}s (concurrency: ${this.concurrency})`,
936985
);
937986
await this.logWorkerStatus();
938987
}
988+
989+
// BullMQ-inspired: Check for heartbeat failures in high concurrency
990+
if (this.concurrency > 10 && this.jobsInProgress.size > 0) {
991+
const longRunningJobs = Array.from(this.jobsInProgress).filter(
992+
(item) => now - item.ts > (this.q.jobTimeoutMs || 30000) / 2,
993+
);
994+
995+
if (longRunningJobs.length > 0) {
996+
this.logger.warn(
997+
`HEARTBEAT ALERT: ${longRunningJobs.length} jobs running longer than half timeout (${this.q.jobTimeoutMs || 30000}ms) - check for event loop blocking`,
998+
);
999+
}
1000+
}
9391001
}
9401002

9411003
/**
@@ -1177,14 +1239,14 @@ class _Worker<T = any> extends TypedEventEmitter<WorkerEvents<T>> {
11771239
let heartbeatDelayTimer: NodeJS.Timeout | undefined;
11781240

11791241
const startHeartbeat = () => {
1180-
// Extend lock every jobTimeout/2 for more aggressive renewal
1242+
// BullMQ-inspired: Adaptive heartbeat interval based on concurrency
11811243
const minInterval = Math.max(
1182-
this.hbMs,
1244+
this.hbMs, // Use the worker's configured heartbeat interval
11831245
Math.floor((this.q.jobTimeoutMs || 30000) / 2),
11841246
);
11851247

11861248
this.logger.debug(
1187-
`Starting heartbeat for job ${job.id} (interval: ${minInterval}ms)`,
1249+
`Starting heartbeat for job ${job.id} (interval: ${minInterval}ms, concurrency: ${this.concurrency})`,
11881250
);
11891251

11901252
hbTimer = setInterval(async () => {
@@ -1221,7 +1283,7 @@ class _Worker<T = any> extends TypedEventEmitter<WorkerEvents<T>> {
12211283
};
12221284

12231285
try {
1224-
// Smart heartbeat: only start for jobs that might actually timeout
1286+
// BullMQ-inspired: Smart heartbeat with adaptive timing
12251287
// Skip heartbeat for short jobs (< jobTimeoutMs / 3) to reduce Redis load
12261288
const jobTimeout = this.q.jobTimeoutMs || 30000;
12271289
const heartbeatThreshold = jobTimeout / 3;

0 commit comments

Comments
 (0)