Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 115 additions & 13 deletions apps/workers/workers/assetPreprocessingWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,38 +70,115 @@ export class AssetPreprocessingWorker {
}
}

async function readImageText(buffer: Buffer) {
/**
 * Produces the Error that represents an abort of `signal`.
 *
 * If the signal is aborted and its reason is an `Error`, that exact object is
 * returned. If the reason is some other value (a signal may be aborted with an
 * arbitrary reason such as a string), it is wrapped in a new Error named
 * "AbortError" with the original reason attached as `cause`, so the declared
 * `Error` return type always holds. If the signal is not aborted, a generic
 * Error named "AbortError" is returned.
 *
 * @param signal - The signal whose abort reason should be surfaced.
 * @returns An `Error` instance describing the abort.
 */
function getAbortError(signal: AbortSignal): Error {
  let nonErrorReason: unknown;
  try {
    // throwIfAborted() throws the signal's reason when it has been aborted.
    signal.throwIfAborted();
  } catch (error: unknown) {
    if (error instanceof Error) {
      return error;
    }
    // The reason is not an Error; keep it so it can be attached as the cause.
    nonErrorReason = error;
  }

  const abortError = new Error("The operation was aborted");
  abortError.name = "AbortError";
  if (nonErrorReason !== undefined) {
    Object.assign(abortError, { cause: nonErrorReason });
  }
  return abortError;
}

/**
 * Builds a promise that never resolves and rejects once `signal` aborts.
 *
 * If the signal is already aborted the promise rejects immediately; otherwise
 * an "abort" listener is registered. The listener is registered with
 * `once: true`, so it is removed only when the signal fires; this function
 * provides no other way to detach it.
 *
 * @param signal - The signal to observe.
 * @returns A promise that rejects with the error from `getAbortError(signal)`.
 */
function abortPromise(signal: AbortSignal): Promise<never> {
  return new Promise<never>((_resolve, reject) => {
    const rejectWithAbort = (): void => {
      reject(getAbortError(signal));
    };

    if (!signal.aborted) {
      signal.addEventListener("abort", rejectWithAbort, { once: true });
    } else {
      rejectWithAbort();
    }
  });
}

/**
 * Type guard for abort errors.
 *
 * @param error - Any caught value.
 * @returns `true` only when `error` is an `Error` instance whose `name` is
 *   exactly "AbortError".
 */
function isAbortError(error: unknown): error is Error {
  if (!(error instanceof Error)) {
    return false;
  }
  return error.name === "AbortError";
}

async function readImageText(buffer: Buffer, abortSignal: AbortSignal) {
abortSignal.throwIfAborted();
if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") {
return null;
}
const worker = await createWorker(serverConfig.ocr.langs, undefined, {
cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(),
});
const onAbort = () => {
void worker.terminate();
};
abortSignal.addEventListener("abort", onAbort, { once: true });
try {
const ret = await worker.recognize(buffer);
const ret = await Promise.race([
worker.recognize(buffer),
abortPromise(abortSignal),
]);
Comment on lines +117 to +120

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Detach the abort listener once the race settles

In readImageText the abort handling races worker.recognize against abortPromise, but the abort promise is left pending when OCR wins the race, and its "abort" listener stays registered on the job's signal. If the job's abort signal fires later (e.g., the queue timeout or a manual cancellation after OCR completed), that pending promise rejects; because Promise.race has already attached a rejection handler to every promise it was given, this late rejection is absorbed rather than reported as an unhandled rejection, but until the signal fires each finished race keeps a listener and its closure attached to the signal. The same pattern also exists in the PDF screenshot race. The abort promise needs to be cancellable (its listener removed when the race settles) so completed operations don't accumulate listeners on the signal.

Useful? React with 👍 / 👎.

abortSignal.throwIfAborted();
if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) {
return null;
}
return ret.data.text;
} finally {
abortSignal.removeEventListener("abort", onAbort);
await worker.terminate();
}
}

