Issue 664 cloud native aws #666

Open

vaquarkhan wants to merge 8 commits into apache:main from vaquarkhan:issue-664-cloud-native-aws

Conversation

@vaquarkhan commented Mar 1, 2026

[Short description explaining the high-level reason for the pull request]

Changes

BIP-042 issue #664

How I tested this

Deploy using terraform

Notes

Checklist

  • PR has an informative and human-readable title (this will be pulled into the release notes)
  • Changes are limited to a single goal (no scope creep)
  • Code passed the pre-commit check & code is left cleaner/nicer than when first encountered.
  • Any change in functionality is tested
  • New functions are documented (with a description, list of inputs, and expected output)
  • Placeholder code is flagged / future TODOs are captured in comments
  • Project documentation has been updated if adding/changing functionality.

@skrawcz (Contributor) commented Mar 3, 2026

@vaquarkhan let us know when it's ready for review

@vaquarkhan (Author) commented

It's ready for review. Please review the pull request; it's waiting for approval. @skrawcz

@skrawcz (Contributor) commented Mar 12, 2026

@vaquarkhan any screenshots of things in action? Also we'll need to add to docs/ for this -- mind adding that part?

@vaquarkhan (Author) commented

Let me create a Google Doc for the documentation; I will add all the screenshots there.

Comment on lines +165 to +179
```python
class EventDrivenBackendMixin(abc.ABC):
    """Mixin for backends that support event-driven updates via SQS.

    BIP-0042: This mixin enables backends to receive real-time notifications
    from SQS instead of polling S3 for new files.
    """

    @abc.abstractmethod
    async def start_sqs_consumer(self):
        """Start the SQS consumer for event-driven tracking.

        This method should run indefinitely, processing S3 event notifications
        from the configured SQS queue.
        """
        pass
```
Contributor:

Let's remove SQS from this and make it more generic in naming.

Author:

Accepted

@skrawcz (Contributor) left a comment

PR Review: Cloud Native AWS (BIP-042)

Thanks for the contribution! The overall direction of adding cloud-native AWS support is valuable. However, there are several concerns that should be addressed before merging — primarily around vendor neutrality, code quality, and architecture. As an Apache project, keeping the core framework vendor-neutral is critical.

Summary

| # | Severity | Issue |
|---|----------|-------|
| 1 | Blocker | `start_sqs_consumer` in the core backend mixin leaks AWS vendor concepts into the abstraction layer |
| 2 | Blocker | `terraform/` at the repo root with `.terraform.lock.hcl` committed; should be under `examples/` or `contrib/` |
| 3 | High | Bedrock integration is orthogonal to S3/SQS tracking; should be a separate PR to reduce scope |
| 4 | High | `BedrockAction` / `BedrockStreamingAction` have heavy code duplication (~90% identical `__init__` and request building) |
| 5 | High | No graceful shutdown for the SQS consumer asyncio task; `CancelledError` not handled |
| 6 | Medium | `tracking_mode` should be a Python `Enum`, not magic strings `"POLLING"` / `"SQS"` |
| 7 | Medium | SQS consumer only processes `Records[0]`, silently ignoring additional records in a message; is this the intended behavior? |
| 8 | Medium | Tests are effectively no-ops when boto3 isn't installed (all wrapped in `try/except ImportError`) |
| 9 | Low | Import style (`List`, `Dict` vs `list`, `dict`); logging uses f-strings instead of `%s`-style |

See inline comments for details.



```python
class EventDrivenBackendMixin(abc.ABC):
    """Mixin for backends that support event-driven updates via SQS.
```
Contributor:

Blocker — Vendor neutrality: This mixin is part of the core backend abstraction layer, but it leaks AWS-specific concepts. The class name is fine, but start_sqs_consumer names a specific AWS service, and the docstrings reference SQS throughout.

For an Apache project, this should be vendor-neutral:

  • Rename `start_sqs_consumer()` → `start_event_consumer()`
  • Update docstrings to describe generic event-driven updates, not SQS specifically
  • The is_event_driven() method name is already good

This would allow a GCP Pub/Sub or Azure Event Grid implementation to use the same mixin without the naming being misleading.
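As a sketch of what the vendor-neutral shape could look like (the class bodies here are illustrative, not the PR's actual code):

```python
import abc


class EventDrivenBackendMixin(abc.ABC):
    """Mixin for backends that support event-driven updates.

    Deliberately vendor-neutral: implementations may consume events from
    AWS SQS, GCP Pub/Sub, Azure Event Grid, or anything else.
    """

    def is_event_driven(self) -> bool:
        """Whether this backend is configured for event-driven tracking."""
        return False

    @abc.abstractmethod
    async def start_event_consumer(self) -> None:
        """Run indefinitely, processing storage event notifications."""


class SQSS3Backend(EventDrivenBackendMixin):
    """Illustrative AWS-specific implementation, kept out of the core layer."""

    def is_event_driven(self) -> bool:
        return True

    async def start_event_consumer(self) -> None:
        ...  # poll the configured SQS queue here
```

The core layer only ever calls `start_event_consumer()`; the SQS details stay in the concrete backend.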

Author:

Accepted

```python
prior_snapshots_to_keep: int = 5
# BIP-0042: Event-driven tracking settings
tracking_mode: str = "POLLING"  # "POLLING" or "SQS" - POLLING is default for backward compatibility
sqs_queue_url: Optional[str] = None
```
Contributor:

Medium — Use an Enum instead of magic strings: "POLLING" and "SQS" are magic strings that are error-prone (e.g., typo "Polling" would silently fall through). Consider:

```python
from enum import Enum

class TrackingMode(str, Enum):
    POLLING = "POLLING"
    EVENT_DRIVEN = "EVENT_DRIVEN"
```
Also, "SQS" is an AWS-specific name. Since this setting is in the S3 backend (already AWS-specific), it's less critical than the core mixin, but "EVENT_DRIVEN" would still be more descriptive of the behavior rather than the implementation.
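A quick illustration of why the str-backed Enum helps (assuming a `TrackingMode` enum along the lines suggested): invalid config values fail loudly at parse time, while plain string comparisons keep working for backward compatibility.

```python
from enum import Enum


class TrackingMode(str, Enum):
    POLLING = "POLLING"
    EVENT_DRIVEN = "EVENT_DRIVEN"


mode = TrackingMode("POLLING")       # parsing a config string
assert mode is TrackingMode.POLLING
assert mode == "POLLING"             # str mixin: old string comparisons still work

try:
    TrackingMode("Polling")          # a typo now fails loudly...
except ValueError as e:
    print(e)                         # ...instead of silently falling through
```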


```python
async with self._session.create_client("sqs", region_name=self._sqs_region) as sqs_client:
    while True:
        try:
```
Contributor:

High — No graceful shutdown: This while True loop runs as a fire-and-forget asyncio.create_task() (see run.py). When the server shuts down, this task will be cancelled, but asyncio.CancelledError is not handled. The outer except Exception won't catch it (since Python 3.8, CancelledError inherits from BaseException).

Recommendation:

```python
try:
    while True:
        ...
except asyncio.CancelledError:
    logger.info("SQS consumer shutting down")
    raise
```

Also, the task reference in run.py is not stored anywhere — it could be garbage collected. Store it and cancel it in the shutdown path of the lifespan context manager.
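A runnable miniature of that shutdown pattern (names are hypothetical; `asyncio.sleep` stands in for the SQS receive call):

```python
import asyncio


async def event_consumer():
    """Consumer loop that handles cancellation explicitly."""
    try:
        while True:
            await asyncio.sleep(0.01)  # stand-in for receive_message(...)
    except asyncio.CancelledError:
        print("consumer shutting down")
        raise  # re-raise so the task is properly marked as cancelled


async def main():
    task = asyncio.create_task(event_consumer())  # keep the reference!
    await asyncio.sleep(0.05)
    # shutdown path:
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


print(asyncio.run(main()))  # → True
```

Because the consumer re-raises `CancelledError`, `task.cancelled()` is `True` after shutdown and the event loop exits cleanly.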

```python
    logger.info(f"Indexed S3 event: {s3_key}")
except Exception as e:
    logger.error(f"Failed to handle S3 event {s3_key}: {e}")
```

Contributor:

Medium — Silent error swallowing: Catching Exception and only logging means data loss can go completely unnoticed. If indexing fails for a file, that tracking data is permanently skipped.

Consider either:

  1. Re-raising after logging (letting the message go back to SQS for retry, eventually to DLQ)
  2. At minimum, emitting a metric/counter so operators can alert on indexing failures

Currently the message gets deleted from SQS even if _handle_s3_event fails (the delete_message call is outside this try/except), so failed events are lost forever.
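One way to make the failure path explicit is to report success to the caller and only delete the queue message on success (a sketch; `process_message` and the message shape are hypothetical, not the PR's code):

```python
import logging

logger = logging.getLogger(__name__)


def process_message(message: dict, handle_s3_event) -> bool:
    """Index one event; return True only on success.

    The caller deletes the message from the queue only when this returns
    True, so failed events are redelivered by SQS and eventually routed
    to the DLQ instead of being lost.
    """
    s3_key = message.get("s3_key", "<unknown>")
    try:
        handle_s3_event(message)
    except Exception:
        logger.exception("Failed to handle S3 event %s", s3_key)
        return False  # do NOT delete; let the queue retry
    return True


# Caller side (illustrative):
# if process_message(msg, backend._handle_s3_event):
#     await sqs_client.delete_message(QueueUrl=url, ReceiptHandle=handle)
```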



```python
class BedrockStreamingAction(StreamingAction):
    """Streaming variant of BedrockAction using Converse Stream API."""
```
Contributor:

High — Code duplication: BedrockStreamingAction duplicates ~90% of BedrockAction:

  • Identical __init__ (all parameters, client creation, instance variable assignments)
  • Identical reads, writes, name properties
  • Identical request-building logic in stream_run vs run_and_update

Consider extracting a shared base class:

```python
class _BedrockBase:
    def __init__(self, model_id, input_mapper, reads, writes, ...):
        # shared init

    def _build_request(self, state: State) -> Dict[str, Any]:
        # shared request building

class BedrockAction(_BedrockBase, SingleStepAction): ...
class BedrockStreamingAction(_BedrockBase, StreamingAction): ...
```

Contributor:

and also move this to a different PR


```python
config = Config(retries={"max_attempts": max_retries, "mode": "adaptive"})
self._client = boto3.client("bedrock-runtime", region_name=region, config=config)
```

Contributor:

Medium — Client created at construction time: The boto3 client is created eagerly in __init__, which means:

  1. The action can't be serialized/pickled (relevant for distributed execution)
  2. Testing requires mocking at construction time or having valid AWS credentials
  3. Credentials are captured at construction, not at invocation

Consider lazy client creation (create on first run_and_update call) or accepting a client/session as a parameter for testability.

@skrawcz (Contributor) commented Mar 16, 2026

I think any client should be passed in (i.e. injected) -- that way it's easier for tests etc.
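A sketch of that injection pattern (the class shape, parameter names, and default region are illustrative, not the PR's actual signature):

```python
from typing import Any, Optional


class BedrockAction:
    """Accepts an injected client; only creates a real one when needed."""

    def __init__(self, model_id: str, client: Optional[Any] = None,
                 region: str = "us-east-1"):
        self._model_id = model_id
        self._client = client  # injected in tests; None in production
        self._region = region

    @property
    def client(self) -> Any:
        if self._client is None:
            import boto3  # deferred: construction needs no AWS deps

            self._client = boto3.client(
                "bedrock-runtime", region_name=self._region
            )
        return self._client


# In tests, pass a stub and never touch AWS:
class FakeClient:
    def converse(self, **kwargs):
        return {"output": {"message": {"content": [{"text": "ok"}]}}}


action = BedrockAction("anthropic.claude-3", client=FakeClient())
print(action.client.converse()["output"]["message"]["content"][0]["text"])  # → ok
```

This also fixes the pickling concern, since a never-materialized client is just `None` at serialization time.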

```python
# BIP-0042: Start SQS consumer for event-driven tracking when configured
if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven():
    asyncio.create_task(backend.start_sqs_consumer())
global initialized
```
Contributor:

High — Task reference not stored: asyncio.create_task() returns a task reference that isn't assigned to anything. This means:

  1. The task could be garbage collected
  2. There's no way to cancel it during shutdown
  3. Exceptions in the task will be silently lost

Store the task and cancel it in the shutdown path:

```python
sqs_task = None
if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven():
    sqs_task = asyncio.create_task(backend.start_event_consumer())
yield
if sqs_task:
    sqs_task.cancel()
    try:
        await sqs_task
    except asyncio.CancelledError:
        pass
```

Also, import asyncio should be a top-level import, not inside the function.

@@ -0,0 +1,45 @@
```hcl
# This file is maintained automatically by "terraform init".
# Manual edits may be lost in future updates.
```
Contributor:

Blocker — Should not be committed: .terraform.lock.hcl is a generated file specific to the contributor's Terraform version and platform. It should be added to .gitignore (or the terraform .gitignore), not committed to the repo.

More broadly: the entire terraform/ directory (including dev.tfvars, prod.tfvars with hardcoded us-east-1) should live under examples/deployment/aws/ or contrib/aws/ rather than at the repo root. Burr is a Python framework library — shipping infrastructure-as-code at the top level makes the project appear AWS-first rather than vendor-neutral.
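Following that recommendation, the ignore rules might look like this (paths assume the module moves under examples/deployment/aws/; adjust to the final location):

```gitignore
# Terraform working files: generated locally, not committed
**/.terraform/
*.tfstate
*.tfstate.*
.terraform.lock.hcl
crash.log
```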


```python
    assert BedrockAction is not None
except ImportError as e:
    assert "bedrock" in str(e).lower() or "boto3" in str(e).lower()
```
Contributor:

Medium — Tests are no-ops without boto3: Every test is wrapped in try/except ImportError with an assertion on the error message. When boto3 isn't installed (which is the case in CI unless explicitly added), all tests silently pass without testing anything.

Better approaches:

  1. Use pytest.importorskip("boto3") to clearly skip tests when the dependency is missing
  2. Mock boto3 for unit tests so they run regardless of whether boto3 is installed
  3. For integration tests that need real AWS, mark them with @pytest.mark.integration and skip in CI
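Approach 2 can be done with `unittest.mock` so the unit tests exercise real logic with no AWS dependency (a sketch; the stubbed call is illustrative):

```python
import sys
import types
from unittest import mock

# Stub out boto3 so unit tests run whether or not it is installed.
fake_boto3 = types.ModuleType("boto3")
fake_boto3.client = mock.Mock(return_value=mock.Mock(name="bedrock-runtime"))

with mock.patch.dict(sys.modules, {"boto3": fake_boto3}):
    import boto3  # resolves to the stub above

    client = boto3.client("bedrock-runtime", region_name="us-east-1")

print(client is fake_boto3.client.return_value)  # → True
```

Tests written this way actually fail on regressions, instead of silently passing through the `except ImportError` branch.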

Contributor:

When this moves to a separate PR, you can follow what we do for the databases plugins and have explicit integration tests -- so we'd just remove the imports

```python
# under the License.


def __getattr__(name: str):
```
Contributor:

move to separate PR

Comment on lines +20 to +29

```python
def __getattr__(name: str):
    """Lazy load S3TrackingClient to avoid requiring boto3 unless used."""
    if name == "S3TrackingClient":
        from burr.tracking.s3client import S3TrackingClient

        return S3TrackingClient
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


__all__ = ["LocalTrackingClient", "S3TrackingClient"]
```
Contributor:

I don't think we need this?

@@ -0,0 +1,109 @@
# Licensed to the Apache Software Foundation (ASF) under one
Contributor:

let's move these to an aws folder under terraform or something

@vaquarkhan force-pushed the issue-664-cloud-native-aws branch from c913d03 to 5ca9739 on March 16, 2026 at 08:27