Skip to content

Commit 32d4cd5

Browse files
authored
fix: Read/Write Splitting plugins not subscribed to the execute pipeline (#1117)
1 parent ea48e94 commit 32d4cd5

File tree

15 files changed

+128
-106
lines changed

15 files changed

+128
-106
lines changed

aws_advanced_python_wrapper/pep249_methods.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class DbApiMethod(Enum):
6363
CURSOR_NEXT = (30, "Cursor.__next__", False)
6464
CURSOR_LASTROWID = (31, "Cursor.lastrowid", False)
6565

66-
# AWS Advaced Python Wrapper Methods for
66+
# AWS Advanced Python Wrapper Methods for the execution pipelines.
6767
CONNECT = (32, "connect", True)
6868
FORCE_CONNECT = (33, "force_connect", True)
6969
INIT_HOST_PROVIDER = (34, "init_host_provider", True)

aws_advanced_python_wrapper/read_write_splitting_plugin.py

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
)
2929

3030
from aws_advanced_python_wrapper.errors import (AwsWrapperError, FailoverError,
31+
FailoverFailedError,
3132
ReadWriteSplittingError)
3233
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
3334
from aws_advanced_python_wrapper.pep249_methods import DbApiMethod
@@ -49,6 +50,18 @@ class ReadWriteSplittingConnectionManager(Plugin):
4950
DbApiMethod.CONNECT.method_name,
5051
DbApiMethod.NOTIFY_CONNECTION_CHANGED.method_name,
5152
DbApiMethod.CONNECTION_SET_READ_ONLY.method_name,
53+
54+
DbApiMethod.CONNECTION_COMMIT.method_name,
55+
DbApiMethod.CONNECTION_AUTOCOMMIT.method_name,
56+
DbApiMethod.CONNECTION_AUTOCOMMIT_SETTER.method_name,
57+
DbApiMethod.CONNECTION_IS_READ_ONLY.method_name,
58+
DbApiMethod.CONNECTION_SET_READ_ONLY.method_name,
59+
DbApiMethod.CONNECTION_ROLLBACK.method_name,
60+
61+
DbApiMethod.CURSOR_EXECUTE.method_name,
62+
DbApiMethod.CURSOR_FETCHONE.method_name,
63+
DbApiMethod.CURSOR_FETCHMANY.method_name,
64+
DbApiMethod.CURSOR_FETCHALL.method_name
5265
}
5366
_POOL_PROVIDER_CLASS_NAME = "aws_advanced_python_wrapper.sql_alchemy_connection_provider.SqlAlchemyPooledConnectionProvider"
5467

@@ -137,12 +150,15 @@ def execute(
137150
try:
138151
return execute_func()
139152
except Exception as ex:
153+
if isinstance(ex, FailoverFailedError):
154+
# Evict the current connection from the pool right away since it is not reusable
155+
self._close_connections(False)
140156
if isinstance(ex, FailoverError):
141157
logger.debug(
142158
"ReadWriteSplittingPlugin.FailoverExceptionWhileExecutingCommand",
143159
method_name
144160
)
145-
self._close_idle_connections()
161+
self._close_connections(True)
146162
else:
147163
logger.debug(
148164
"ReadWriteSplittingPlugin.ExceptionWhileExecutingCommand",
@@ -200,6 +216,7 @@ def _initialize_writer_connection(self):
200216
)
201217
self._set_writer_connection(conn, writer_host)
202218
self._switch_current_connection_to(conn, writer_host)
219+
return None
203220

204221
def _switch_connection_if_required(self, read_only: bool):
205222
current_conn = self._plugin_service.current_connection
@@ -293,7 +310,7 @@ def _switch_to_writer_connection(self):
293310
"ReadWriteSplittingPlugin.NoWriterFound")
294311

295312
if self._is_reader_conn_from_internal_pool:
296-
self._close_connection_if_idle(self._reader_connection)
313+
self._close_connection(self._reader_connection)
297314

