Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions src/conductor/client/http/async_rest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import json
import logging
import os
import re

import httpx
from six.moves.urllib.parse import urlencode

logger = logging.getLogger(__name__)


class RESTResponse:

Expand Down Expand Up @@ -44,6 +47,8 @@ def getheaders(self):

class AsyncRESTClientObject(object):
def __init__(self, connection=None):
# Set once we fall back from HTTP/2 to HTTP/1.1 after a protocol error.
self._http2_downgraded = False
if connection is None:
self._http2_enabled = self._is_http2_enabled()
self.connection = self._create_default_httpx_client()
Expand All @@ -57,6 +62,13 @@ def _is_http2_enabled(self) -> bool:
val = os.getenv("CONDUCTOR_HTTP2_ENABLED", "true").strip().lower()
return val not in ("0", "false", "no", "off")

def _is_http2_auto_fallback_enabled(self) -> bool:
# A protocol-level error on an HTTP/2 connection triggers a one-way
# fallback to HTTP/1.1 for the rest of the process. Opt out with
# CONDUCTOR_HTTP2_AUTO_FALLBACK=false to keep retrying on HTTP/2.
val = os.getenv("CONDUCTOR_HTTP2_AUTO_FALLBACK", "true").strip().lower()
return val not in ("0", "false", "no", "off")

def _create_default_httpx_client(self) -> httpx.AsyncClient:
limits = httpx.Limits(
max_connections=100, # Total connections across all hosts
Expand All @@ -77,9 +89,21 @@ def _create_default_httpx_client(self) -> httpx.AsyncClient:
http2=bool(self._http2_enabled)
)

