Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions app/celery.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import multiprocessing
import os
from datetime import timedelta

import celery_healthcheck
Expand Down Expand Up @@ -115,27 +117,32 @@ def receiver_setup_logging(
pass


def _bind_worker_process_context():
log.local(
worker_id=multiprocessing.current_process().name,
pid=os.getpid(),
)


@signals.worker_process_init.connect
def worker_process_init_setup_logging(**kwargs):
# TODO should add PID and worker ID to the log content, we want the `WARNING/ForkPoolWorker-8` ID in the logs
# https://github.com/celery/celery/discussions/9378
# might need to grab this from? https://pypi.org/project/setproctitle/#history
# log.local(celery_pid=sender.pid)
pass
_bind_worker_process_context()


# tag all jobs with the job name (module path) and uuid for the task
@signals.task_prerun.connect
def on_task_prerun(sender, task_id, task, args, kwargs, **_kwargs):
"https://github.com/hynek/structlog/issues/287#issuecomment-991182054"
log.clear()
_bind_worker_process_context()
log.local(task_id=task_id, task_name=task.name)


# TODO below should be cleaned up and is probably overkill, need to determine exactly which one we should use
@signals.task_postrun.connect
def on_task_postrun(sender, task_id, task, args, kwargs, retval, state, **_kwargs):
log.clear()
_bind_worker_process_context()


@signals.worker_shutdown.connect
Expand Down
Loading