Skip to content

Commit 6bd0214

Browse files
committed
PYTHON-5794 - Add prose tests to verify correct retry behavior when a mix of overload and non-overload errors are encountered
1 parent e1751ff commit 6bd0214

6 files changed

Lines changed: 262 additions & 6 deletions

File tree

pymongo/asynchronous/mongo_client.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2779,7 +2779,7 @@ def __init__(
27792779
self._last_error: Optional[Exception] = None
27802780
self._retrying = False
27812781
self._always_retryable = False
2782-
self._multiple_retries = _csot.get_timeout() is not None
2782+
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
27832783
self._client = mongo_client
27842784
self._retry_policy = mongo_client._retry_policy
27852785
self._func = func
@@ -2852,6 +2852,8 @@ async def run(self) -> T:
28522852
# ConnectionFailures do not supply a code property
28532853
exc_code = getattr(exc, "code", None)
28542854
overloaded = exc.has_error_label("SystemOverloadedError")
2855+
if overloaded:
2856+
self._max_retries = self._client.options.max_adaptive_retries
28552857
always_retryable = exc.has_error_label("RetryableError") and overloaded
28562858
if not self._client.options.retry_reads or (
28572859
not always_retryable
@@ -2890,6 +2892,8 @@ async def run(self) -> T:
28902892
exc_to_check = exc.error
28912893
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
28922894
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
2895+
if overloaded:
2896+
self._max_retries = self._client.options.max_adaptive_retries
28932897
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
28942898

28952899
# Always retry abortTransaction and commitTransaction up to once
@@ -2943,7 +2947,9 @@ async def run(self) -> T:
29432947

29442948
def _is_not_eligible_for_retry(self) -> bool:
29452949
"""Checks if the exchange is not eligible for retry"""
2946-
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
2950+
return not self._retryable or (
2951+
self._is_retrying() and self._attempt_number >= self._max_retries
2952+
)
29472953

29482954
def _is_retrying(self) -> bool:
29492955
"""Checks if the exchange is currently undergoing a retry"""

pymongo/synchronous/mongo_client.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2769,7 +2769,7 @@ def __init__(
27692769
self._last_error: Optional[Exception] = None
27702770
self._retrying = False
27712771
self._always_retryable = False
2772-
self._multiple_retries = _csot.get_timeout() is not None
2772+
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
27732773
self._client = mongo_client
27742774
self._retry_policy = mongo_client._retry_policy
27752775
self._func = func
@@ -2842,6 +2842,8 @@ def run(self) -> T:
28422842
# ConnectionFailures do not supply a code property
28432843
exc_code = getattr(exc, "code", None)
28442844
overloaded = exc.has_error_label("SystemOverloadedError")
2845+
if overloaded:
2846+
self._max_retries = self._client.options.max_adaptive_retries
28452847
always_retryable = exc.has_error_label("RetryableError") and overloaded
28462848
if not self._client.options.retry_reads or (
28472849
not always_retryable
@@ -2880,6 +2882,8 @@ def run(self) -> T:
28802882
exc_to_check = exc.error
28812883
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
28822884
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
2885+
if overloaded:
2886+
self._max_retries = self._client.options.max_adaptive_retries
28832887
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
28842888

28852889
# Always retry abortTransaction and commitTransaction up to once
@@ -2933,7 +2937,9 @@ def run(self) -> T:
29332937

29342938
def _is_not_eligible_for_retry(self) -> bool:
29352939
"""Checks if the exchange is not eligible for retry"""
2936-
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
2940+
return not self._retryable or (
2941+
self._is_retrying() and self._attempt_number >= self._max_retries
2942+
)
29372943

29382944
def _is_retrying(self) -> bool:
29392945
"""Checks if the exchange is currently undergoing a retry"""

test/asynchronous/test_retryable_reads.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import threading
2222
from test.asynchronous.utils import async_set_fail_point
2323

24-
from pymongo.errors import OperationFailure
24+
from pymongo import AsyncMongoClient, MongoClient
25+
from pymongo.common import MAX_ADAPTIVE_RETRIES
26+
from pymongo.errors import OperationFailure, PyMongoError
2527

2628
sys.path[0:0] = [""]
2729

@@ -38,6 +40,7 @@
3840
)
3941

4042
from pymongo.monitoring import (
43+
CommandFailedEvent,
4144
ConnectionCheckedOutEvent,
4245
ConnectionCheckOutFailedEvent,
4346
ConnectionCheckOutFailedReason,
@@ -145,6 +148,20 @@ async def test_pool_paused_error_is_retryable(self):
145148

146149

147150
class TestRetryableReads(AsyncIntegrationTest):
151+
async def asyncSetUp(self) -> None:
152+
await super().asyncSetUp()
153+
self.setup_client = MongoClient(**async_client_context.default_client_options)
154+
self.addCleanup(self.setup_client.close)
155+
156+
# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
157+
def configure_fail_point_sync(self, command_args, off=False) -> None:
158+
cmd = {"configureFailPoint": "failCommand"}
159+
cmd.update(command_args)
160+
if off:
161+
cmd["mode"] = "off"
162+
cmd.pop("data", None)
163+
self.setup_client.admin.command(cmd)
164+
148165
@async_client_context.require_multiple_mongoses
149166
@async_client_context.require_failCommand_fail_point
150167
async def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
@@ -383,6 +400,58 @@ async def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_th
383400
# 6. Assert that both events occurred on the same server.
384401
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id
385402

403+
@async_client_context.require_failCommand_fail_point
404+
@async_client_context.require_version_min(4, 4, 0)
405+
async def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
406+
# Create a client.
407+
listener = OvertCommandListener()
408+
409+
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
410+
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
411+
overload_fail_point = {
412+
"configureFailPoint": "failCommand",
413+
"mode": {"times": 1},
414+
"data": {
415+
"failCommands": ["find"],
416+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
417+
"errorCode": 91,
418+
},
419+
}
420+
421+
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
422+
non_overload_fail_point = {
423+
"configureFailPoint": "failCommand",
424+
"mode": "alwaysOn",
425+
"data": {
426+
"failCommands": ["find"],
427+
"errorCode": 91,
428+
"errorLabels": ["RetryableError"],
429+
},
430+
}
431+
432+
def failed(event: CommandFailedEvent) -> None:
433+
# Configure the non-overload fail point only on the first failed event, which must be the `91` error from the overload fail point.
434+
if listener.failed_events:
435+
return
436+
assert event.failure["code"] == 91
437+
self.configure_fail_point_sync(non_overload_fail_point)
438+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
439+
listener.failed_events.append(event)
440+
441+
listener.failed = failed
442+
443+
client = await self.async_rs_client(event_listeners=[listener])
444+
await client.test.test.insert_one({})
445+
446+
self.configure_fail_point_sync(overload_fail_point)
447+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
448+
449+
with self.assertRaises(PyMongoError):
450+
await client.test.test.find_one()
451+
452+
started_finds = [e for e in listener.started_events if e.command_name == "find"]
453+
self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)
454+
386455

387456
if __name__ == "__main__":
388457
unittest.main()

test/asynchronous/test_retryable_writes.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import threading
2323
from test.asynchronous.utils import async_set_fail_point, flaky
2424

25+
from pymongo.common import MAX_ADAPTIVE_RETRIES
26+
2527
sys.path[0:0] = [""]
2628

2729
from test.asynchronous import (
@@ -784,6 +786,57 @@ def failed(event: CommandFailedEvent) -> None:
784786
# Assert that the error does not contain the error label `NoWritesPerformed`.
785787
assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"]
786788

789+
@async_client_context.require_failCommand_fail_point
790+
@async_client_context.require_version_min(4, 4, 0)
791+
async def test_overload_then_nonoverload_retries_increased_writes(self) -> None:
792+
# Create a client with retryWrites=true.
793+
listener = OvertCommandListener()
794+
795+
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
796+
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
797+
overload_fail_point = {
798+
"configureFailPoint": "failCommand",
799+
"mode": {"times": 1},
800+
"data": {
801+
"failCommands": ["insert"],
802+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
803+
"errorCode": 91,
804+
},
805+
}
806+
807+
# Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels.
808+
non_overload_fail_point = {
809+
"configureFailPoint": "failCommand",
810+
"mode": "alwaysOn",
811+
"data": {
812+
"failCommands": ["insert"],
813+
"errorCode": 91,
814+
"errorLabels": ["RetryableError", "RetryableWriteError"],
815+
},
816+
}
817+
818+
def failed(event: CommandFailedEvent) -> None:
819+
# Configure the non-overload fail point only on the first failed event, which must be the `91` error from the overload fail point.
820+
if listener.failed_events:
821+
return
822+
assert event.failure["code"] == 91
823+
self.configure_fail_point_sync(non_overload_fail_point)
824+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
825+
listener.failed_events.append(event)
826+
827+
listener.failed = failed
828+
829+
client = await self.async_rs_client(retryWrites=True, event_listeners=[listener])
830+
831+
self.configure_fail_point_sync(overload_fail_point)
832+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
833+
834+
with self.assertRaises(PyMongoError):
835+
await client.test.test.insert_one({"x": 1})
836+
837+
started_inserts = [e for e in listener.started_events if e.command_name == "insert"]
838+
self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1)
839+
787840

788841
if __name__ == "__main__":
789842
unittest.main()

test/test_retryable_reads.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import threading
2222
from test.utils import set_fail_point
2323

24-
from pymongo.errors import OperationFailure
24+
from pymongo import MongoClient
25+
from pymongo.common import MAX_ADAPTIVE_RETRIES
26+
from pymongo.errors import OperationFailure, PyMongoError
2527

2628
sys.path[0:0] = [""]
2729

@@ -38,6 +40,7 @@
3840
)
3941

4042
from pymongo.monitoring import (
43+
CommandFailedEvent,
4144
ConnectionCheckedOutEvent,
4245
ConnectionCheckOutFailedEvent,
4346
ConnectionCheckOutFailedReason,
@@ -145,6 +148,20 @@ def test_pool_paused_error_is_retryable(self):
145148

146149

147150
class TestRetryableReads(IntegrationTest):
151+
def setUp(self) -> None:
152+
super().setUp()
153+
self.setup_client = MongoClient(**client_context.default_client_options)
154+
self.addCleanup(self.setup_client.close)
155+
156+
# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
157+
def configure_fail_point_sync(self, command_args, off=False) -> None:
158+
cmd = {"configureFailPoint": "failCommand"}
159+
cmd.update(command_args)
160+
if off:
161+
cmd["mode"] = "off"
162+
cmd.pop("data", None)
163+
self.setup_client.admin.command(cmd)
164+
148165
@client_context.require_multiple_mongoses
149166
@client_context.require_failCommand_fail_point
150167
def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
@@ -381,6 +398,58 @@ def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_the_same
381398
# 6. Assert that both events occurred on the same server.
382399
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id
383400

401+
@client_context.require_failCommand_fail_point
402+
@client_context.require_version_min(4, 4, 0)
403+
def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
404+
# Create a client.
405+
listener = OvertCommandListener()
406+
407+
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
408+
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
409+
overload_fail_point = {
410+
"configureFailPoint": "failCommand",
411+
"mode": {"times": 1},
412+
"data": {
413+
"failCommands": ["find"],
414+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
415+
"errorCode": 91,
416+
},
417+
}
418+
419+
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
420+
non_overload_fail_point = {
421+
"configureFailPoint": "failCommand",
422+
"mode": "alwaysOn",
423+
"data": {
424+
"failCommands": ["find"],
425+
"errorCode": 91,
426+
"errorLabels": ["RetryableError"],
427+
},
428+
}
429+
430+
def failed(event: CommandFailedEvent) -> None:
431+
# Configure the non-overload fail point only on the first failed event, which must be the `91` error from the overload fail point.
432+
if listener.failed_events:
433+
return
434+
assert event.failure["code"] == 91
435+
self.configure_fail_point_sync(non_overload_fail_point)
436+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
437+
listener.failed_events.append(event)
438+
439+
listener.failed = failed
440+
441+
client = self.rs_client(event_listeners=[listener])
442+
client.test.test.insert_one({})
443+
444+
self.configure_fail_point_sync(overload_fail_point)
445+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
446+
447+
with self.assertRaises(PyMongoError):
448+
client.test.test.find_one()
449+
450+
started_finds = [e for e in listener.started_events if e.command_name == "find"]
451+
self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)
452+
384453

385454
if __name__ == "__main__":
386455
unittest.main()

0 commit comments

Comments
 (0)