Skip to content

Fix Redis delayed message promotion#852

Closed
iamdinhthuan wants to merge 1 commit into
Bogdanp:masterfrom
iamdinhthuan:fix-redis-delayed-promotion-431
Closed

Fix Redis delayed message promotion#852
iamdinhthuan wants to merge 1 commit into
Bogdanp:masterfrom
iamdinhthuan:fix-redis-delayed-promotion-431

Conversation

@iamdinhthuan
Copy link
Copy Markdown

@iamdinhthuan iamdinhthuan commented May 14, 2026

Fixes #431.

Redis delayed messages were promoted by enqueueing a copy to the canonical queue and then acking the original delay-queue message as two separate operations. If the original delay message was requeued before the ack completed, the same logical message could be promoted more than once.

This PR adds a Redis-specific consumer hook that promotes due delayed messages atomically in the Redis Lua dispatch script. The generic worker path still falls back to the existing enqueue-then-ack behavior for consumers that do not implement the hook.

Verification:

  • .venv/bin/python -m pytest tests/test_worker.py::test_delayed_messages_use_consumer_promotion_hook tests/test_redis.py::test_redis_consumer_promotes_delayed_messages_atomically -q
  • .venv/bin/python -m pytest tests/test_redis.py tests/test_worker.py -q with Redis 7 running locally via Docker: 23 passed in 66.07s
  • .venv/bin/python -m black --check dramatiq/broker.py dramatiq/worker.py dramatiq/brokers/redis.py tests/test_worker.py tests/test_redis.py
  • .venv/bin/python -m isort -c dramatiq/broker.py dramatiq/worker.py dramatiq/brokers/redis.py tests/test_worker.py tests/test_redis.py
  • .venv/bin/python -m flake8 dramatiq/broker.py dramatiq/worker.py dramatiq/brokers/redis.py tests/test_worker.py tests/test_redis.py
  • .venv/bin/python -m mypy dramatiq/broker.py dramatiq/worker.py dramatiq/brokers/redis.py tests/test_worker.py tests/test_redis.py
  • Standalone fakeredis smoke check for the new Lua command.

@iamdinhthuan iamdinhthuan closed this by deleting the head repository May 18, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Delayed messages are being duplicated on work queue which leads to one message being consumed and processed by many WorkerThreads

1 participant