298315
logger.debug(
299316
"ReadWriteSplittingPlugin.SwitchedFromReaderToWriter",
@@ -319,7 +336,7 @@ def _switch_to_reader_connection(self):
319336
)
320337
):
321338
# The old reader cannot be used anymore, close it.
322-
self._close_connection_if_idle(self._reader_connection)
339+
self._close_connection(self._reader_connection)
323340

324341
self._in_read_write_split = True
325342
if not self._is_connection_usable(self._reader_connection, driver_dialect):
@@ -345,7 +362,7 @@ def _switch_to_reader_connection(self):
345362
self._initialize_reader_connection()
346363

347364
if self._is_writer_conn_from_internal_pool:
348-
self._close_connection_if_idle(self._writer_connection)
365+
self._close_connection(self._writer_connection)
349366

350367
def _initialize_reader_connection(self):
351368
if self._connection_handler.has_no_readers():
@@ -383,38 +400,35 @@ def _initialize_reader_connection(self):
383400
"ReadWriteSplittingPlugin.SwitchedFromWriterToReader", reader_host.url
384401
)
385402

386-
def _close_connection_if_idle(self, internal_conn: Optional[Connection]):
403+
def _close_connection(self, internal_conn: Optional[Connection], close_only_if_idle: bool = True):
387404
if internal_conn is None:
388405
return
389406

390407
current_conn = self._plugin_service.current_connection
391408
driver_dialect = self._plugin_service.driver_dialect
392409

410+
if close_only_if_idle and internal_conn == current_conn:
411+
# Connection is in use, do not close
412+
return
413+
393414
try:
394-
if internal_conn != current_conn and self._is_connection_usable(
395-
internal_conn, driver_dialect
396-
):
415+
if self._is_connection_usable(internal_conn, driver_dialect):
397416
driver_dialect.execute(DbApiMethod.CONNECTION_CLOSE.method_name, lambda: internal_conn.close())
398-
if internal_conn == self._writer_connection:
399-
self._writer_connection = None
400-
self._writer_host_info = None
401-
if internal_conn == self._reader_connection:
402-
self._reader_connection = None
403-
self._reader_host_info = None
404417
except Exception:
405418
# Ignore exceptions during cleanup - connection might already be dead
406419
pass
420+
finally:
421+
if internal_conn == self._writer_connection:
422+
self._writer_connection = None
423+
self._writer_host_info = None
424+
if internal_conn == self._reader_connection:
425+
self._reader_connection = None
426+
self._reader_host_info = None
407427

408-
def _close_idle_connections(self):
428+
def _close_connections(self, close_only_if_idle: bool = True):
409429
logger.debug("ReadWriteSplittingPlugin.ClosingInternalConnections")
410-
self._close_connection_if_idle(self._reader_connection)
411-
self._close_connection_if_idle(self._writer_connection)
412-
413-
# Always clear cached references even if connections couldn't be closed
414-
self._reader_connection = None
415-
self._reader_host_info = None
416-
self._writer_connection = None
417-
self._writer_host_info = None
430+
self._close_connection(self._reader_connection, close_only_if_idle)
431+
self._close_connection(self._writer_connection, close_only_if_idle)
418432

419433
@staticmethod
420434
def log_and_raise_exception(log_msg: str):
@@ -450,7 +464,7 @@ def host_list_provider_service(self) -> Optional[HostListProviderService]:
450464
...
451465

452466
@host_list_provider_service.setter
453-
def host_list_provider_service(self, new_value: int) -> None:
467+
def host_list_provider_service(self, new_value: HostListProviderService) -> None:
454468
"""The setter for the 'host_list_provider_service' attribute."""
455469
...
456470

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ UnknownDialect.AbortConnection=[UnknownDialect] abort_connection was called, but
462462
Wrapper.ConnectMethod=[Wrapper] Target driver should be a target driver's connect() method/function.
463463
Wrapper.RequiredTargetDriver=[Wrapper] Target driver is required.
464464
Wrapper.UnsupportedAttribute=[Wrapper] Target driver does not have the attribute: '{}'
465-
Wrapper.Properties=[Wrapper] "Connection Properties: "
465+
Wrapper.Properties=[Wrapper] "Connection Properties: {}"
466466

