Skip to content

Commit bd546a0

Browse files
committed
fix: refactor, cleanup, and improve concurrency
1 parent 68e3f28 commit bd546a0

16 files changed

+282
-354
lines changed

example/index.ts

Lines changed: 46 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -20,49 +20,30 @@ const run = async () => {
2020
jobShouldTakeThisLongMs: number;
2121
groupId: string;
2222
}>({
23+
logger: true,
2324
redis: redis,
2425
namespace: 'example',
25-
keepCompleted: 100,
26+
keepCompleted: 100000,
2627
keepFailed: 100,
2728
});
2829

29-
const cronQueue = new Queue<{
30-
id: string;
31-
jobShouldTakeThisLongMs: number;
32-
groupId: string;
33-
}>({
34-
redis: redis,
35-
namespace: 'cron',
36-
keepCompleted: 100,
37-
keepFailed: 100,
38-
});
30+
const CONCURRENCY = 4;
31+
const WORKERS = 4;
3932

40-
const worker = new Worker({
41-
concurrency: 2,
42-
queue: queue,
43-
async handler(job) {
44-
await sleep(job.data.jobShouldTakeThisLongMs);
45-
if (Math.random() > 0.9) {
46-
throw new Error('test error');
47-
}
48-
return `basic worker completed job ${job.id}`;
49-
},
50-
});
51-
52-
const cronWorker = new Worker({
53-
queue: cronQueue,
54-
// Run cleanup/scheduler frequently so repeating jobs trigger on time
55-
cleanupIntervalMs: 1000,
56-
async handler(job) {
57-
await sleep(job.data.jobShouldTakeThisLongMs);
58-
if (Math.random() > 0.9) {
59-
throw new Error('test error');
60-
}
61-
return `cron worker completed job ${job.id}`;
62-
},
63-
});
33+
const createWorker = (index: number, concurrency: number) => {
34+
return new Worker({
35+
concurrency: concurrency,
36+
queue: queue,
37+
async handler(job) {
38+
console.log(`processing job worker ${index}`, job.id);
39+
await sleep(job.data.jobShouldTakeThisLongMs);
40+
},
41+
});
42+
};
6443

65-
const workers = [worker, cronWorker];
44+
const workers = Array.from({ length: WORKERS }, (_, i) =>
45+
createWorker(i, CONCURRENCY),
46+
);
6647

