Skip to content
10 changes: 8 additions & 2 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2779,7 +2779,7 @@ def __init__(
self._last_error: Optional[Exception] = None
self._retrying = False
self._always_retryable = False
self._multiple_retries = _csot.get_timeout() is not None
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
self._client = mongo_client
self._retry_policy = mongo_client._retry_policy
self._func = func
Expand Down Expand Up @@ -2852,6 +2852,8 @@ async def run(self) -> T:
# ConnectionFailures do not supply a code property
exc_code = getattr(exc, "code", None)
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2890,6 +2892,8 @@ async def run(self) -> T:
exc_to_check = exc.error
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2943,7 +2947,9 @@ async def run(self) -> T:

def _is_not_eligible_for_retry(self) -> bool:
"""Checks if the exchange is not eligible for retry"""
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
return not self._retryable or (
self._is_retrying() and self._attempt_number >= self._max_retries
)

def _is_retrying(self) -> bool:
"""Checks if the exchange is currently undergoing a retry"""
Expand Down
10 changes: 8 additions & 2 deletions pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2769,7 +2769,7 @@ def __init__(
self._last_error: Optional[Exception] = None
self._retrying = False
self._always_retryable = False
self._multiple_retries = _csot.get_timeout() is not None
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
self._client = mongo_client
self._retry_policy = mongo_client._retry_policy
self._func = func
Expand Down Expand Up @@ -2842,6 +2842,8 @@ def run(self) -> T:
# ConnectionFailures do not supply a code property
exc_code = getattr(exc, "code", None)
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2880,6 +2882,8 @@ def run(self) -> T:
exc_to_check = exc.error
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2933,7 +2937,9 @@ def run(self) -> T:

def _is_not_eligible_for_retry(self) -> bool:
"""Checks if the exchange is not eligible for retry"""
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
return not self._retryable or (
self._is_retrying() and self._attempt_number >= self._max_retries
)

def _is_retrying(self) -> bool:
"""Checks if the exchange is currently undergoing a retry"""
Expand Down
39 changes: 37 additions & 2 deletions test/asynchronous/test_client_backpressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import pathlib
import sys
from time import perf_counter
from unittest import mock
from unittest.mock import patch

from pymongo.common import MAX_ADAPTIVE_RETRIES
Expand Down Expand Up @@ -228,7 +229,7 @@ async def test_01_operation_retry_uses_exponential_backoff(self, random_func):
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 0.3)) < 0.3)

@async_client_context.require_failCommand_appName
async def test_03_overload_retries_limited(self):
async def test_02_overload_retries_limited(self):
# Drivers should test that overload errors are retried a maximum of two times.

# 1. Let `client` be a `MongoClient`.
Expand Down Expand Up @@ -260,7 +261,7 @@ async def test_03_overload_retries_limited(self):
self.assertEqual(len(self.listener.started_events), MAX_ADAPTIVE_RETRIES + 1)

@async_client_context.require_failCommand_appName
async def test_04_overload_retries_limited_configured(self):
async def test_03_overload_retries_limited_configured(self):
# Drivers should test that overload errors are retried a maximum of maxAdaptiveRetries times.
max_retries = 1

Expand Down Expand Up @@ -294,6 +295,40 @@ async def test_04_overload_retries_limited_configured(self):
# 6. Assert that the total number of started commands is max_retries + 1.
self.assertEqual(len(self.listener.started_events), max_retries + 1)

@async_client_context.require_failCommand_fail_point
async def test_04_backoff_is_not_applied_for_non_overload_errors(self):
# Drivers should test that backoff is not applied for non-overload retryable errors.
if _IS_SYNC:
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
else:
mock_target = "pymongo.asynchronous.helpers._RetryPolicy.backoff"

# 1. Let `client` be a `MongoClient`.
client = self.client

# 2. Let `coll` be a collection.
coll = client.test.test
await coll.insert_one({})

# 3. Configure a failpoint with a retryable error that is NOT an overload error.
failpoint = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorCode": 91, # ShutdownInProgress
"errorLabels": ["RetryableError"],
},
}

# 4. Perform a find operation with `coll` that succeeds on its first retry attempt.
with mock.patch(mock_target, return_value=1) as mock_backoff:
async with self.fail_point(failpoint):
await coll.find_one({})

# 5. Assert that no backoff was used for the retry attempt.
mock_backoff.assert_not_called()


# Location of JSON test specifications.
if _IS_SYNC:
Expand Down
71 changes: 70 additions & 1 deletion test/asynchronous/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import threading
from test.asynchronous.utils import async_set_fail_point

from pymongo.errors import OperationFailure
from pymongo import MongoClient
from pymongo.common import MAX_ADAPTIVE_RETRIES
from pymongo.errors import OperationFailure, PyMongoError

sys.path[0:0] = [""]

Expand All @@ -38,6 +40,7 @@
)