467467
WriterFailoverHandler.AlreadyWriter=[WriterFailoverHandler] Current reader connection is actually a new writer connection.
468468
WriterFailoverHandler.CurrentTopologyNone=[WriterFailoverHandler] Current topology cannot be None.

aws_advanced_python_wrapper/utils/telemetry/xray_telemetry.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ def set_success(self, success: bool):
5555

5656
def set_attribute(self, key: str, value: AttributeValue):
5757
if self._trace_entity is not None:
58-
self._trace_entity.put_annotation(key, value)
58+
# XRay only supports str, bool, int, float - not sequences
59+
if isinstance(value, (str, bool, int, float)):
60+
self._trace_entity.put_annotation(key, value)
5961

6062
def set_exception(self, exception: Exception):
6163
if self._trace_entity is not None and exception is not None:
@@ -90,8 +92,7 @@ def _clone_and_close_context(context: XRayTelemetryContext, trace_level: Telemet
9092

9193
clone._trace_entity.start_time = context._trace_entity.start_time
9294

93-
for key in context._trace_entity.annotations.items():
94-
value = context._trace_entity.annotations[key]
95+
for key, value in context._trace_entity.annotations.items():
9596
if key != TelemetryConst.TRACE_NAME_ANNOTATION and value is not None:
9697
clone.set_attribute(key, value)
9798

docs/examples/PGXRayTelemetry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
print("-- running application")
3030
logging.basicConfig(level=logging.DEBUG)
3131

32-
xray_recorder.configure(sampler=LocalSampler({"version": 1, "default": {"fixed_target": 1, "rate": 1.0}}))
32+
xray_recorder.configure(sampler=LocalSampler({"version": 1, "default": {"fixed_target": 1, "rate": 1.0}, "rules": []}))
3333
global_sdk_config.set_sdk_enabled(True)
3434

3535
with xray_recorder.in_segment("python_xray_telemetry_app") as segment:

tests/integration/container/test_aurora_failover.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ def test_writer_fail_within_transaction_start_transaction(
328328

329329
@pytest.mark.parametrize("plugins", ["aurora_connection_tracker,failover", "aurora_connection_tracker,failover_v2"])
330330
@enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED])
331-
@pytest.mark.repeat(5)
331+
@pytest.mark.repeat(5) # Run this test case a few more times since it is a flaky test
332332
def test_writer_failover_in_idle_connections(
333333
self, test_driver: TestDriver, props, conn_utils, aurora_utility, plugins):
334334
target_driver_connect = DriverHelper.get_connect_func(test_driver)

tests/integration/container/test_custom_endpoint.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ def rds_utils(self):
6161
return RdsTestUtility(region)
6262

6363
@pytest.fixture(scope='class')
64-
def props(self):
64+
def default_props(self):
6565
p: Properties = Properties(
66-
{"plugins": "custom_endpoint,read_write_splitting,failover", "connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1"})
66+
{"connect_timeout": 10_000, "autocommit": True, "cluster_id": "cluster1"})
6767

6868
features = TestEnvironment.get_current().get_features()
6969
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in features \
@@ -77,6 +77,18 @@ def props(self):
7777

7878
return p
7979

80+
@pytest.fixture(scope='class')
81+
def props_with_failover(self, default_props):
82+
p = default_props.copy()
83+
p["plugins"] = "custom_endpoint,read_write_splitting,failover"
84+
return p
85+
86+
@pytest.fixture(scope='class')
87+
def props(self, default_props):
88+
p = default_props.copy()
89+
p["plugins"] = "custom_endpoint,read_write_splitting"
90+
return p
91+
8092
@pytest.fixture(scope='class', autouse=True)
8193
def setup_and_teardown(self):
8294
env_info = TestEnvironment.get_current().get_info()
@@ -193,7 +205,7 @@ def _wait_until_endpoint_deleted(self, rds_client):
193205
else:
194206
self.logger.debug(f"Custom endpoint '{self.endpoint_id}' successfully deleted.")
195207