6748
workers.forEach((worker) => {
6849
worker.on('completed', (job) => {
@@ -106,58 +87,43 @@ const run = async () => {
10687
const serverAdapter = new HonoAdapter(serveStatic);
10788

10889
createBullBoard({
109-
queues: [
110-
new BullBoardGroupMQAdapter(queue),
111-
new BullBoardGroupMQAdapter(cronQueue),
112-
],
90+
queues: [new BullBoardGroupMQAdapter(queue)],
11391
serverAdapter,
11492
});
11593

11694
const basePath = '/ui';
11795
serverAdapter.setBasePath(basePath);
11896
app.route(basePath, serverAdapter.registerPlugin());
11997

120-
// Add a cron job that runs every 5 seconds
121-
await cronQueue.removeRepeatingJob('groupId', { every: 5000 });
122-
await cronQueue.add({
123-
data: {
124-
id: Math.random().toString(36).substring(2, 15),
125-
jobShouldTakeThisLongMs: Math.random() * 1000,
126-
groupId: 'groupId',
127-
},
128-
groupId: 'groupId',
129-
repeat: { every: 5000 }, // every 5 seconds
130-
});
131-
13298
// Add basic job every 2.5 seconds
13399
const groups = [
134100
...new Array(25).fill(null).map((_, i) => `groupId_${i + 1}`),
135101
];
136-
setInterval(async () => {
137-
const groupId = groups[Math.floor(Math.random() * groups.length)]!;
138-
await queue.add({
139-
data: {
140-
id: Math.random().toString(36).substring(2, 15),
141-
jobShouldTakeThisLongMs: Math.random() * 1000,
142-
groupId,
143-
},
144-
groupId,
145-
});
146-
}, 1_000);
102+
// setInterval(async () => {
103+
// const groupId = groups[Math.floor(Math.random() * groups.length)]!;
104+
// await queue.add({
105+
// data: {
106+
// id: Math.random().toString(36).substring(2, 15),
107+
// jobShouldTakeThisLongMs: Math.random() * 1000,
108+
// groupId,
109+
// },
110+
// groupId,
111+
// });
112+
// }, 1_000);
147113

148114
app.get('/add', async (c) => {
149115
const groups = [
150-
...new Array(25).fill(null).map((_, i) => `groupId_${i + 1}`),
116+
...new Array(1000).fill(null).map((_, i) => `groupId_${i + 1}`),
151117
];
152118
const events = [
153-
...new Array(50).fill(null).map((_, i) => `event_${i + 1}`),
119+
...new Array(200).fill(null).map((_, i) => `event_${i + 1}`),
154120
];
155121
for (const event of events) {
156122
const groupId = groups[Math.floor(Math.random() * groups.length)]!;
157123
await queue.add({
158124
data: {
159125
id: event,
160-
jobShouldTakeThisLongMs: Math.random() * 500 + 500,
126+
jobShouldTakeThisLongMs: Math.random() * 40 + 20,
161127
groupId,
162128
},
163129
groupId,
@@ -167,6 +133,20 @@ const run = async () => {
167133
return c.json({ message: `${events.length} jobs added` });
168134
});
169135

136+
app.get('/add-single', async (c) => {
137+
const groupId = `groupId_${Math.floor(Math.random() * 100) + 1}`;
138+
const event = `event_${Math.floor(Math.random() * 200) + 1}`;
139+
await queue.add({
140+
data: {
141+
id: event,
142+
jobShouldTakeThisLongMs: Math.random() * 40 + 20,
143+
groupId,
144+
},
145+
groupId,
146+
});
147+
return c.json({ message: 'job added' });
148+
});
149+
170150
showRoutes(app);
171151

172152
serve({ fetch: app.fetch, port: 3000 }, ({ address, port }) => {

src/lua/ack.lua

Lines changed: 0 additions & 69 deletions
This file was deleted.

src/lua/check-stalled.lua

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@ local circuitBreakerKey = ns .. ":stalled:circuit"
1414
local lastCheck = redis.call("GET", circuitBreakerKey)
1515
if lastCheck then
1616
local lastCheckTime = tonumber(lastCheck)
17-
if lastCheckTime and (now - lastCheckTime) < 1000 then
18-
-- Circuit breaker: only check stalled jobs once per second
17+
-- More aggressive circuit breaker for high concurrency: 2 seconds instead of 1
18+
local circuitBreakerInterval = 2000
19+
if lastCheckTime and (now - lastCheckTime) < circuitBreakerInterval then
20+
-- Circuit breaker: only check stalled jobs once per 2 seconds
1921
return {}
2022
end
2123
end
22-
redis.call("SET", circuitBreakerKey, now, "PX", 2000)
24+
redis.call("SET", circuitBreakerKey, now, "PX", 3000)
25+
26+
local processingKey = ns .. ':processing'
27+
local groupsKey = ns .. ':groups'
28+
local stalledKey = ns .. ':stalled'
2329

2430
-- BullMQ-inspired: Two-phase stalled detection for better accuracy
2531
-- Phase 1: Get potentially stalled jobs (jobs past their deadline)
@@ -28,10 +34,6 @@ if not potentiallyStalled or #potentiallyStalled == 0 then
2834
return {}
2935
end
3036

31-
local processingKey = ns .. ':processing'
32-
local groupsKey = ns .. ':groups'
33-
local stalledKey = ns .. ':stalled'
34-
3537
local results = {}
3638

3739
-- Get all jobs in processing state
@@ -57,9 +59,7 @@ for _, jobId in ipairs(processingJobs) do
5759
-- Remove from processing
5860
redis.call('ZREM', processingKey, jobId)
5961

60-
-- Decrement active counter (job is being failed)
61-
local activeCountKey = ns .. ':count:active'
62-
redis.call('DECR', activeCountKey)
62+
-- No counter operations - use ZCARD for counts
6363

6464
-- Remove from group if it's there
6565
local groupKey = ns .. ':g:' .. groupId
@@ -109,11 +109,7 @@ for _, jobId in ipairs(processingJobs) do
109109
-- Job is confirmed to still be in processing, safe to recover
110110
redis.call('ZREM', processingKey, jobId)
111111

112-
-- Update counters: active -> waiting
113-
local activeCountKey = ns .. ':count:active'
114-
local waitingCountKey = ns .. ':count:waiting'
115-
redis.call('DECR', activeCountKey)
116-
redis.call('INCR', waitingCountKey)
112+
-- No counter operations - use ZCARD for counts
117113

118114
-- Release group lock if this job holds it
119115
local lockKey = ns .. ':lock:' .. groupId

src/lua/cleanup.lua

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,7 @@ for _, jobId in ipairs(expiredJobs) do
3232
redis.call("DEL", procKey)
3333
redis.call("ZREM", processingKey, jobId)
3434

35-
-- Update counters: active -> waiting (job recovered from stalled state)
36-
local activeCountKey = ns .. ":count:active"
37-
local waitingCountKey = ns .. ":count:waiting"
38-
redis.call("DECR", activeCountKey)
39-
redis.call("INCR", waitingCountKey)
35+
-- No counter operations - use ZCARD for counts
4036

4137
cleaned = cleaned + 1
4238
end

src/lua/complete-and-reserve-next-with-metadata.lua

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ local vt = tonumber(ARGV[14])
2020
redis.call("DEL", ns .. ":processing:" .. completedJobId)
2121
redis.call("ZREM", ns .. ":processing", completedJobId)
2222

23-
-- Decrement active counter
24-
local activeCountKey = ns .. ":count:active"
25-
redis.call("DECR", activeCountKey)
23+
-- No counter operations - use ZCARD for counts
2624

2725
-- Part 2: Record job metadata (completed or failed)
2826
local jobKey = ns .. ":job:" .. completedJobId
@@ -121,8 +119,7 @@ redis.call("HSET", procKey, "groupId", groupId, "deadlineAt", tostring(deadline)
121119
local processingKey = ns .. ":processing"
122120
redis.call("ZADD", processingKey, deadline, id)
123121

124-
-- Increment active counter for new job (completed job was already decremented above)
125-
redis.call("INCR", activeCountKey)
122+
-- No counter operations - use ZCARD for counts
126123

127124
local nextHead = redis.call("ZRANGE", gZ, 0, 0, "WITHSCORES")
128125
if nextHead and #nextHead >= 2 then

src/lua/complete-with-metadata.lua

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@ local maxAttempts = ARGV[12]
1818
redis.call("DEL", ns .. ":processing:" .. jobId)
1919
redis.call("ZREM", ns .. ":processing", jobId)
2020

21-
-- Decrement active counter
22-
local activeCountKey = ns .. ":count:active"
23-
redis.call("DECR", activeCountKey)
21+
-- No counter operations - use ZCARD for counts
2422

2523
local lockKey = ns .. ":lock:" .. gid
2624
local val = redis.call("GET", lockKey)

src/lua/dead-letter.lua

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ redis.call("ZREM", gZ, jobId)
1212
redis.call("DEL", ns .. ":processing:" .. jobId)
1313
redis.call("ZREM", ns .. ":processing", jobId)
1414

15-
-- Decrement active counter (job is being dead-lettered, not re-queued)
16-
local activeCountKey = ns .. ":count:active"
17-
redis.call("DECR", activeCountKey)
15+
-- No counter operations - use ZCARD for counts
1816

1917
-- Remove idempotence mapping to allow reuse
2018
redis.call("DEL", ns .. ":unique:" .. jobId)

src/lua/flush-epoch.lua

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)