async function readPDFText(buffer: Buffer): Promise<{
async function readPDFText(
buffer: Buffer,
abortSignal: AbortSignal,
): Promise<{
text: string;
metadata: Record<string, object>;
}> {
abortSignal.throwIfAborted();
return new Promise((resolve, reject) => {
const pdfParser = new PDFParser(null, true);
pdfParser.on("pdfParser_dataError", reject);
const parserCleanup = pdfParser as unknown as {
removeAllListeners?: () => void;
destroy?: () => void;
};
const cleanup = () => {
parserCleanup.removeAllListeners?.();
abortSignal.removeEventListener("abort", onAbort);
};
const onAbort = () => {
cleanup();
if (typeof parserCleanup.destroy === "function") {
try {
parserCleanup.destroy();
} catch (error) {
logger.warn(
"[assetPreprocessing] Failed to destroy pdf parser on abort:",
error,
);
}
}
reject(getAbortError(abortSignal));
};
pdfParser.on("pdfParser_dataError", (error) => {
cleanup();
reject(error);
});
pdfParser.on("pdfParser_dataReady", (pdfData) => {
cleanup();
resolve({
text: pdfParser.getRawTextContent(),
metadata: pdfData.Meta,
});
});
pdfParser.parseBuffer(buffer);
abortSignal.addEventListener("abort", onAbort, { once: true });
try {
pdfParser.parseBuffer(buffer);
} catch (error) {
cleanup();
reject(error);
}
});
}

Expand All @@ -110,7 +187,9 @@ export async function extractAndSavePDFScreenshot(
asset: Buffer,
bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
isFixMode: boolean,
abortSignal: AbortSignal,
): Promise<boolean> {
abortSignal.throwIfAborted();
{
const alreadyHasScreenshot =
bookmark.assets.find(
Expand All @@ -127,16 +206,20 @@ export async function extractAndSavePDFScreenshot(
`[assetPreprocessing][${jobId}] Attempting to generate PDF screenshot for bookmarkId: ${bookmark.id}`,
);
try {
abortSignal.throwIfAborted();
/**
* If you encountered any issues with this library, make sure you have ghostscript and graphicsmagick installed following this URL
* https://github.com/yakovmeister/pdf2image/blob/HEAD/docs/gm-installation.md
*/
const screenshot = await fromBuffer(asset, {
density: 100,
quality: 100,
format: "png",
preserveAspectRatio: true,
})(1, { responseType: "buffer" });
const screenshot = await Promise.race([
fromBuffer(asset, {
density: 100,
quality: 100,
format: "png",
preserveAspectRatio: true,
})(1, { responseType: "buffer" }),
abortPromise(abortSignal),
]);

if (!screenshot.buffer) {
logger.error(
Expand All @@ -146,13 +229,15 @@ export async function extractAndSavePDFScreenshot(
}

// Check storage quota before inserting
abortSignal.throwIfAborted();
const quotaApproved = await QuotaService.checkStorageQuota(
db,
bookmark.userId,
screenshot.buffer.byteLength,
);

// Store the screenshot
abortSignal.throwIfAborted();
const assetId = newAssetId();
const fileName = "screenshot.png";
const contentType = "image/png";
Expand Down Expand Up @@ -183,6 +268,9 @@ export async function extractAndSavePDFScreenshot(
);
return true;
} catch (error) {
if (isAbortError(error)) {
throw error;
}
if (error instanceof StorageQuotaError) {
logger.warn(
`[assetPreprocessing][${jobId}] Skipping PDF screenshot due to quota exceeded: ${error.message}`,
Expand All @@ -201,7 +289,9 @@ async function extractAndSaveImageText(
asset: Buffer,
bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
isFixMode: boolean,
abortSignal: AbortSignal,
): Promise<boolean> {
abortSignal.throwIfAborted();
{
const alreadyHasText = !!bookmark.asset.content;
if (alreadyHasText && isFixMode) {
Expand All @@ -216,8 +306,11 @@ async function extractAndSaveImageText(
`[assetPreprocessing][${jobId}] Attempting to extract text from image.`,
);
try {
imageText = await readImageText(asset);
imageText = await readImageText(asset, abortSignal);
} catch (e) {
if (isAbortError(e)) {
throw e;
}
logger.error(
`[assetPreprocessing][${jobId}] Failed to read image text: ${e}`,
);
Expand All @@ -244,7 +337,9 @@ async function extractAndSavePDFText(
asset: Buffer,
bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
isFixMode: boolean,
abortSignal: AbortSignal,
): Promise<boolean> {
abortSignal.throwIfAborted();
{
const alreadyHasText = !!bookmark.asset.content;
if (alreadyHasText && isFixMode) {
Expand All @@ -257,7 +352,7 @@ async function extractAndSavePDFText(
logger.info(
`[assetPreprocessing][${jobId}] Attempting to extract text from pdf.`,
);
const pdfParse = await readPDFText(asset);
const pdfParse = await readPDFText(asset, abortSignal);
if (!pdfParse?.text) {
throw new Error(
`[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`,
Expand Down Expand Up @@ -291,6 +386,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
const jobId = req.id;
const bookmarkId = req.data.bookmarkId;

req.abortSignal.throwIfAborted();
const bookmark = await db.query.bookmarks.findFirst({
where: eq(bookmarks.id, bookmarkId),
with: {
Expand All @@ -299,6 +395,8 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
},
});

req.abortSignal.throwIfAborted();

logger.info(
`[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`,
);
Expand All @@ -318,6 +416,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
assetId: bookmark.asset.assetId,
});

req.abortSignal.throwIfAborted();
if (!asset) {
throw new Error(
`[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`,
Expand All @@ -332,6 +431,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
asset,
bookmark,
isFixMode,
req.abortSignal,
);
anythingChanged ||= extractedText;
break;
Expand All @@ -342,12 +442,14 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
asset,
bookmark,
isFixMode,
req.abortSignal,
);
const extractedScreenshot = await extractAndSavePDFScreenshot(
jobId,
asset,
bookmark,
isFixMode,
req.abortSignal,
);
anythingChanged ||= extractedText || extractedScreenshot;
break;
Expand Down