Skip to content

Commit 1345e53

Browse files
committed
improve implementation
1 parent 93e4d85 commit 1345e53

File tree

2 files changed

+11
-33
lines changed

2 files changed

+11
-33
lines changed

src/core/queue.ts

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -298,36 +298,13 @@ export abstract class Queue<TJobMap = Record<string, any>, TJobRequest extends B
298298
}
299299

300300
// 4. Execute job (with plugin hooks)
301-
let success = false;
302-
let jobError: unknown;
303-
304-
// We need to track errors for plugins, but handleMessage catches them internally
305-
// So we'll set up an event listener to capture the error
306-
let capturedError: unknown;
307-
const errorListener = (event: QueueEvent) => {
308-
if (event.type === 'afterError' && event.id === message.id) {
309-
capturedError = event.error;
310-
}
311-
};
312-
313-
this.once('afterError', errorListener);
314-
315-
try {
316-
success = await this.handleMessage(message);
317-
jobError = capturedError; // Will be undefined if no error
318-
} catch (error) {
319-
// This shouldn't happen since handleMessage catches errors
320-
jobError = error;
321-
success = false;
322-
} finally {
323-
this.removeListener('afterError', errorListener);
324-
}
301+
const handleResult = await this.handleMessage(message);
325302

326303
// 5. Post-execution hooks
327304
try {
328305
for (const plugin of this.plugins) {
329306
if (plugin.afterJob) {
330-
await plugin.afterJob(message, jobError);
307+
await plugin.afterJob(message, handleResult.success ? undefined : handleResult.error);
331308
}
332309
}
333310
} catch (error) {
@@ -336,12 +313,12 @@ export abstract class Queue<TJobMap = Record<string, any>, TJobRequest extends B
336313
}
337314

338315
// Complete the job if successful, otherwise mark as failed
339-
if (success) {
316+
if (handleResult.success) {
340317
await this.completeJob(message).catch((error) => {
341318
throw QueueError.fromError({ message: 'Error completing job', cause: error });
342319
});
343320
} else {
344-
await this.failJob(message, jobError || new Error('Unknown job failure')).catch((error) => {
321+
await this.failJob(message, handleResult.error).catch((error) => {
345322
throw QueueError.fromError({ message: 'Error failing job', cause: error });
346323
});
347324
}
@@ -361,7 +338,7 @@ export abstract class Queue<TJobMap = Record<string, any>, TJobRequest extends B
361338
* @returns Promise resolving to true if processing succeeded, false if it failed
362339
* @protected
363340
*/
364-
protected async handleMessage(message: QueueMessage): Promise<boolean> {
341+
protected async handleMessage(message: QueueMessage): Promise<{ success: true } | { success: false, error: Error }> {
365342
try {
366343
// Parse the job data (this may have been modified by plugins)
367344
const jobData: JobData = JSON.parse(message.payload);
@@ -391,9 +368,10 @@ export abstract class Queue<TJobMap = Record<string, any>, TJobRequest extends B
391368
const afterEvent: QueueEvent = { type: 'afterExec', id: message.id, name, payload, meta: message.meta, result };
392369
this.emit('afterExec', afterEvent);
393370

394-
return true;
371+
return { success: true };
395372
} catch (error) {
396-
return await this.handleError(message, error);
373+
await this.handleError(message, error);
374+
return { success: false, error: error as Error };
397375
}
398376
}
399377

tests/core/queue.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ describe('Queue', () => {
156156

157157
const result = await queue['handleMessage'](message);
158158

159-
expect(result).toBe(true);
159+
expect(result.success).toBe(true);
160160
expect(handlerSpy).toHaveBeenCalledWith(
161161
expect.objectContaining({
162162
id: '1',
@@ -235,7 +235,7 @@ describe('Queue', () => {
235235

236236
const result = await queue['handleMessage'](message);
237237

238-
expect(result).toBe(false); // Job failed
238+
expect(result.success).toBe(false); // Job failed
239239
expect(errorSpy).toHaveBeenCalledWith(
240240
expect.objectContaining({
241241
type: 'afterError',
@@ -271,7 +271,7 @@ describe('Queue', () => {
271271

272272
const result = await queue['handleMessage'](message);
273273

274-
expect(result).toBe(false); // Job failed
274+
expect(result.success).toBe(false); // Job failed
275275
expect(errorSpy).toHaveBeenCalledWith(
276276
expect.objectContaining({
277277
type: 'afterError',

0 commit comments

Comments
 (0)