async def _reset_connection(self) -> None:
async def _reset_connection(self, downgrade_http2=False) -> None:
if not getattr(self, "_owns_connection", False):
return
if (downgrade_http2
and getattr(self, "_http2_enabled", None)
and self._is_http2_auto_fallback_enabled()):
# Protocol error on a long-lived HTTP/2 connection — fall back to
# HTTP/1.1 for the rest of this process instead of rebuilding
# another HTTP/2 client that stalls the poll loop the same way.
self._http2_enabled = False
self._http2_downgraded = True
logger.warning(
"httpx protocol error on HTTP/2 connection; disabled HTTP/2 "
"and fell back to HTTP/1.1 for this process"
)
try:
if getattr(self, "connection", None) is not None:
await self.connection.aclose()
Expand Down Expand Up @@ -182,7 +206,9 @@ async def request(self, method, url, query_params=None, headers=None,
break
except (httpx.ProtocolError, httpx.ReadError, httpx.WriteError) as e:
if attempt == 0 and self._owns_connection:
await self._reset_connection()
await self._reset_connection(
downgrade_http2=bool(getattr(self, "_http2_enabled", False))
)
if method in idempotent_methods:
continue
msg = f"Protocol error ({type(e).__name__}): {e}"
Expand Down
40 changes: 37 additions & 3 deletions src/conductor/client/http/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def __init__(self, connection=None):
# discovering the same broken connection produces at most ONE real
# reset + warning line, not N.
self._reset_lock = threading.Lock()
# Set once we fall back from HTTP/2 to HTTP/1.1 after a protocol error,
# so we don't keep rebuilding HTTP/2 clients that hit the same wall.
self._http2_downgraded = False
if connection is None:
self._http2_enabled = self._is_http2_enabled()
self.connection = self._create_default_httpx_client()
Expand All @@ -78,6 +81,13 @@ def _is_http2_enabled(self) -> bool:
val = os.getenv("CONDUCTOR_HTTP2_ENABLED", "true").strip().lower()
return val not in ("0", "false", "no", "off")

def _is_http2_auto_fallback_enabled(self) -> bool:
# When HTTP/2 is enabled, a protocol-level error on the connection
# triggers a one-way fallback to HTTP/1.1 for the rest of the process.
# Opt out with CONDUCTOR_HTTP2_AUTO_FALLBACK=false to keep retrying on HTTP/2.
val = os.getenv("CONDUCTOR_HTTP2_AUTO_FALLBACK", "true").strip().lower()
return val not in ("0", "false", "no", "off")

def _create_default_httpx_client(self) -> httpx.Client:
# Create httpx client with connection pooling.
# Use HTTP/2 when enabled (default), but allow opting out via CONDUCTOR_HTTP2_ENABLED.
Expand Down Expand Up @@ -111,9 +121,17 @@ def _is_client_closed(self) -> bool:
except Exception:
return False

def _reset_connection(self, expected=None) -> bool:
def _reset_connection(self, expected=None, downgrade_http2=False) -> bool:
"""Close the current httpx client (if any) and create a fresh one.

When `downgrade_http2` is True and HTTP/2 is currently enabled, the
replacement client is built on HTTP/1.1 and HTTP/2 stays disabled for
the remainder of this process. This is the auto-fallback path: a
protocol-level error on a long-lived HTTP/2 connection (common with
proxies/LBs that mishandle h2, e.g. GOAWAY storms) would otherwise be
"healed" into another HTTP/2 client that fails the same way, stalling
the poll loop. Downgrading once breaks that cycle.

This is a thread-safe compare-and-swap:

- If `expected` is provided, the reset is only performed when the
Expand All @@ -135,6 +153,11 @@ def _reset_connection(self, expected=None) -> bool:
if expected is not None and current is not expected:
# Someone else already healed since our caller last looked.
return False
if (downgrade_http2
and getattr(self, "_http2_enabled", None)
and self._is_http2_auto_fallback_enabled()):
self._http2_enabled = False
self._http2_downgraded = True
try:
if current is not None:
current.close()
Expand Down Expand Up @@ -262,9 +285,20 @@ def request(self, method, url, query_params=None, headers=None,
# Reset the client to recover without requiring process restart.
# Only auto-retry idempotent methods to avoid duplicating side effects.
if attempt == 0:
reset_done = self._reset_connection(expected=client_at_send)
# If we're on HTTP/2, treat a protocol error as a signal to
# fall back to HTTP/1.1 for the rest of this process.
downgrade = bool(getattr(self, "_http2_enabled", False))
reset_done = self._reset_connection(
expected=client_at_send, downgrade_http2=downgrade
)
if reset_done:
logger.warning("httpx protocol error; re-established client: %s", e)
if downgrade and getattr(self, "_http2_downgraded", False):
logger.warning(
"httpx protocol error on HTTP/2 connection; disabled HTTP/2 "
"and fell back to HTTP/1.1 for this process: %s", e
)
else:
logger.warning("httpx protocol error; re-established client: %s", e)
else:
logger.debug(
"httpx protocol error on stale client (already healed by another thread): %s",
Expand Down
68 changes: 68 additions & 0 deletions tests/unit/api_client/test_async_rest_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import asyncio
import unittest
from unittest.mock import AsyncMock, MagicMock, patch

import httpx

from conductor.client.http import async_rest


def _ok_response():
response = MagicMock()
response.status_code = 200
response.reason_phrase = "OK"
response.headers = {}
response.text = ""
return response


def _mock_async_client():
"""An httpx.AsyncClient stand-in whose request/aclose are awaitable."""
c = MagicMock()
c.request = AsyncMock()
c.aclose = AsyncMock()
return c


class TestAsyncRESTClientObject(unittest.TestCase):
@patch.dict("os.environ", {"CONDUCTOR_HTTP2_ENABLED": "true"})
@patch.object(async_rest.AsyncRESTClientObject, "_create_default_httpx_client")
def test_http2_protocol_error_downgrades_to_http1(self, mock_create_client):
first_client = _mock_async_client()
second_client = _mock_async_client()
mock_create_client.side_effect = [first_client, second_client]

first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated")
second_client.request.return_value = _ok_response()

client = async_rest.AsyncRESTClientObject(connection=None)
self.assertTrue(client._http2_enabled) # default on

result = asyncio.run(client.request("GET", "http://example"))

self.assertEqual(result.status, 200)
self.assertFalse(client._http2_enabled) # HTTP/2 turned off
self.assertTrue(client._http2_downgraded)
self.assertTrue(first_client.aclose.called)

@patch.dict("os.environ", {"CONDUCTOR_HTTP2_ENABLED": "true",
"CONDUCTOR_HTTP2_AUTO_FALLBACK": "false"})
@patch.object(async_rest.AsyncRESTClientObject, "_create_default_httpx_client")
def test_http2_auto_fallback_can_be_disabled(self, mock_create_client):
first_client = _mock_async_client()
second_client = _mock_async_client()
mock_create_client.side_effect = [first_client, second_client]

first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated")
second_client.request.return_value = _ok_response()

client = async_rest.AsyncRESTClientObject(connection=None)
result = asyncio.run(client.request("GET", "http://example"))

self.assertEqual(result.status, 200)
self.assertTrue(client._http2_enabled) # still HTTP/2
self.assertFalse(client._http2_downgraded)


if __name__ == "__main__":
unittest.main()
41 changes: 41 additions & 0 deletions tests/unit/api_client/test_rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,47 @@ def test_resets_and_retries_on_remote_protocol_error(self, mock_create_client):
self.assertTrue(first_client.close.called)
self.assertEqual(result.status, 200)

@patch.dict("os.environ", {"CONDUCTOR_HTTP2_ENABLED": "true"})
@patch.object(rest.RESTClientObject, "_create_default_httpx_client")
def test_http2_protocol_error_downgrades_to_http1(self, mock_create_client):
"""A protocol error on an HTTP/2 connection disables HTTP/2 for the
rest of the process and heals onto HTTP/1.1."""
first_client = _mock_client()
second_client = _mock_client()
mock_create_client.side_effect = [first_client, second_client]

first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated")
second_client.request.return_value = _ok_response()

client = rest.RESTClientObject(connection=None)
self.assertTrue(client._http2_enabled) # default on

result = client.request("GET", "http://example")

self.assertEqual(result.status, 200)
self.assertFalse(client._http2_enabled) # HTTP/2 turned off
self.assertTrue(client._http2_downgraded)

@patch.dict("os.environ", {"CONDUCTOR_HTTP2_ENABLED": "true",
"CONDUCTOR_HTTP2_AUTO_FALLBACK": "false"})
@patch.object(rest.RESTClientObject, "_create_default_httpx_client")
def test_http2_auto_fallback_can_be_disabled(self, mock_create_client):
"""With CONDUCTOR_HTTP2_AUTO_FALLBACK=false we still self-heal, but
stay on HTTP/2 instead of downgrading."""
first_client = _mock_client()
second_client = _mock_client()
mock_create_client.side_effect = [first_client, second_client]

first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated")
second_client.request.return_value = _ok_response()

client = rest.RESTClientObject(connection=None)
result = client.request("GET", "http://example")

self.assertEqual(result.status, 200)
self.assertTrue(client._http2_enabled) # still HTTP/2
self.assertFalse(client._http2_downgraded)

def test_is_closed_client_error_recognises_httpx_messages(self):
self.assertTrue(
rest._is_closed_client_error(
Expand Down
Loading