Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions edx_event_bus_redis/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ class EventConsumptionException(Exception):

class RedisEventConsumer(EventBusConsumer):
"""
Construct consumer for the given topic and group. The consumer can then
emit events coming from the topic.
Construct consumer for the given topic and group.

The consumer can then emit events coming from the topic.

Note that the topic should be specified here *without* the optional environment prefix.

Expand All @@ -94,6 +95,7 @@ class RedisEventConsumer(EventBusConsumer):

def __init__(self, topic, group_id, consumer_name, last_read_msg_id=None,
check_backlog=False, claim_msgs_older_than=None):
"""Initialize consumer."""
self.topic = topic
self.group_id = group_id
self.consumer_name = consumer_name
Expand All @@ -109,7 +111,7 @@ def __init__(self, topic, group_id, consumer_name, last_read_msg_id=None,

def _create_db(self) -> Database:
"""
Create a connection to redis
Create a connection to redis.
"""
config = load_common_settings()
if config is None:
Expand All @@ -118,12 +120,11 @@ def _create_db(self) -> Database:

def _create_consumer(self, db: Database, full_topic: str) -> ConsumerGroupStream:
"""
Create a redis stream consumer group and a consumer for the given topic
Create a redis stream consumer group and a consumer for the given topic.

Returns
ConsumerGroupStream
"""

# It is possible to track multiple streams using single consumer group.
# But for simplicity, we are only supporting one stream till the need arises.
consumer = db.consumer_group(self.group_id, [full_topic], consumer=self.consumer_name)
Expand Down Expand Up @@ -170,7 +171,6 @@ def _consume_indefinitely(self):
"""
Consume events from a topic in an infinite loop.
"""

if not REDIS_CONSUMERS_ENABLED.is_enabled():
logger.error("Redis consumers not enabled, exiting.")
return
Expand Down Expand Up @@ -290,7 +290,7 @@ def emit_signals_from_message(self, msg: RedisMessage):

def _check_receiver_results(self, send_results: list, signal: OpenEdxPublicSignal):
"""
Raises exception if any of the receivers produced an exception.
Raise exception if any of the receivers produced an exception.

Arguments:
send_results: Output of ``send_events``, a list of ``(receiver, response)`` tuples.
Expand Down Expand Up @@ -425,7 +425,7 @@ def _add_message_monitoring(self, run_context, message, error=None):

def _is_fatal_redis_error(self, error: Optional[Exception]) -> bool:
"""
Returns True if error is a RedisConnectionError, False otherwise.
Return True if error is a RedisConnectionError, False otherwise.

The redis.ConnectionError can be considered fatal as it is the base exception for errors from which consumer
might not be able to recover like AuthenticationError, AuthorizationError, MaxConnectionsError etc.
Expand Down
5 changes: 3 additions & 2 deletions edx_event_bus_redis/internal/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ class RedisMessage(NamedTuple):
"""
Redis message wrapper with ability to parse to & from redis msg tuple.
"""

topic: str
event_data: bytes
event_metadata: EventsMetadata
msg_id: Optional[bytes] = None

def to_binary_dict(self) -> Dict[bytes, bytes]:
"""
Converts instance to dictionary with binary key value pairs.
Convert instance to dictionary with binary key value pairs.
"""
data = get_headers_from_metadata(self.event_metadata)
data[b"event_data"] = self.event_data
Expand All @@ -37,7 +38,7 @@ def to_binary_dict(self) -> Dict[bytes, bytes]:
@classmethod
def parse(cls, msg: tuple, topic: str):
"""
Takes message from redis stream and parses it to return an instance of RedisMessage.
Take message from redis stream and parses it to return an instance of RedisMessage.

Args:
msg: Tuple with 1st item being msg_id and 2nd data from message.
Expand Down
15 changes: 8 additions & 7 deletions edx_event_bus_redis/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@


class EventProductionException(Exception):
""" An exception we can check for when errors occur in event production code. """
"""An exception we can check for when errors occur in event production code."""