from pymongo.monitoring import (
CommandFailedEvent,
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckOutFailedReason,
Expand Down Expand Up @@ -145,6 +148,20 @@ async def test_pool_paused_error_is_retryable(self):


class TestRetryableReads(AsyncIntegrationTest):
async def asyncSetUp(self) -> None:
await super().asyncSetUp()
self.setup_client = MongoClient(**async_client_context.client_options)
self.addCleanup(self.setup_client.close)

# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
def configure_fail_point_sync(self, command_args, off=False) -> None:
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
if off:
cmd["mode"] = "off"
cmd.pop("data", None)
self.setup_client.admin.command(cmd)

@async_client_context.require_multiple_mongoses
@async_client_context.require_failCommand_fail_point
async def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
Expand Down Expand Up @@ -383,6 +400,58 @@ async def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_th
# 6. Assert that both events occurred on the same server.
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id

@async_client_context.require_failCommand_fail_point
@async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
async def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
# Create a client.
listener = OvertCommandListener()

# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 91,
},
}

# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
non_overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["find"],
"errorCode": 91,
"errorLabels": ["RetryableError"],
},
}

def failed(event: CommandFailedEvent) -> None:
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
if listener.failed_events:
return
assert event.failure["code"] == 91
self.configure_fail_point_sync(non_overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
listener.failed_events.append(event)

listener.failed = failed

client = await self.async_rs_client(event_listeners=[listener])
await client.test.test.insert_one({})

self.configure_fail_point_sync(overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)

with self.assertRaises(PyMongoError):
await client.test.test.find_one()

started_finds = [e for e in listener.started_events if e.command_name == "find"]
self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)


if __name__ == "__main__":
unittest.main()
51 changes: 51 additions & 0 deletions test/asynchronous/test_retryable_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import threading
from test.asynchronous.utils import async_set_fail_point, flaky

from pymongo.common import MAX_ADAPTIVE_RETRIES

sys.path[0:0] = [""]

from test.asynchronous import (
Expand Down Expand Up @@ -784,6 +786,55 @@ def failed(event: CommandFailedEvent) -> None:
# Assert that the error does not contain the error label `NoWritesPerformed`.
assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"]

async def test_overload_then_nonoverload_retries_increased_writes(self) -> None:
# Create a client with retryWrites=true.
listener = OvertCommandListener()

# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["insert"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 91,
},
}

# Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels.
non_overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["insert"],
"errorCode": 91,
"errorLabels": ["RetryableError", "RetryableWriteError"],
},
}

def failed(event: CommandFailedEvent) -> None:
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
if listener.failed_events:
return
assert event.failure["code"] == 91
self.configure_fail_point_sync(non_overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
listener.failed_events.append(event)

listener.failed = failed

client = await self.async_rs_client(retryWrites=True, event_listeners=[listener])

self.configure_fail_point_sync(overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)

with self.assertRaises(PyMongoError):
await client.test.test.insert_one({"x": 1})

started_inserts = [e for e in listener.started_events if e.command_name == "insert"]
self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1)


if __name__ == "__main__":
unittest.main()
39 changes: 37 additions & 2 deletions test/test_client_backpressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import pathlib
import sys
from time import perf_counter
from unittest import mock
from unittest.mock import patch

from pymongo.common import MAX_ADAPTIVE_RETRIES
Expand Down Expand Up @@ -228,7 +229,7 @@ def test_01_operation_retry_uses_exponential_backoff(self, random_func):
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 0.3)) < 0.3)

@client_context.require_failCommand_appName
def test_03_overload_retries_limited(self):
def test_02_overload_retries_limited(self):
# Drivers should test that overload errors are retried a maximum of two times.

# 1. Let `client` be a `MongoClient`.
Expand Down Expand Up @@ -260,7 +261,7 @@ def test_03_overload_retries_limited(self):
self.assertEqual(len(self.listener.started_events), MAX_ADAPTIVE_RETRIES + 1)

@client_context.require_failCommand_appName
def test_04_overload_retries_limited_configured(self):
def test_03_overload_retries_limited_configured(self):
# Drivers should test that overload errors are retried a maximum of maxAdaptiveRetries times.
max_retries = 1

Expand Down Expand Up @@ -292,6 +293,40 @@ def test_04_overload_retries_limited_configured(self):
# 6. Assert that the total number of started commands is max_retries + 1.
self.assertEqual(len(self.listener.started_events), max_retries + 1)

@client_context.require_failCommand_fail_point
def test_04_backoff_is_not_applied_for_non_overload_errors(self):
# Drivers should test that backoff is not applied for non-overload retryable errors.
if _IS_SYNC:
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
else:
mock_target = "pymongo.helpers._RetryPolicy.backoff"

# 1. Let `client` be a `MongoClient`.
client = self.client

# 2. Let `coll` be a collection.
coll = client.test.test
coll.insert_one({})

# 3. Configure a failpoint with a retryable error that is NOT an overload error.
failpoint = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorCode": 91, # ShutdownInProgress
"errorLabels": ["RetryableError"],
},
}

# 4. Perform a find operation with `coll` that succeeds on its first retry attempt.
with mock.patch(mock_target, return_value=1) as mock_backoff:
with self.fail_point(failpoint):
coll.find_one({})

# 5. Assert that no backoff was used for the retry attempt.
mock_backoff.assert_not_called()


# Location of JSON test specifications.
if _IS_SYNC:
Expand Down
Loading
Loading