Conversation
@vaquarkhan let us know when it's ready for review

It's ready for review. Please review the pull request; it's waiting for approval @skrawcz

@vaquarkhan any screenshots of things in action? Also we'll need to add to docs/ for this -- mind adding that part?

Let me create a Google doc for the documentation and add all screenshots there.
```python
class EventDrivenBackendMixin(abc.ABC):
    """Mixin for backends that support event-driven updates via SQS.

    BIP-0042: This mixin enables backends to receive real-time notifications
    from SQS instead of polling S3 for new files.
    """

    @abc.abstractmethod
    async def start_sqs_consumer(self):
        """Start the SQS consumer for event-driven tracking.

        This method should run indefinitely, processing S3 event notifications
        from the configured SQS queue.
        """
        pass
```
Let's remove SQS from this and make it more generic in naming.
PR Review: Cloud Native AWS (BIP-042)
Thanks for the contribution! The overall direction of adding cloud-native AWS support is valuable. However, there are several concerns that should be addressed before merging — primarily around vendor neutrality, code quality, and architecture. As an Apache project, keeping the core framework vendor-neutral is critical.
Summary
| # | Severity | Issue |
|---|---|---|
| 1 | Blocker | start_sqs_consumer in the core backend mixin leaks AWS vendor concepts into the abstraction layer |
| 2 | Blocker | terraform/ at repo root with .terraform.lock.hcl committed — should be under examples/ or contrib/ |
| 3 | High | Bedrock integration is orthogonal to S3/SQS tracking — should be a separate PR to reduce scope |
| 4 | High | BedrockAction / BedrockStreamingAction have heavy code duplication (~90% identical __init__ and request-building) |
| 5 | High | No graceful shutdown for the SQS consumer asyncio task — CancelledError not handled |
| 6 | Medium | tracking_mode should be a Python Enum, not magic strings "POLLING" / "SQS" |
| 7 | Medium | SQS consumer only processes `Records[0]`, silently ignoring additional records in a message; is this correct behavior? |
| 8 | Medium | Tests are effectively no-ops when boto3 isn't installed (all wrapped in try/except ImportError) |
| 9 | Low | Import style (`List`, `Dict` vs `list`, `dict`); logging uses f-strings instead of %s-style |
See inline comments for details.
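On item 7, here is a sketch of parsing every record in an S3 event notification rather than only `Records[0]`. The function name is illustrative (not the PR's actual code); the message shape follows the standard S3 event notification format.

```python
import json


def extract_s3_keys(message_body: str) -> list:
    """Return all object keys in an S3 event notification, not just Records[0]."""
    event = json.loads(message_body)
    # S3 can batch multiple records into one notification; process them all.
    return [record["s3"]["object"]["key"] for record in event.get("Records", [])]
```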
burr/tracking/server/backend.py

```python
class EventDrivenBackendMixin(abc.ABC):
    """Mixin for backends that support event-driven updates via SQS.
```
Blocker — Vendor neutrality: This mixin is part of the core backend abstraction layer, but it leaks AWS-specific concepts. The class name is fine, but `start_sqs_consumer` names a specific AWS service, and the docstrings reference SQS throughout.

For an Apache project, this should be vendor-neutral:
- Rename `start_sqs_consumer()` → `start_event_consumer()`
- Update docstrings to describe generic event-driven updates, not SQS specifically
- The `is_event_driven()` method name is already good

This would allow a GCP Pub/Sub or Azure Event Grid implementation to use the same mixin without the naming being misleading.
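A minimal sketch of what the vendor-neutral mixin could look like (the rename follows the suggestion above; final naming is the maintainers' call):

```python
import abc


class EventDrivenBackendMixin(abc.ABC):
    """Mixin for backends that support event-driven updates.

    The event source is implementation-specific (SQS, Pub/Sub,
    Event Grid, ...); nothing here names a vendor.
    """

    @abc.abstractmethod
    async def start_event_consumer(self) -> None:
        """Run indefinitely, processing storage event notifications."""

    def is_event_driven(self) -> bool:
        """Whether this backend is configured for event-driven tracking.

        Concrete backends may override this based on their settings.
        """
        return True
```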
```python
prior_snapshots_to_keep: int = 5
# BIP-0042: Event-driven tracking settings
tracking_mode: str = "POLLING"  # "POLLING" or "SQS" - POLLING is default for backward compatibility
sqs_queue_url: Optional[str] = None
```
Medium — Use an Enum instead of magic strings: "POLLING" and "SQS" are magic strings that are error-prone (e.g., the typo "Polling" would silently fall through). Consider:

```python
from enum import Enum

class TrackingMode(str, Enum):
    POLLING = "POLLING"
    EVENT_DRIVEN = "EVENT_DRIVEN"
```

Also, "SQS" is an AWS-specific name. Since this setting is in the S3 backend (already AWS-specific), it's less critical than the core mixin, but "EVENT_DRIVEN" would still be more descriptive of the behavior rather than the implementation.
burr/tracking/server/s3/backend.py

```python
async with self._session.create_client("sqs", region_name=self._sqs_region) as sqs_client:
    while True:
        try:
```
High — No graceful shutdown: This `while True` loop runs as a fire-and-forget `asyncio.create_task()` (see run.py). When the server shuts down, this task will be cancelled, but `asyncio.CancelledError` is not handled. The outer `except Exception` won't catch it (since Python 3.8, `CancelledError` inherits from `BaseException`).

Recommendation:

```python
try:
    while True:
        ...
except asyncio.CancelledError:
    logger.info("SQS consumer shutting down")
    raise
```

Also, the task reference in run.py is not stored anywhere; it could be garbage collected. Store it and cancel it in the shutdown path of the lifespan context manager.
```python
logger.info(f"Indexed S3 event: {s3_key}")
except Exception as e:
    logger.error(f"Failed to handle S3 event {s3_key}: {e}")
```
Medium — Silent error swallowing: Catching Exception and only logging means data loss can go completely unnoticed. If indexing fails for a file, that tracking data is permanently skipped.
Consider either:
- Re-raising after logging (letting the message go back to SQS for retry, eventually to DLQ)
- At minimum, emitting a metric/counter so operators can alert on indexing failures
Currently the message gets deleted from SQS even if _handle_s3_event fails (the delete_message call is outside this try/except), so failed events are lost forever.
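A sketch of the first option. The helper name `process_message` and the simplified message shape are illustrative, not the PR's actual code; the point is that `delete_message` runs only after indexing succeeds, so SQS redelivery and DLQ routing handle failures.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def process_message(sqs_client, queue_url, message, handle_s3_event):
    """Delete an SQS message only after its S3 event is indexed successfully.

    On failure the message is left in the queue: SQS redelivers it after the
    visibility timeout and routes it to the DLQ once maxReceiveCount is hit.
    """
    s3_key = message["s3_key"]  # simplified; real code parses the S3 event body
    try:
        await handle_s3_event(s3_key)
    except Exception:
        logger.exception("Failed to handle S3 event %s", s3_key)
        return False  # not deleted; SQS will retry
    # Only acknowledge (delete) after successful indexing.
    await sqs_client.delete_message(
        QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
    )
    return True
```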
burr/integrations/bedrock.py

```python
class BedrockStreamingAction(StreamingAction):
    """Streaming variant of BedrockAction using Converse Stream API."""
```
High — Code duplication: `BedrockStreamingAction` duplicates ~90% of `BedrockAction`:
- Identical `__init__` (all parameters, client creation, instance variable assignments)
- Identical `reads`, `writes`, `name` properties
- Identical request-building logic in `stream_run` vs `run_and_update`

Consider extracting a shared base class:

```python
class _BedrockBase:
    def __init__(self, model_id, input_mapper, reads, writes, ...):
        # shared init
        ...

    def _build_request(self, state: State) -> Dict[str, Any]:
        # shared request building
        ...

class BedrockAction(_BedrockBase, SingleStepAction): ...
class BedrockStreamingAction(_BedrockBase, StreamingAction): ...
```
and also move this to a different PR
burr/integrations/bedrock.py

```python
config = Config(retries={"max_attempts": max_retries, "mode": "adaptive"})
self._client = boto3.client("bedrock-runtime", region_name=region, config=config)
```
Medium — Client created at construction time: The boto3 client is created eagerly in __init__, which means:
- The action can't be serialized/pickled (relevant for distributed execution)
- Testing requires mocking at construction time or having valid AWS credentials
- Credentials are captured at construction, not at invocation
Consider lazy client creation (create on first run_and_update call) or accepting a client/session as a parameter for testability.
I think any client should be passed in (i.e. injected) -- that way it's easier for tests etc.
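A sketch of the injection-plus-lazy-fallback pattern. The class and parameter names here are illustrative, not the PR's actual API: a client can be injected for tests, and is otherwise created only on first use so construction never requires boto3 or credentials.

```python
from typing import Any, Optional


class BedrockActionSketch:
    """Illustrative: accept an injected client, create one lazily otherwise."""

    def __init__(self, model_id: str, client: Any = None,
                 region: Optional[str] = None, max_retries: int = 3):
        self._model_id = model_id
        self._client = client  # injected for tests; created lazily otherwise
        self._region = region
        self._max_retries = max_retries

    @property
    def client(self) -> Any:
        if self._client is None:
            # Deferred imports: constructing the action never requires boto3.
            import boto3
            from botocore.config import Config

            config = Config(
                retries={"max_attempts": self._max_retries, "mode": "adaptive"}
            )
            self._client = boto3.client(
                "bedrock-runtime", region_name=self._region, config=config
            )
        return self._client
```

With injection, unit tests pass a stub and never touch AWS; credentials are only resolved at first invocation in the lazy path.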
```python
# BIP-0042: Start SQS consumer for event-driven tracking when configured
if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven():
    asyncio.create_task(backend.start_sqs_consumer())
global initialized
```
High — Task reference not stored: `asyncio.create_task()` returns a task reference that isn't assigned to anything. This means:
- The task could be garbage collected
- There's no way to cancel it during shutdown
- Exceptions in the task will be silently lost

Store the task and cancel it in the shutdown path:

```python
sqs_task = None
if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven():
    sqs_task = asyncio.create_task(backend.start_event_consumer())
yield
if sqs_task:
    sqs_task.cancel()
    try:
        await sqs_task
    except asyncio.CancelledError:
        pass
```

Also, `import asyncio` should be a top-level import, not inside the function.
terraform/.terraform.lock.hcl

```hcl
# This file is maintained automatically by "terraform init".
# Manual edits may be lost in future updates.
```
Blocker — Should not be committed: .terraform.lock.hcl is a generated file specific to the contributor's Terraform version and platform. It should be added to .gitignore (or the terraform .gitignore), not committed to the repo.
More broadly: the entire terraform/ directory (including dev.tfvars, prod.tfvars with hardcoded us-east-1) should live under examples/deployment/aws/ or contrib/aws/ rather than at the repo root. Burr is a Python framework library — shipping infrastructure-as-code at the top level makes the project appear AWS-first rather than vendor-neutral.
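For reference, ignore entries matching this recommendation would look something like the following (a sketch; adjust to the project's existing .gitignore conventions):

```
# Local Terraform working directories
**/.terraform/*
# State files
*.tfstate
*.tfstate.*
# Dependency lock file, per the recommendation above
.terraform.lock.hcl
```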
```python
    assert BedrockAction is not None
except ImportError as e:
    assert "bedrock" in str(e).lower() or "boto3" in str(e).lower()
```
Medium — Tests are no-ops without boto3: Every test is wrapped in `try/except ImportError` with an assertion on the error message. When boto3 isn't installed (which is the case in CI unless explicitly added), all tests silently pass without testing anything.

Better approaches:
- Use `pytest.importorskip("boto3")` to clearly skip tests when the dependency is missing
- Mock `boto3` for unit tests so they run regardless of whether boto3 is installed
- For integration tests that need real AWS, mark them with `@pytest.mark.integration` and skip in CI
When this moves to a separate PR, you can follow what we do for the databases plugins and have explicit integration tests -- so we'd just remove the imports
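For the mocking approach, here is a self-contained sketch using only the standard library, so unit tests exercise the integration code paths even on runners where boto3 is absent (the client call is illustrative):

```python
import sys
from unittest import mock

# Install a fake boto3 in sys.modules so any `import boto3` inside the
# code under test resolves to the mock instead of the real package.
fake_boto3 = mock.MagicMock()
with mock.patch.dict(sys.modules, {"boto3": fake_boto3}):
    import boto3  # resolves to the fake

    client = boto3.client("bedrock-runtime", region_name="us-east-1")
    fake_boto3.client.assert_called_once_with(
        "bedrock-runtime", region_name="us-east-1"
    )
```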
burr/integrations/__init__.py

```python
# under the License.

def __getattr__(name: str):
```
burr/tracking/__init__.py

```python
def __getattr__(name: str):
    """Lazy load S3TrackingClient to avoid requiring boto3 unless used."""
    if name == "S3TrackingClient":
        from burr.tracking.s3client import S3TrackingClient

        return S3TrackingClient
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


__all__ = ["LocalTrackingClient", "S3TrackingClient"]
```
I don't think we need this?
| @@ -0,0 +1,109 @@ | |||
| # Licensed to the Apache Software Foundation (ASF) under one | |||
let's move these to an aws folder under terraform or something
…he#664)
- Event-driven SQS telemetry: S3 notifications to SQS, near-instant updates
- Buffered S3 persistence: SpooledTemporaryFile fixes seek errors on large files
- Native BedrockAction and BedrockStreamingAction for Bedrock integration
- Terraform module: S3, SQS, IAM with dev/prod tfvars and tutorial

… fixed build error

This reverts commit 61673cd.

Force-pushed from c913d03 to 5ca9739
[Short description explaining the high-level reason for the pull request]
Changes
BIP-042 issue #664
How I tested this
Deployed using Terraform
Notes
Checklist