196-
def wait_until_endpoint_has_members(self, rds_client, expected_members: Set[str]):
208+
def wait_until_endpoint_has_members(self, rds_client, expected_members: Set[str], rds_utils):
197209
start_ns = perf_counter_ns()
198210
end_ns = perf_counter_ns() + 20 * 60 * 1_000_000_000 # 20 minutes
199211
has_correct_state = False
@@ -218,16 +230,17 @@ def wait_until_endpoint_has_members(self, rds_client, expected_members: Set[str]
218230
pytest.fail(f"Timed out while waiting for the custom endpoint to stabilize: "
219231
f"'{TestCustomEndpoint.endpoint_id}'.")
220232

233+
rds_utils.make_sure_instances_up(list(expected_members))
221234
duration_sec = (perf_counter_ns() - start_ns) / 1_000_000_000
222235
self.logger.debug(f"wait_until_endpoint_has_specified_members took {duration_sec} seconds.")
223236

224-
def test_custom_endpoint_failover(self, test_driver: TestDriver, conn_utils, props, rds_utils):
225-
props["failover_mode"] = "reader_or_writer"
237+
def test_custom_endpoint_failover(self, test_driver: TestDriver, conn_utils, props_with_failover, rds_utils):
238+
props_with_failover["failover_mode"] = "reader_or_writer"
226239

227240
target_driver_connect = DriverHelper.get_connect_func(test_driver)
228241
kwargs = conn_utils.get_connect_params()
229242
kwargs["host"] = self.endpoint_info["Endpoint"]
230-
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props)
243+
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props_with_failover)
231244

232245
endpoint_members = self.endpoint_info["StaticMembers"]
233246
instance_id = rds_utils.query_instance_id(conn)
@@ -281,7 +294,7 @@ def _setup_custom_endpoint_role(self, target_driver_connect, conn_kwargs, rds_ut
281294
self.logger.debug("Custom endpoint instance successfully set to role: " + host_role.name)
282295

283296
def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__with_reader_as_init_conn(
284-
self, test_driver: TestDriver, conn_utils, props, rds_utils):
297+
self, test_driver: TestDriver, conn_utils, props_with_failover, rds_utils):
285298
'''
286299
Will test for the following scenario:
287300
1. Initially connect to a reader instance via the custom endpoint.
@@ -297,13 +310,13 @@ def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__wit
297310
kwargs["host"] = self.endpoint_info["Endpoint"]
298311
# This setting is not required for the test, but it allows us to also test re-creation of expired monitors since
299312
# it takes more than 30 seconds to modify the cluster endpoint (usually around 140s).
300-
props["custom_endpoint_idle_monitor_expiration_ms"] = 30_000
301-
props["wait_for_custom_endpoint_info_timeout_ms"] = 30_000
313+
props_with_failover["custom_endpoint_idle_monitor_expiration_ms"] = 30_000
314+
props_with_failover["wait_for_custom_endpoint_info_timeout_ms"] = 30_000
302315

303316
# Ensure that we are starting with a reader connection
304317
self._setup_custom_endpoint_role(target_driver_connect, kwargs, rds_utils, HostRole.READER)
305318

306-
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props)
319+
conn = AwsWrapperConnection.connect(target_driver_connect, **kwargs, **props_with_failover)
307320
endpoint_members = self.endpoint_info["StaticMembers"]
308321
original_reader_id = rds_utils.query_instance_id(conn)
309322
assert original_reader_id in endpoint_members
@@ -323,7 +336,7 @@ def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__wit
323336
)
324337

325338
try:
326-
self.wait_until_endpoint_has_members(rds_client, {original_reader_id, writer_id})
339+
self.wait_until_endpoint_has_members(rds_client, {original_reader_id, writer_id}, rds_utils)
327340

328341
# We should now be able to switch to writer.
329342
conn.read_only = False
@@ -339,7 +352,7 @@ def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__wit
339352
rds_client.modify_db_cluster_endpoint(
340353
DBClusterEndpointIdentifier=self.endpoint_id,
341354
StaticMembers=[original_reader_id])
342-
self.wait_until_endpoint_has_members(rds_client, {original_reader_id})
355+
self.wait_until_endpoint_has_members(rds_client, {original_reader_id}, rds_utils)
343356

344357
# We should not be able to switch again because new_member was removed from the custom endpoint.
345358
# We are connected to the reader. Attempting to switch to the writer will throw an exception.
@@ -350,16 +363,16 @@ def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__wit
350363

351364
def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__with_writer_as_init_conn(
352365
self, test_driver: TestDriver, conn_utils, props, rds_utils):
353-
'''
366+
"""
354367
Will test for the following scenario:
355-
1. Iniitially connect to the writer instance via the custom endpoint.
368+
1. Initially connect to the writer instance via the custom endpoint.
356369
2. Attempt to switch to reader instance - should succeed, but will still use writer instance as reader.
357370
3. Modify the custom endpoint to add a reader instance as a static member.
358371
4. Switch to reader instance - should succeed.
359372
5. Switch back to writer instance - should succeed.
360373
6. Modify the custom endpoint to remove the reader instance as a static member.
361374
7. Attempt to switch to reader instance - should fail since the custom endpoint no longer has the reader instance.
362-
'''
375+
"""
363376

