
feat: add Sentry metrics for extract job tracking#348

Merged

matheus1lva merged 4 commits into main from feat/log-jobs on Mar 5, 2026
Conversation

@matheus1lva (Collaborator) commented Feb 24, 2026

Summary

  • Adds @sentry/node to packages/lib and initializes Sentry in mq.ts when MQ_INVENTORY=true and SENTRY_DSN is set
  • Emits a mq.extract_job_added counter metric on every job added to the extract-1 queue, with attributes: jobName, address, chainId, fromBlock, toBlock, abiPath
  • Flushes Sentry on beforeExit to ensure all metrics are delivered
  • Duplicate/anomaly detection is handled entirely in Sentry via dashboards and alerts — group by jobName + address + chainId + fromBlock + toBlock and alert where count > 1
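For reference, the attribute mapping described above can be sketched as a pure helper. This is a hypothetical name (`metricAttributes`) and a simplified data type; the PR inlines this logic in mq.ts's add():

```typescript
// Hypothetical helper mirroring the attribute mapping described above.
// Field names (source, abi, from/to) follow the PR's diff; everything is
// stringified so the Sentry attributes stay uniform.
type ExtractJobData = {
  address?: string
  chainId?: number
  from?: number
  fromBlock?: number
  to?: number
  toBlock?: number
  abiPath?: string
  source?: { address?: string; chainId?: number }
  abi?: { abiPath?: string }
}

function metricAttributes(jobName: string, data: ExtractJobData) {
  return {
    jobName,
    address: String(data.address ?? data.source?.address ?? ''),
    chainId: String(data.chainId ?? data.source?.chainId ?? ''),
    fromBlock: String(data.from ?? data.fromBlock ?? ''),
    toBlock: String(data.to ?? data.toBlock ?? ''),
    abiPath: String(data.abiPath ?? data.abi?.abiPath ?? '')
  }
}
```

Missing fields fall back to empty strings, so every metric carries the same attribute keys and grouping in Sentry never splits on absent attributes.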

Files changed

  • packages/lib/package.json — added @sentry/node
  • packages/lib/mq.ts — Sentry init + metrics counter in add() + flush on beforeExit

Test plan

  • Go to Sentry and open the Kong project; any of its setup snippets (e.g. under Logs or Metrics) shows the project's DSN secret. Alternatively, create your own project and use its DSN.
  • Set up env vars in packages/ingest/.env

```shell
# add these two lines
MQ_INVENTORY=true
SENTRY_DSN=<your-dsn-from-sentry>
```
  • Add a test source to config/abis.local.yaml (or use the existing one)

```yaml
cron:
  name: AbiFanout
  queue: fanout
  job: abis
  schedule: '*/15 * * * *'
  start: false

abis:
  - abiPath: 'yearn/3/vault'
    sources: [
      { chainId: 1, address: '0xAaaFEa48472f77563961Cdb53291DEDfB46F9040', inceptBlock: 24329199 }
    ]
```
  • Start Redis and Postgres

```shell
make dev
```
  • Navigate to the ingest pane in tmux (ctrl+b, then arrow keys) and run a fanout by selecting: fanout abis
  • Wait for the fanout to dispatch extract jobs (watch the ingest logs)

  • Go to Sentry > Metrics, filter by the Kong project, and search for mq.extract_job_added

Expected: the counter metric is visible, grouped by jobName/address/chainId/fromBlock/toBlock with SUM as the aggregation.
  • Verify duplicate detection: in the Sentry Metrics explorer, group by address + chainId + fromBlock + toBlock and check for any combination where the count > 1; these are duplicate dispatches.
(Screenshot: a table of the grouped duplicate combinations.)
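For intuition, the grouping this verification step performs can be sketched client-side. This is illustrative only (Sentry's Metrics explorer does the aggregation server-side); the Dispatch type and function names are made up:

```typescript
// Illustrative only: Sentry aggregates these server-side. A dispatch is
// identified by the same key the test plan groups on.
type Dispatch = { address: string; chainId: string; fromBlock: string; toBlock: string }

function duplicateGroups(dispatches: Dispatch[]): string[] {
  const counts = new Map<string, number>()
  for (const d of dispatches) {
    const key = [d.address, d.chainId, d.fromBlock, d.toBlock].join('|')
    counts.set(key, (counts.get(key) ?? 0) + 1)
  }
  // Any group seen more than once is a duplicate dispatch
  return [...counts.entries()].filter(([, n]) => n > 1).map(([key]) => key)
}
```

A non-empty result corresponds to the count > 1 rows the test plan says to look for in the Sentry table.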

@vercel vercel bot commented Feb 24, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project: kong · Deployment: Ready · Actions: Preview, Comment · Updated (UTC): Mar 5, 2026 10:39pm

@matheus1lva matheus1lva changed the title feat: add csv logging for job queue feat: add Sentry metrics for extract job tracking Mar 3, 2026
@matheus1lva matheus1lva requested a review from murderteeth March 3, 2026 20:43
@murderteeth (Collaborator) left a comment

Review: Sentry metrics for extract job tracking

The pivot from CSV logging to Sentry metrics is clean — no leftover CSV code. PR description accurately matches the final implementation. Sentry.metrics.count() is confirmed supported in SDK v10.25.0+ (currently in open beta).

Three changes requested below.

@murderteeth (Collaborator) left a comment

Line comments with suggested changes attached.

Comment on lines +80 to +91

```typescript
if (MQ_INVENTORY && SENTRY_DSN && queue === 'extract-1') {
  Sentry.metrics.count('mq.extract_job_added', 1, {
    attributes: {
      jobName: job.name,
      address: String(data.address ?? data.source?.address ?? ''),
      chainId: String(data.chainId ?? data.source?.chainId ?? ''),
      fromBlock: String(data.from ?? data.fromBlock ?? ''),
      toBlock: String(data.to ?? data.toBlock ?? ''),
      abiPath: String(data.abiPath ?? data.abi?.abiPath ?? '')
    }
  })
}
```

Since we're using Sentry, track all queues instead of just extract-1. Remove the queue === 'extract-1' filter and add queue as an attribute — you can still filter to any specific queue in Sentry dashboards.

Suggested change

```diff
-if (MQ_INVENTORY && SENTRY_DSN && queue === 'extract-1') {
-  Sentry.metrics.count('mq.extract_job_added', 1, {
-    attributes: {
-      jobName: job.name,
-      address: String(data.address ?? data.source?.address ?? ''),
-      chainId: String(data.chainId ?? data.source?.chainId ?? ''),
-      fromBlock: String(data.from ?? data.fromBlock ?? ''),
-      toBlock: String(data.to ?? data.toBlock ?? ''),
-      abiPath: String(data.abiPath ?? data.abi?.abiPath ?? '')
-    }
-  })
-}
+if (MQ_INVENTORY && SENTRY_DSN) {
+  Sentry.metrics.count('mq.job_added', 1, {
+    attributes: {
+      queue,
+      jobName: job.name,
+      address: String(data.address ?? data.source?.address ?? ''),
+      chainId: String(data.chainId ?? data.source?.chainId ?? ''),
+      fromBlock: String(data.from ?? data.fromBlock ?? ''),
+      toBlock: String(data.to ?? data.toBlock ?? ''),
+      abiPath: String(data.abiPath ?? data.abi?.abiPath ?? '')
+    }
+  })
+}
```

Duplicate detection still works — group by queue + jobName + address + chainId + fromBlock + toBlock in Sentry.

```diff
@@ -1,6 +1,14 @@
 import { Queue, Worker } from 'bullmq'
 import { Job } from './types'
+import * as Sentry from '@sentry/node'
```

@sentry/node loads the full Sentry + OpenTelemetry instrumentation stack on import. Since this only runs when MQ_INVENTORY and SENTRY_DSN are set, consider a dynamic import to avoid the overhead in all other processes that touch mq.ts.

Something like:

```typescript
let Sentry: typeof import('@sentry/node') | null = null

if (MQ_INVENTORY && SENTRY_DSN) {
  import('@sentry/node').then(s => {
    Sentry = s
    Sentry.init({ dsn: SENTRY_DSN })
  })
}
```

Then guard usage with if (Sentry) { ... }.
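The guard pattern can be exercised in isolation with a stub in place of the real module. SentryLike, maybeInit, and shutdown are made-up names for illustration, not part of the PR:

```typescript
// Stub standing in for a dynamically imported @sentry/node; until the
// import resolves, the module-level reference stays null and every
// guarded call site is a no-op.
type SentryLike = { flush: (timeoutMs: number) => Promise<boolean> }

let Sentry: SentryLike | null = null

async function maybeInit(enabled: boolean): Promise<void> {
  // Stands in for: if (enabled) Sentry = await import('@sentry/node')
  if (enabled) Sentry = { flush: async () => true }
}

async function shutdown(): Promise<string> {
  // Guarded usage, as the suggestion describes
  if (Sentry) {
    await Sentry.flush(5000)
    return 'flushed'
  }
  return 'skipped'
}
```

The null check keeps the hot path safe even if a job is added before the dynamic import has resolved; at worst that metric is dropped, which matches the opt-in nature of the feature.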

Comment on lines +179 to +183

```typescript
if (MQ_INVENTORY && SENTRY_DSN) {
  process.on('beforeExit', async () => {
    await Sentry.flush(5000)
  })
}
```

Node.js beforeExit does not await promises from async handlers — Sentry.flush(5000) may not complete before the process exits. Also, beforeExit doesn't fire on SIGTERM/SIGINT (the common shutdown signals).

Consider:

```typescript
for (const signal of ['SIGTERM', 'SIGINT'] as const) {
  process.on(signal, async () => {
    await Sentry.flush(5000)
    process.exit(0)
  })
}
```

Or call Sentry.flush() explicitly in the existing down() function since that's already the shutdown path.
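A minimal sketch of that second option, with a recorder standing in for Sentry.flush; down() here only illustrates the ordering, not the actual shutdown code:

```typescript
// Recorder standing in for Sentry.flush, so the ordering is observable.
const flushCalls: number[] = []
async function flushStub(timeoutMs: number): Promise<void> {
  flushCalls.push(timeoutMs)
}

// Sketch of the existing down() shutdown path named in the review.
async function down(): Promise<void> {
  // 1. close workers and queues (omitted here)
  // 2. drain buffered metrics last, so nothing emitted during teardown is lost
  await flushStub(5000)
}
```

Because down() is already awaited on the normal shutdown path, the flush completes before the process exits, without relying on beforeExit semantics.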

@murderteeth (Collaborator) left a comment
lgtm. i should have been more clear on when to flush sentry so i went ahead and added a commit for it

@matheus1lva (Collaborator, Author)

> lgtm. i should have been more clear on when to flush sentry so i went ahead and added a commit for it

Fair!

@matheus1lva matheus1lva merged commit 0163968 into main Mar 5, 2026
2 checks passed
@murderteeth murderteeth deleted the feat/log-jobs branch March 5, 2026 23:39