def record_producing_error(error, context):
"""
Record an error in producing an event to both the monitoring system and the regular logs
Record an error in producing an event to both the monitoring system and the regular logs.

Arguments:
error: The exception or error raised during producing
Expand All @@ -54,8 +54,9 @@ def record_producing_error(error, context):
@attr.s(kw_only=True, repr=False)
class ProducingContext:
"""
Wrapper class to allow us to link a call to produce() with the on_event_deliver callback
Wrapper class to allow us to link a call to produce() with the on_event_deliver callback.
"""

full_topic = attr.ib(type=str, default=None)
event_key = attr.ib(type=str, default=None)
signal = attr.ib(type=OpenEdxPublicSignal, default=None)
Expand All @@ -65,15 +66,15 @@ class ProducingContext:
event_metadata = attr.ib(type=EventsMetadata, default=None)

def __repr__(self):
"""Create a logging-friendly string"""
"""Create a logging-friendly string."""
return " ".join([f"{key}={value!r}" for key, value in attr.asdict(self, recurse=False).items()])

def on_event_deliver(self, redis_msg_id):
"""
Simple method for debugging event production
Debug event production.

Arguments:
msg_id: Stream event msg_id.
redis_msg_id: Stream event msg_id.
"""
# TODO: see if below ADR can be moved to openedx_events.
# Audit logging on success.
Expand All @@ -98,6 +99,7 @@ class RedisEventProducer(EventBusProducer):
"""

def __init__(self, client):
"""Initialize the RedisEventProducer with a walrus client."""
self.client = client

