Adjust timeout handling in client-side operations to account for RTT variations #1793
Conversation
…variations. JAVA-5375
```
# Conflicts:
#	driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java
#	driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java
#	driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java
```
```java
List<CommandStartedEvent> commandStartedEvents = commandListener.getCommandStartedEvents();
assertCommandStartedEventsInOder(Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"),
        commandStartedEvents);
assertOnlyOneCommandTimeoutFailure("getMore");
```
There is no guarantee that the last getMore command will fail with a timeout. If maxAwaitTimeMS is higher than the remaining overall timeout, the driver uses the remaining timeout as maxTimeMS.
A response can therefore arrive right before the timeout expires (especially when RTT is near 0), and the driver then throws a timeout on the next operation, before the next getMore command is even sent.
Example from logs:
* [2025/12/30 19:25:05.019] STARTED: aggregate - {"aggregate": "ClientSideOperationTimeoutProseTest_42", "readConcern": {"level": "majority"}, "pipeline": [{"$changeStream": {}}], "cursor": {"batchSize": 2}, "maxTimeMS": 2495, "$db": "JavaDriverTest", "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
* [2025/12/30 19:25:05.019] SUCCEEDED: aggregate (elapsed: 1ms)
* [2025/12/30 19:25:05.019] STARTED: getMore - {"getMore": 5528154821143727891, "collection": "ClientSideOperationTimeoutProseTest_42", "batchSize": 2, "maxTimeMS": 1000, "$db": "JavaDriverTest", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151095, "i": 4}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
* [2025/12/30 19:25:05.019] SUCCEEDED: getMore (elapsed: 999ms)
* [2025/12/30 19:25:05.019] STARTED: getMore - {"getMore": 5528154821143727891, "collection": "ClientSideOperationTimeoutProseTest_42", "batchSize": 2, "maxTimeMS": 1000, "$db": "JavaDriverTest", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151095, "i": 4}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
* [2025/12/30 19:25:05.019] SUCCEEDED: getMore (elapsed: 999ms)
* [2025/12/30 19:25:05.019] STARTED: getMore - {"getMore": 5528154821143727891, "collection": "ClientSideOperationTimeoutProseTest_42", "batchSize": 2, "maxTimeMS": 499, "$db": "JavaDriverTest", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151097, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
* [2025/12/30 19:25:05.019] SUCCEEDED: getMore (elapsed: 498ms)
* [2025/12/30 19:25:05.019] STARTED: killCursors - {"killCursors": "ClientSideOperationTimeoutProseTest_42", "cursors": [5528154821143727891], "maxTimeMS": 1000, "$db": "JavaDriverTest", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151097, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "lsid": {"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}}
* [2025/12/30 19:25:05.019] SUCCEEDED: killCursors (elapsed: 0ms)
* [2025/12/30 19:25:05.019] STARTED: endSessions - {"endSessions": [{"id": {"$binary": {"base64": "nSsnGd4rT3i2tTKN/Xo98A==", "subType": "04"}}}], "$db": "admin", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1767151097, "i": 1}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "$readPreference": {"mode": "primaryPreferred"}}
* [2025/12/30 19:25:05.019] SUCCEEDED: endSessions (elapsed: 0ms)
This was one of the causes of flakiness in this test.
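The capping described above can be sketched with a hypothetical helper (the class and method names are made up for illustration, not driver internals):

```java
public class EffectiveMaxTimeSketch {
    // Hypothetical helper, not driver internals: the effective maxTimeMS sent
    // with a getMore is the smaller of maxAwaitTimeMS and the remaining
    // overall timeout budget.
    static long effectiveMaxTimeMs(long maxAwaitTimeMs, long remainingTimeoutMs) {
        return Math.min(maxAwaitTimeMs, remainingTimeoutMs);
    }

    public static void main(String[] args) {
        // Matches the third getMore in the log above: maxAwaitTimeMS=1000,
        // ~499ms of budget left, so maxTimeMS=499 is sent.
        System.out.println(effectiveMaxTimeMs(1000, 499)); // prints 499
    }
}
```

When the capped getMore succeeds just inside that window, the next driver-side timeout check fires before another getMore goes out, which is why the log above ends with killCursors rather than a failed getMore.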
```java
sleep(postSessionCloseSleep());
CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction"));
long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS);
assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable());
assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime);
```
Context
- These “End Session” tests used to live in the shared abstract test, so they ran for both sync and async.
- postSessionCloseSleep() is used to wait until the async close operation has finished, because we can't use Mono.block: the publisher is not exposed by the reactive close().
Why this moved/split
- Async computed “elapsed time” as 400ms - postSessionCloseSleep(). That just subtracts a constant sleep from a constant budget; it does not measure how long the session close actually took, because the async close path provides no point at which we can say “close finished” (no callback or future is checked here).
- The sync test also effectively depended on that fixed sleep chosen for the async test. Unless 400ms and postSessionCloseSleep() are tuned to match real timing, the assertion is fragile.
The change
- Sync: measure time around the actual close() (sample before/after).
- Async: we can’t time close() directly, so we anchor the timeout check to something we can observe: the CommandFailedEvent timestamp (i.e., when the failure is observed on the async event path).
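The sync-side measurement can be sketched like this (names are illustrative, not the actual test code; the Runnable stands in for the real close() call):

```java
import java.util.concurrent.TimeUnit;

public class CloseTimingSketch {
    // Measure wall-clock time around an arbitrary close action,
    // instead of inferring it from a fixed sleep.
    static long elapsedMillis(Runnable closeAction) {
        long start = System.nanoTime();
        closeAction.run();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Stand-in for session.close(); in the real test this is the sync close call.
        long elapsed = elapsedMillis(() -> { });
        System.out.println(elapsed <= 400); // prints true for a near-instant close
    }
}
```

Sampling before/after the call is what makes the 400ms bound meaningful for sync; async keeps the event-timestamp anchor instead because there is no equivalent "after" point to sample.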
```diff
- long rtt = ClusterFixture.getPrimaryRTT();
  try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
-         .timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
+         .timeout(500, TimeUnit.MILLISECONDS))) {
```
We still wait >500ms (currently thenAwait(Duration.ofMillis(600))) before issuing the next getMore, so the test continues to validate that the timeout is refreshed for the follow-up getMore operation.
The longer await purely reduces flakiness on slower setups (e.g., occasional TLS handshake or connection-establishment overhead consuming more of the allocated timeout).
```diff
- long rtt = ClusterFixture.getPrimaryRTT();
  try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
-         .timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
+         .timeout(500, TimeUnit.MILLISECONDS))) {
```
This follows the same approach used in: https://github.com/mongodb/mongo-java-driver/pull/1793/files#r2691975997
```diff
- gridFsBucket.uploadFromPublisher("filename", eventPublisher.getEventStream())
-         .subscribe(testSubscriber);
+ GridFSUploadPublisher<ObjectId> filename = gridFsBucket.uploadFromPublisher("filename", eventPublisher.getEventStream());
```
The command log below shows several operations occur before the one we expect to time out (e.g., find, listIndexes -> createIndexes for both fs.files and fs.chunks). Each of these consumes part of the timeoutMS budget.
On slower runs, the index-creation steps can take longer than usual, which sometimes causes the test to fail earlier (e.g., at createIndexes) instead of at the intended failing operation.
Increasing timeoutMS gives more headroom for the setup operations while still preserving the assertion: the final operation fails with a timeout (see the insert failing at ~maxTimeMS in the log).
```
12:59:06.893 Command Started: find db=JavaDriverTest coll=db.fs.files maxTimeMS=570
12:59:06.919 Command Succeeded: find elapsedMs=26 (response omitted)
12:59:06.933 Command Started: listIndexes db=JavaDriverTest coll=db.fs.files maxTimeMS=523
12:59:06.940 Command Failed: listIndexes elapsedMs=7 code=26 (NamespaceNotFound) errmsg="ns does not exist: JavaDriverTest.db.fs.files"
12:59:06.947 Command Started: createIndexes db=JavaDriverTest coll=db.fs.files maxTimeMS=507
12:59:07.053 Command Succeeded: createIndexes elapsedMs=105 (response omitted)
12:59:07.055 Command Started: listIndexes db=JavaDriverTest coll=db.fs.chunks maxTimeMS=396
12:59:07.080 Command Failed: listIndexes elapsedMs=25 code=26 (NamespaceNotFound) errmsg="ns does not exist: JavaDriverTest.db.fs.chunks"
12:59:07.081 Command Started: createIndexes db=JavaDriverTest coll=db.fs.chunks maxTimeMS=369
12:59:07.175 Command Succeeded: createIndexes elapsedMs=93 (response omitted)
12:59:07.191 Command Started: insert db=JavaDriverTest coll=db.fs.chunks maxTimeMS=265
12:59:07.456 Command Failed: insert elapsedMs=264 errmsg="Operation exceeded the timeout limit: Timeout while receiving message"
```
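The budget arithmetic behind the log can be sketched as follows (the 600ms budget and the helper name are illustrative; the real driver also deducts client-side gaps between commands):

```java
public class TimeoutBudgetSketch {
    // Sum the setup operations' elapsed times and subtract them from the budget.
    static long remainingAfterSetup(long budgetMs, long[] setupElapsedMs) {
        long remaining = budgetMs;
        for (long elapsed : setupElapsedMs) {
            remaining -= elapsed;
        }
        return remaining;
    }

    public static void main(String[] args) {
        // Server-side elapsed times from the log above: find, listIndexes (fs.files),
        // createIndexes (fs.files), listIndexes (fs.chunks), createIndexes (fs.chunks).
        long[] setupElapsedMs = {26, 7, 105, 25, 93};
        // 600ms is a hypothetical timeoutMS budget for illustration; the driver also
        // deducts client-side gaps, which is why the log shows insert maxTimeMS=265.
        System.out.println(remainingAfterSetup(600, setupElapsedMs)); // prints 344
    }
}
```

When the setup steps run slow, this remainder can reach zero before the insert, which is why the failure sometimes moved to createIndexes without the extra headroom.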
```java
Mono<MongoCollection<Document>> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout,
        TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION);
```
Note: this isn’t required for the main change in this PR, but it came up while debugging timeouts.
The current generic error message wasn’t actionable: in the reactive stack trace it points only to the lambda of a deferred Mono and doesn’t help identify where that deferred Mono was created. This change adds more context to the timeout failure message.
This PR removes the ClusterFixture.getPrimaryRTT()-based adjustment in the CSOT prose tests. In practice the RTT is near 0ms (after min-subtraction); however, handshakes right after opening connections skew the average RTT, which introduces noise and flakiness into the tests' RTT calculation.
Problem
We rely on ClusterFixture.getPrimaryRTT() to adjust for RTT fluctuations across environments. ClusterFixture.getPrimaryRTT() reports a running average over 10 samples from its monitor thread. However, the ClusterFixture.getPrimaryRTT() metrics are not reliable by themselves.
Root cause 1: connection-opening outliers dominate the running average
A per-source histogram shows that “Opening connection” samples can include extreme outliers (up to ~1020ms), while subsequent samples immediately return to near zero. Because getPrimaryRTT() is a running average over 10 samples, these early outliers can dominate the reported RTT when the CSOT tests run early.
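The effect of one such outlier on a 10-sample mean can be sketched numerically (the sample values are illustrative, mirroring the histogram described above):

```java
public class RttAverageSketch {
    // Mean over a fixed sample window, like a 10-sample running average.
    static double average(long[] samples) {
        long sum = 0;
        for (long sample : samples) {
            sum += sample;
        }
        return (double) sum / samples.length;
    }

    public static void main(String[] args) {
        // Nine near-zero RTT samples plus one connection-opening outlier (~1020ms).
        long[] samples = {0, 1, 0, 0, 1, 0, 0, 1, 0, 1020};
        System.out.println(average(samples)); // prints 102.3
    }
}
```

A single ~1s handshake sample inflates the reported RTT by two orders of magnitude, so any `rtt + N` timeout derived from it early in the run is badly miscalibrated.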
Root cause 2: an unrelated test polluted the cluster RTT monitor and cascaded into later tests
A separate source of skew was identified: shouldUseConnectTimeoutMsWhenEstablishingConnectionInBackground blocked "hello" / "isMaster" for ~500ms without using a uniquely scoped failpoint. This affected the shared ClusterFixture monitor, causing elevated RTT readings that cascaded into subsequent tests and increased flakiness.
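For reference, scoping such a failpoint to a single test's connections is possible with the `failCommand` failpoint's `appName` field; a minimal mongosh sketch (the `appName` value here is a made-up example, not the test's actual application name):

```
db.adminCommand({
  configureFailPoint: "failCommand",
  mode: { times: 1 },
  data: {
    failCommands: ["hello", "isMaster"],
    blockConnection: true,
    blockTimeMS: 500,
    // Only connections whose client metadata carries this appName are affected,
    // so the shared ClusterFixture monitor's RTT samples stay clean.
    appName: "connectTimeoutBackgroundTest"
  }
});
```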
Change in this PR
JAVA-5375