364377
target_driver_connect = DriverHelper.get_connect_func(test_driver)
365378
kwargs = conn_utils.get_connect_params()
@@ -401,7 +414,7 @@ def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__wit
401414
)
402415

403416
try:
404-
self.wait_until_endpoint_has_members(rds_client, {original_writer_id, reader_id_to_add})
417+
self.wait_until_endpoint_has_members(rds_client, {original_writer_id, reader_id_to_add}, rds_utils)
405418
# We should now be able to switch to new_member.
406419
conn.read_only = True
407420
new_instance_id = rds_utils.query_instance_id(conn)
@@ -414,7 +427,7 @@ def test_custom_endpoint_read_write_splitting__with_custom_endpoint_changes__wit
414427
rds_client.modify_db_cluster_endpoint(
415428
DBClusterEndpointIdentifier=self.endpoint_id,
416429
StaticMembers=[original_writer_id])
417-
self.wait_until_endpoint_has_members(rds_client, {original_writer_id})
430+
self.wait_until_endpoint_has_members(rds_client, {original_writer_id}, rds_utils)
418431

419432
# We should not be able to switch again because new_member was removed from the custom endpoint.
420433
# We are connected to the writer. Attempting to switch to the reader will not work but will intentionally

tests/integration/container/test_read_write_splitting.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,7 @@ def test_pooled_connection__cluster_url_failover(
832832
TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
833833
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
834834
@disable_on_engines([DatabaseEngine.MYSQL])
835+
@pytest.mark.repeat(10) # Run this test case a few more times since it is a flaky test
835836
def test_pooled_connection__failover_failed(
836837
self,
837838
test_environment: TestEnvironment,

tests/integration/container/utils/test_environment.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def _create() -> TestEnvironment:
102102
xray_recorder.configure(daemon_address=xray_daemon_endpoint,
103103
context_missing="IGNORE_ERROR",
104104
sampler=LocalSampler(
105-
{"version": 1, "default": {"fixed_target": 1, "rate": 1.0}}))
105+
{"version": 1, "default": {"fixed_target": 1, "rate": 1.0}, "rules": []}))
106106
global_sdk_config.set_sdk_enabled(True)
107107

108108
if TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in env.get_features():

0 commit comments

Comments
 (0)