diff --git a/src/conductor/client/http/async_rest.py b/src/conductor/client/http/async_rest.py index fb2a0ff6..51f5eb89 100644 --- a/src/conductor/client/http/async_rest.py +++ b/src/conductor/client/http/async_rest.py @@ -1,10 +1,13 @@ import json +import logging import os import re import httpx from six.moves.urllib.parse import urlencode +logger = logging.getLogger(__name__) + class RESTResponse: @@ -44,6 +47,8 @@ def getheaders(self): class AsyncRESTClientObject(object): def __init__(self, connection=None): + # Set once we fall back from HTTP/2 to HTTP/1.1 after a protocol error. + self._http2_downgraded = False if connection is None: self._http2_enabled = self._is_http2_enabled() self.connection = self._create_default_httpx_client() @@ -57,6 +62,13 @@ def _is_http2_enabled(self) -> bool: val = os.getenv("CONDUCTOR_HTTP2_ENABLED", "true").strip().lower() return val not in ("0", "false", "no", "off") + def _is_http2_auto_fallback_enabled(self) -> bool: + # A protocol-level error on an HTTP/2 connection triggers a one-way + # fallback to HTTP/1.1 for the rest of the process. Opt out with + # CONDUCTOR_HTTP2_AUTO_FALLBACK=false to keep retrying on HTTP/2. + val = os.getenv("CONDUCTOR_HTTP2_AUTO_FALLBACK", "true").strip().lower() + return val not in ("0", "false", "no", "off") + def _create_default_httpx_client(self) -> httpx.AsyncClient: limits = httpx.Limits( max_connections=100, # Total connections across all hosts @@ -77,9 +89,21 @@ def _create_default_httpx_client(self) -> httpx.AsyncClient: http2=bool(self._http2_enabled) ) - async def _reset_connection(self) -> None: + async def _reset_connection(self, downgrade_http2=False) -> None: if not getattr(self, "_owns_connection", False): return + if (downgrade_http2 + and getattr(self, "_http2_enabled", None) + and self._is_http2_auto_fallback_enabled()): + # Protocol error on a long-lived HTTP/2 connection — fall back to + # HTTP/1.1 for the rest of this process instead of rebuilding + # another HTTP/2 client that stalls the poll loop the same way. + self._http2_enabled = False + self._http2_downgraded = True + logger.warning( + "httpx protocol error on HTTP/2 connection; disabled HTTP/2 " + "and fell back to HTTP/1.1 for this process" + ) try: if getattr(self, "connection", None) is not None: await self.connection.aclose() @@ -182,7 +206,9 @@ async def request(self, method, url, query_params=None, headers=None, break except (httpx.ProtocolError, httpx.ReadError, httpx.WriteError) as e: if attempt == 0 and self._owns_connection: - await self._reset_connection() + await self._reset_connection( + downgrade_http2=bool(getattr(self, "_http2_enabled", False)) + ) if method in idempotent_methods: continue msg = f"Protocol error ({type(e).__name__}): {e}" diff --git a/src/conductor/client/http/rest.py b/src/conductor/client/http/rest.py index c82708f8..645ca461 100644 --- a/src/conductor/client/http/rest.py +++ b/src/conductor/client/http/rest.py @@ -64,6 +64,9 @@ def __init__(self, connection=None): # discovering the same broken connection produces at most ONE real # reset + warning line, not N. self._reset_lock = threading.Lock() + # Set once we fall back from HTTP/2 to HTTP/1.1 after a protocol error, + # so we don't keep rebuilding HTTP/2 clients that hit the same wall. + self._http2_downgraded = False if connection is None: self._http2_enabled = self._is_http2_enabled() self.connection = self._create_default_httpx_client() @@ -78,6 +81,13 @@ def _is_http2_enabled(self) -> bool: val = os.getenv("CONDUCTOR_HTTP2_ENABLED", "true").strip().lower() return val not in ("0", "false", "no", "off") + def _is_http2_auto_fallback_enabled(self) -> bool: + # When HTTP/2 is enabled, a protocol-level error on the connection + # triggers a one-way fallback to HTTP/1.1 for the rest of the process. + # Opt out with CONDUCTOR_HTTP2_AUTO_FALLBACK=false to keep retrying on HTTP/2. + val = os.getenv("CONDUCTOR_HTTP2_AUTO_FALLBACK", "true").strip().lower() + return val not in ("0", "false", "no", "off") + def _create_default_httpx_client(self) -> httpx.Client: # Create httpx client with connection pooling. # Use HTTP/2 when enabled (default), but allow opting out via CONDUCTOR_HTTP2_ENABLED. @@ -111,9 +121,17 @@ def _is_client_closed(self) -> bool: except Exception: return False - def _reset_connection(self, expected=None) -> bool: + def _reset_connection(self, expected=None, downgrade_http2=False) -> bool: """Close the current httpx client (if any) and create a fresh one. + When `downgrade_http2` is True and HTTP/2 is currently enabled, the + replacement client is built on HTTP/1.1 and HTTP/2 stays disabled for + the remainder of this process. This is the auto-fallback path: a + protocol-level error on a long-lived HTTP/2 connection (common with + proxies/LBs that mishandle h2, e.g. GOAWAY storms) would otherwise be + "healed" into another HTTP/2 client that fails the same way, stalling + the poll loop. Downgrading once breaks that cycle. + This is a thread-safe compare-and-swap: - If `expected` is provided, the reset is only performed when the @@ -135,6 +153,11 @@ def _reset_connection(self, expected=None) -> bool: if expected is not None and current is not expected: # Someone else already healed since our caller last looked. return False + if (downgrade_http2 + and getattr(self, "_http2_enabled", None) + and self._is_http2_auto_fallback_enabled()): + self._http2_enabled = False + self._http2_downgraded = True try: if current is not None: current.close() @@ -262,9 +285,20 @@ def request(self, method, url, query_params=None, headers=None, # Reset the client to recover without requiring process restart. # Only auto-retry idempotent methods to avoid duplicating side effects. if attempt == 0: - reset_done = self._reset_connection(expected=client_at_send) + # If we're on HTTP/2, treat a protocol error as a signal to + # fall back to HTTP/1.1 for the rest of this process. + downgrade = bool(getattr(self, "_http2_enabled", False)) + reset_done = self._reset_connection( + expected=client_at_send, downgrade_http2=downgrade + ) if reset_done: - logger.warning("httpx protocol error; re-established client: %s", e) + if downgrade and getattr(self, "_http2_downgraded", False): + logger.warning( + "httpx protocol error on HTTP/2 connection; disabled HTTP/2 " + "and fell back to HTTP/1.1 for this process: %s", e + ) + else: + logger.warning("httpx protocol error; re-established client: %s", e) else: logger.debug( "httpx protocol error on stale client (already healed by another thread): %s", diff --git a/tests/unit/api_client/test_async_rest_client.py b/tests/unit/api_client/test_async_rest_client.py new file mode 100644 index 00000000..e05070ba --- /dev/null +++ b/tests/unit/api_client/test_async_rest_client.py @@ -0,0 +1,68 @@ +import asyncio +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx + +from conductor.client.http import async_rest + + +def _ok_response(): + response = MagicMock() + response.status_code = 200 + response.reason_phrase = "OK" + response.headers = {} + response.text = "" + return response + + +def _mock_async_client(): + """An httpx.AsyncClient stand-in whose request/aclose are awaitable.""" + c = MagicMock() + c.request = AsyncMock() + c.aclose = AsyncMock() + return c + + +class TestAsyncRESTClientObject(unittest.TestCase): + @patch.dict("os.environ", {"CONDUCTOR_HTTP2_ENABLED": "true"}) + @patch.object(async_rest.AsyncRESTClientObject, "_create_default_httpx_client") + def test_http2_protocol_error_downgrades_to_http1(self, mock_create_client): + first_client = _mock_async_client() + second_client = _mock_async_client() + mock_create_client.side_effect = [first_client, second_client] + + first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated") + second_client.request.return_value = _ok_response() + + client = async_rest.AsyncRESTClientObject(connection=None) + self.assertTrue(client._http2_enabled) # default on + + result = asyncio.run(client.request("GET", "http://example")) + + self.assertEqual(result.status, 200) + self.assertFalse(client._http2_enabled) # HTTP/2 turned off + self.assertTrue(client._http2_downgraded) + self.assertTrue(first_client.aclose.called) + + @patch.dict("os.environ", {"CONDUCTOR_HTTP2_ENABLED": "true", + "CONDUCTOR_HTTP2_AUTO_FALLBACK": "false"}) + @patch.object(async_rest.AsyncRESTClientObject, "_create_default_httpx_client") + def test_http2_auto_fallback_can_be_disabled(self, mock_create_client): + first_client = _mock_async_client() + second_client = _mock_async_client() + mock_create_client.side_effect = [first_client, second_client] + + first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated") + second_client.request.return_value = _ok_response() + + client = async_rest.AsyncRESTClientObject(connection=None) + result = asyncio.run(client.request("GET", "http://example")) + + self.assertEqual(result.status, 200) + self.assertTrue(client._http2_enabled) # still HTTP/2 + self.assertFalse(client._http2_downgraded) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/api_client/test_rest_client.py b/tests/unit/api_client/test_rest_client.py index 3aa69b48..926c9bf3 100644 --- a/tests/unit/api_client/test_rest_client.py +++ b/tests/unit/api_client/test_rest_client.py @@ -44,6 +44,47 @@ def test_resets_and_retries_on_remote_protocol_error(self, mock_create_client): self.assertTrue(first_client.close.called) self.assertEqual(result.status, 200) + @patch.dict("os.environ", {"CONDUCTOR_HTTP2_ENABLED": "true"}) + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_http2_protocol_error_downgrades_to_http1(self, mock_create_client): + """A protocol error on an HTTP/2 connection disables HTTP/2 for the + rest of the process and heals onto HTTP/1.1.""" + first_client = _mock_client() + second_client = _mock_client() + mock_create_client.side_effect = [first_client, second_client] + + first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated") + second_client.request.return_value = _ok_response() + + client = rest.RESTClientObject(connection=None) + self.assertTrue(client._http2_enabled) # default on + + result = client.request("GET", "http://example") + + self.assertEqual(result.status, 200) + self.assertFalse(client._http2_enabled) # HTTP/2 turned off + self.assertTrue(client._http2_downgraded) + + @patch.dict("os.environ", {"CONDUCTOR_HTTP2_ENABLED": "true", + "CONDUCTOR_HTTP2_AUTO_FALLBACK": "false"}) + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_http2_auto_fallback_can_be_disabled(self, mock_create_client): + """With CONDUCTOR_HTTP2_AUTO_FALLBACK=false we still self-heal, but + stay on HTTP/2 instead of downgrading.""" + first_client = _mock_client() + second_client = _mock_client() + mock_create_client.side_effect = [first_client, second_client] + + first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated") + second_client.request.return_value = _ok_response() + + client = rest.RESTClientObject(connection=None) + result = client.request("GET", "http://example") + + self.assertEqual(result.status, 200) + self.assertTrue(client._http2_enabled) # still HTTP/2 + self.assertFalse(client._http2_downgraded) + def test_is_closed_client_error_recognises_httpx_messages(self): self.assertTrue( rest._is_closed_client_error(