diff --git a/amber/LICENSE-binary-python b/amber/LICENSE-binary-python index 3bd610c18dd..12901bae788 100644 --- a/amber/LICENSE-binary-python +++ b/amber/LICENSE-binary-python @@ -228,7 +228,7 @@ Python packages: - pympler==1.1 - python-dateutil==2.8.2 - regex==2026.5.9 - - requests==2.34.2 + - requests==2.34.0 - s3transfer==0.14.0 - safetensors==0.8.0 - tenacity==8.5.0 diff --git a/amber/requirements.txt b/amber/requirements.txt index 9cf722e46bb..caf9b6bedc4 100644 --- a/amber/requirements.txt +++ b/amber/requirements.txt @@ -43,3 +43,5 @@ SQLAlchemy==2.0.37 pg8000==1.31.5 pympler==1.1 boto3==1.40.53 +requests==2.34.0 +urllib3==2.7.0 diff --git a/amber/src/main/python/pytexera/storage/dataset_file_document.py b/amber/src/main/python/pytexera/storage/dataset_file_document.py index 31f95d3fc72..5a063f60470 100644 --- a/amber/src/main/python/pytexera/storage/dataset_file_document.py +++ b/amber/src/main/python/pytexera/storage/dataset_file_document.py @@ -19,9 +19,38 @@ import os import requests import urllib.parse +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry class DatasetFileDocument: + # (connect, read) timeout and retry settings for the file-service GETs below. + # Read timeout bounds inactivity between bytes, not total download time. + _CONNECT_TIMEOUT_SECONDS = 5 + _READ_TIMEOUT_SECONDS = 10 + _REQUEST_TIMEOUT = (_CONNECT_TIMEOUT_SECONDS, _READ_TIMEOUT_SECONDS) + _MAX_RETRIES = 3 + _RETRY_BACKOFF_FACTOR = 0.5 + _RETRY_STATUS_FORCELIST = (500, 502, 503, 504) + + @classmethod + def _retry_session(cls) -> requests.Session: + """Returns a Session that retries GETs on connection errors and 5xx.""" + retry = Retry( + total=cls._MAX_RETRIES, + connect=cls._MAX_RETRIES, + read=cls._MAX_RETRIES, + backoff_factor=cls._RETRY_BACKOFF_FACTOR, + status_forcelist=cls._RETRY_STATUS_FORCELIST, + allowed_methods=frozenset({"GET"}), + raise_on_status=False, + ) + adapter = HTTPAdapter(max_retries=retry) + session = requests.Session() + session.mount("http://", adapter) + session.mount("https://", adapter) + return session + def __init__(self, file_path: str): """ Parses the file path into dataset metadata. @@ -69,7 +98,18 @@ def get_presigned_url(self) -> str: params = {"filePath": encoded_file_path} - response = requests.get(self.presign_endpoint, headers=headers, params=params) + try: + with self._retry_session() as session: + response = session.get( + self.presign_endpoint, + headers=headers, + params=params, + timeout=self._REQUEST_TIMEOUT, + ) + except requests.exceptions.RequestException as e: + raise RuntimeError( + f"Failed to get presigned URL: request failed: {e}" + ) from e if response.status_code != 200: raise RuntimeError( @@ -100,7 +140,13 @@ def read_file(self) -> io.BytesIO: :raises: RuntimeError if the retrieval fails. """ presigned_url = self.get_presigned_url() - response = requests.get(presigned_url) + try: + with self._retry_session() as session: + response = session.get(presigned_url, timeout=self._REQUEST_TIMEOUT) + except requests.exceptions.RequestException as e: + raise RuntimeError( + f"Failed to retrieve file content: request failed: {e}" + ) from e if response.status_code != 200: raise RuntimeError( diff --git a/amber/src/test/python/pytexera/storage/test_dataset_file_document.py b/amber/src/test/python/pytexera/storage/test_dataset_file_document.py index 680f512072b..36882fe27ee 100644 --- a/amber/src/test/python/pytexera/storage/test_dataset_file_document.py +++ b/amber/src/test/python/pytexera/storage/test_dataset_file_document.py @@ -18,11 +18,11 @@ import io import pytest +import requests from unittest.mock import patch, MagicMock from pytexera.storage.dataset_file_document import DatasetFileDocument - DEFAULT_ENDPOINT = "http://localhost:9092/api/dataset/presign-download" CUSTOM_ENDPOINT = "https://example.test/api/presign" @@ -95,7 +95,9 @@ def _make_doc(self, monkeypatch, path="/bob@x.com/ds/v1/file.csv"): def test_returns_presigned_url_field_from_json_body(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.return_value = make_response( 200, body={"presignedUrl": "https://signed.test/x"} ) @@ -103,7 +105,9 @@ def test_returns_presigned_url_field_from_json_body(self, monkeypatch): def test_sends_bearer_authorization_header_with_jwt(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.return_value = make_response(200, body={"presignedUrl": "u"}) doc.get_presigned_url() _, kwargs = mock_get.call_args @@ -113,7 +117,9 @@ def test_url_encodes_filepath_query_parameter(self, monkeypatch): # urllib.parse.quote keeps "/" as safe by default, but encodes "@" # and " " — pin both pieces so the contract is explicit. doc = self._make_doc(monkeypatch, path="/bob@x.com/ds/v1/data file.csv") - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.return_value = make_response(200, body={"presignedUrl": "u"}) doc.get_presigned_url() _, kwargs = mock_get.call_args @@ -124,7 +130,9 @@ def test_url_encodes_filepath_query_parameter(self, monkeypatch): def test_calls_configured_endpoint(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.return_value = make_response(200, body={"presignedUrl": "u"}) doc.get_presigned_url() args, _ = mock_get.call_args @@ -132,21 +140,27 @@ def test_calls_configured_endpoint(self, monkeypatch): def test_raises_runtime_error_with_status_and_body_on_failure(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.return_value = make_response(403, body="forbidden") with pytest.raises(RuntimeError, match=r"403.*forbidden"): doc.get_presigned_url() def test_raises_when_response_body_lacks_presigned_url_key(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.return_value = make_response(200, body={"other": "value"}) with pytest.raises(RuntimeError, match="'presignedUrl' missing"): doc.get_presigned_url() def test_raises_when_response_body_is_not_valid_json(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: response = MagicMock() response.status_code = 200 response.json.side_effect = ValueError("Expecting value") @@ -157,14 +171,18 @@ def test_raises_when_response_body_is_not_valid_json(self, monkeypatch): def test_raises_when_presigned_url_is_empty_string(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.return_value = make_response(200, body={"presignedUrl": ""}) with pytest.raises(RuntimeError, match="'presignedUrl' missing"): doc.get_presigned_url() def test_raises_when_presigned_url_is_not_a_string(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.return_value = make_response(200, body={"presignedUrl": None}) with pytest.raises(RuntimeError, match="'presignedUrl' missing"): doc.get_presigned_url() @@ -178,7 +196,9 @@ def _make_doc(self, monkeypatch): def test_returns_bytesio_with_downloaded_content(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.side_effect = [ make_response(200, body={"presignedUrl": "https://signed.test/x"}), make_response(200, content=b"hello-bytes"), @@ -189,14 +209,18 @@ def test_returns_bytesio_with_downloaded_content(self, monkeypatch): def test_propagates_presigned_url_failure(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.return_value = make_response(500, body="upstream down") with pytest.raises(RuntimeError, match=r"500.*upstream down"): doc.read_file() def test_raises_runtime_error_when_download_fails(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.side_effect = [ make_response(200, body={"presignedUrl": "https://signed.test/x"}), make_response(404, body="missing"), @@ -206,7 +230,9 @@ def test_raises_runtime_error_when_download_fails(self, monkeypatch): def test_downloads_from_presigned_url_returned_by_first_call(self, monkeypatch): doc = self._make_doc(monkeypatch) - with patch("pytexera.storage.dataset_file_document.requests.get") as mock_get: + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: mock_get.side_effect = [ make_response(200, body={"presignedUrl": "https://signed.test/x"}), make_response(200, content=b""), @@ -214,3 +240,72 @@ def test_downloads_from_presigned_url_returned_by_first_call(self, monkeypatch): doc.read_file() second_call_args, _ = mock_get.call_args_list[1] assert second_call_args[0] == "https://signed.test/x" + + +class TestTimeoutsAndRetries: + def _make_doc(self, monkeypatch): + monkeypatch.setenv("USER_JWT_TOKEN", "test-jwt-token") + monkeypatch.setenv("FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT", CUSTOM_ENDPOINT) + return DatasetFileDocument("/bob@x.com/ds/v1/file.csv") + + def test_presigned_url_request_passes_request_timeout(self, monkeypatch): + doc = self._make_doc(monkeypatch) + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: + mock_get.return_value = make_response(200, body={"presignedUrl": "u"}) + doc.get_presigned_url() + _, kwargs = mock_get.call_args + assert kwargs["timeout"] == DatasetFileDocument._REQUEST_TIMEOUT + + def test_download_request_passes_request_timeout(self, monkeypatch): + doc = self._make_doc(monkeypatch) + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: + mock_get.side_effect = [ + make_response(200, body={"presignedUrl": "https://signed.test/x"}), + make_response(200, content=b"data"), + ] + doc.read_file() + _, download_kwargs = mock_get.call_args_list[1] + assert download_kwargs["timeout"] == DatasetFileDocument._REQUEST_TIMEOUT + + def test_session_mounts_retry_adapter_for_http_and_https(self): + session = DatasetFileDocument._retry_session() + try: + for prefix in ("http://", "https://"): + retry = session.get_adapter(prefix).max_retries + assert retry.total == DatasetFileDocument._MAX_RETRIES + assert retry.connect == DatasetFileDocument._MAX_RETRIES + assert retry.read == DatasetFileDocument._MAX_RETRIES + assert set(retry.status_forcelist) == set( + DatasetFileDocument._RETRY_STATUS_FORCELIST + ) + # Only idempotent GETs should be retried. + assert retry.allowed_methods == frozenset({"GET"}) + finally: + session.close() + + def test_presigned_url_request_timeout_is_wrapped_in_runtime_error( + self, monkeypatch + ): + doc = self._make_doc(monkeypatch) + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: + mock_get.side_effect = requests.exceptions.ReadTimeout("timed out") + with pytest.raises(RuntimeError, match="request failed"): + doc.get_presigned_url() + + def test_download_request_timeout_is_wrapped_in_runtime_error(self, monkeypatch): + doc = self._make_doc(monkeypatch) + with patch( + "pytexera.storage.dataset_file_document.requests.Session.get" + ) as mock_get: + mock_get.side_effect = [ + make_response(200, body={"presignedUrl": "https://signed.test/x"}), + requests.exceptions.ConnectionError("connection reset"), + ] + with pytest.raises(RuntimeError, match="Failed to retrieve file content"): + doc.read_file()