def send(
Expand All @@ -115,7 +117,6 @@ def send(
event_data: The event data (kwargs) sent to the signal
event_metadata: An EventsMetadata object with all the metadata necessary for the CloudEvent spec
"""

# keep track of the initial arguments for recreating the event in the logs if necessary later
context = ProducingContext(
signal=signal,
Expand Down
1 change: 1 addition & 0 deletions edx_event_bus_redis/internal/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Test module for edx_event_bus_redis package."""
14 changes: 9 additions & 5 deletions edx_event_bus_redis/internal/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Utilities for converting between message headers and EventsMetadata
Utilities for converting between message headers and EventsMetadata.
"""

import logging
Expand Down Expand Up @@ -50,12 +50,14 @@ def decode(value: bytes) -> str:

class MessageHeader:
"""
Utility class for converting between message headers and EventsMetadata objects
Utility class for converting between message headers and EventsMetadata objects.
"""

_mapping = {}
instances = []

def __init__(self, message_header_key, event_metadata_field, to_metadata=None, from_metadata=None):
"""Initialize a MessageHeader."""
self.message_header_key = message_header_key
self.event_metadata_field = event_metadata_field
self.to_metadata = to_metadata or (lambda x: x)
Expand All @@ -79,12 +81,12 @@ def __init__(self, message_header_key, event_metadata_field, to_metadata=None, f

def get_metadata_from_headers(headers: dict):
"""
Create an EventsMetadata object from the headers of a Redis message
Create an EventsMetadata object from the headers of a Redis message.

Arguments
Arguments:
headers: The list of headers returned from calling message.headers() on a consumed message

Returns
Returns:
An instance of EventsMetadata with the parameters from the headers. Any fields missing from the headers
are set to the defaults of the EventsMetadata class
"""
Expand Down Expand Up @@ -124,7 +126,9 @@ class Timeout:

Some redis calls don't have a timeout parameter, so this can be used to enforce a timeout.
"""

def __init__(self, timeout_seconds):
"""Initialize the Timeout context manager with a timeout in seconds."""
self.timeout_seconds = timeout_seconds

def __enter__(self):
Expand Down
1 change: 1 addition & 0 deletions edx_event_bus_redis/management/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Management commands for edx_event_bus_redis."""
4 changes: 3 additions & 1 deletion edx_event_bus_redis/management/commands/produce_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Command(BaseCommand):
"""
Management command to produce a test event to the event bus.
"""

help = """
Produce a single test event with the given data to the specified Redis topic.

Expand All @@ -33,7 +34,7 @@ class Command(BaseCommand):
"""

def add_arguments(self, parser):

"""Add command line arguments for producing an event."""
parser.add_argument(
'--signal', nargs=1, required=True,
help="Module:variable path to an OpenEdxPublicSignal instance",
Expand All @@ -52,6 +53,7 @@ def add_arguments(self, parser):
)

def handle(self, *args, **options):
"""Handle the command to produce an event."""
try:
signal = import_string(options['signal'][0])
event_type = signal.event_type
Expand Down
8 changes: 2 additions & 6 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,6 @@ pycparser==3.0
# via
# -r requirements/quality.txt
# cffi
pydocstyle==6.3.0
# via -r requirements/quality.txt
pygments==2.19.2
# via
# -r requirements/quality.txt
Expand Down Expand Up @@ -261,15 +259,13 @@ redis==7.1.0
# via
# -r requirements/quality.txt
# walrus
ruff==0.14.14
# via -r requirements/quality.txt
six==1.17.0
# via
# -r requirements/quality.txt
# edx-ccx-keys
# edx-lint
snowballstemmer==3.0.1
# via
# -r requirements/quality.txt
# pydocstyle
sqlparse==0.5.5
# via
# -r requirements/quality.txt
Expand Down
2 changes: 1 addition & 1 deletion requirements/quality.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
edx-lint # edX pylint rules and plugins
isort # to standardize order of imports
pycodestyle # PEP 8 compliance validation
pydocstyle # PEP 257 compliance validation
ruff # A modern Python code linter and formatter.
6 changes: 2 additions & 4 deletions requirements/quality.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ pycparser==3.0
# via
# -r requirements/test.txt
# cffi
pydocstyle==6.3.0
# via -r requirements/quality.in
pygments==2.19.2
# via
# -r requirements/test.txt
Expand Down Expand Up @@ -177,13 +175,13 @@ redis==7.1.0
# via
# -r requirements/test.txt
# walrus
ruff==0.14.14
# via -r requirements/quality.in
six==1.17.0
# via
# -r requirements/test.txt
# edx-ccx-keys
# edx-lint
snowballstemmer==3.0.1
# via pydocstyle
sqlparse==0.5.5
# via
# -r requirements/test.txt
Expand Down
28 changes: 28 additions & 0 deletions ruff.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
extend-exclude = ["migrations"]

[lint]

exclude = ["test_*.py"]

select = [
"D", # pydocstyle
]

ignore = [
"D101", # Missing docstring in public class
"D200", # One-line docstring should fit on one line
"D203", # 1 blank line required before class docstring
"D212", # Multi-line docstring summary should start at the first line
"D215", # Section underline is over-indented
"D404", # First word of the docstring should not be "This"
"D405", # Section name should be properly capitalized
"D406", # Section name should end with a newline
"D407", # Missing dashed underline after section
"D408", # Section underline should be in the line following the section's name
"D409", # Section underline should match the length of its name
"D410", # Missing blank line after section
"D411", # Missing blank line before section
"D412", # No blank lines allowed between a section header and its content
"D413", # Missing blank line after last section
"D414", # Section has no content
]
22 changes: 1 addition & 21 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,6 @@ envlist = py{311,312}-django{42,52}, docs, quality
exclude = .git,.tox,migrations
max-line-length = 120

[pydocstyle]
; D101 = Missing docstring in public class
; D200 = One-line docstring should fit on one line with quotes
; D203 = 1 blank line required before class docstring
; D212 = Multi-line docstring summary should start at the first line
; D215 = Section underline is over-indented (numpy style)
; D404 = First word of the docstring should not be This (numpy style)
; D405 = Section name should be properly capitalized (numpy style)
; D406 = Section name should end with a newline (numpy style)
; D407 = Missing dashed underline after section (numpy style)
; D408 = Section underline should be in the line following the section's name (numpy style)
; D409 = Section underline should match the length of its name (numpy style)
; D410 = Missing blank line after section (numpy style)
; D411 = Missing blank line before section (numpy style)
; D412 = No blank lines allowed between a section header and its content (numpy style)
; D413 = Missing blank line after last section (numpy style)
; D414 = Section has no content (numpy style)
ignore = D101,D200,D203,D212,D215,D404,D405,D406,D407,D408,D409,D410,D411,D412,D413,D414
match-dir = (?!migrations)

[pytest]
DJANGO_SETTINGS_MODULE = test_settings
addopts = --cov edx_event_bus_redis --cov-report term-missing --cov-report xml
Expand Down Expand Up @@ -72,7 +52,7 @@ commands =
pylint edx_event_bus_redis tests test_utils manage.py setup.py
rm tests/__init__.py
pycodestyle edx_event_bus_redis tests manage.py setup.py
pydocstyle edx_event_bus_redis tests manage.py setup.py
ruff check edx_event_bus_redis tests manage.py setup.py
isort --check-only --diff tests test_utils edx_event_bus_redis manage.py setup.py test_settings.py
make selfcheck

Expand Down