From 59f0cc0873fe18f41d677a937028d80a0db2d9f2 Mon Sep 17 00:00:00 2001 From: Yonatan Abir Date: Wed, 11 Feb 2026 16:25:37 +0200 Subject: [PATCH 1/2] [DEVOPS-3839] limiting max connections to 50 --- CHANGELOG.md | 2 +- talker/client.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dab9b4e..bb165d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] - +- Set max connections to 50 for client ## [1.9.9] - 2025-09-28 - Changed client retry ## [1.9.8] - 2025-08-21 diff --git a/talker/client.py b/talker/client.py index 83f68de..8107909 100644 --- a/talker/client.py +++ b/talker/client.py @@ -31,7 +31,8 @@ def get_redis(host, password, port): socket_timeout=REDIS_SOCKET_TIMEOUT, socket_connect_timeout=REDIS_SOCKET_CONNECT_TIMEOUT, retry_on_timeout=REDIS_RETRY_ON_TIMEOUT, - health_check_interval=REDIS_HEALTH_CHECK_INTERVAL + health_check_interval=REDIS_HEALTH_CHECK_INTERVAL, + max_connections=50 ) From 6d2ec6e5924a05610af55ef06d5fc68c1862c092 Mon Sep 17 00:00:00 2001 From: Yonatan Abir Date: Tue, 24 Feb 2026 15:54:33 +0200 Subject: [PATCH 2/2] removing old retries. 
updating redis package, adding retries to redis client for connection errors --- setup.py | 2 +- talker/client.py | 59 +++++++++++++++-------------------------------- talker/reactor.py | 30 +----------------------- 3 files changed, 21 insertions(+), 70 deletions(-) diff --git a/setup.py b/setup.py index a27d9b3..2bd2cd4 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ url="https://github.com/weka-io/talker", license='BSD', install_requires=[ - 'redis==3.3.7', + 'redis==4.2.0', 'weka-easypy==0.3.1' ], extras_require={ diff --git a/talker/client.py b/talker/client.py index 8107909..3d33b46 100644 --- a/talker/client.py +++ b/talker/client.py @@ -4,6 +4,8 @@ from redis import StrictRedis import redis.exceptions +from redis.backoff import ExponentialBackoff +from redis.retry import Retry from easypy.caching import cached_property, locking_cache from easypy.collections import chunkify @@ -21,16 +23,26 @@ REDIS_SOCKET_TIMEOUT, REDIS_SOCKET_CONNECT_TIMEOUT, REDIS_RETRY_ON_TIMEOUT, REDIS_HEALTH_CHECK_INTERVAL, AGENT_ACK_TIMEOUT, _logger, _verbose_logger, IS_ALIVE_ACK_TIMEOUT, IS_ALIVE_TIMEOUT ) -from talker.utils import retry_redis_op + +REDIS_RETRY_BASE_SECONDS = 1 +REDIS_RETRY_CAP_SECONDS = 10 +REDIS_RETRY_MAX_ATTEMPTS = 7 # backoff sleeps total 54s: 2+4+8+10+10+10+10 @locking_cache def get_redis(host, password, port): + # Keep cumulative backoff sleep under one minute. 
+ retry_policy = Retry( + backoff=ExponentialBackoff(base=REDIS_RETRY_BASE_SECONDS, cap=REDIS_RETRY_CAP_SECONDS), + retries=REDIS_RETRY_MAX_ATTEMPTS, + ) return StrictRedis( host=host, password=password, port=port, socket_timeout=REDIS_SOCKET_TIMEOUT, socket_connect_timeout=REDIS_SOCKET_CONNECT_TIMEOUT, + retry=retry_policy, retry_on_timeout=REDIS_RETRY_ON_TIMEOUT, + retry_on_error=[redis.exceptions.ConnectionError], health_check_interval=REDIS_HEALTH_CHECK_INTERVAL, max_connections=50 ) @@ -178,11 +190,6 @@ def reset_server_error(self, host_id): def poll(self, cmds): results = {cmd: dict(ack=cmd.ack, retcode=cmd.retcode) for cmd in cmds} - @retry_redis_op - def _execute_poll_pipeline(pipeline_obj, log_context_extra=None): - self._pipeline_flush_log(pipeline_obj) - return pipeline_obj.execute() - with self.redis.pipeline() as p: res_idx_to_i = {} res_idx = 0 @@ -198,16 +205,8 @@ def _execute_poll_pipeline(pipeline_obj, log_context_extra=None): res_idx += 1 if res_idx: - host_ids_summary = sorted(list(set(c.host_id for c in cmds if hasattr(c, 'host_id')))) - log_extra_for_retry = { - 'client_operation': 'poll', - 'num_cmds': len(cmds), - 'host_ids_summary': str(host_ids_summary)[:100] - } - pipeline_results = _execute_poll_pipeline( - p, - log_context_extra=log_extra_for_retry - ) + self._pipeline_flush_log(p) + pipeline_results = p.execute() for (i, result) in enumerate(pipeline_results): cmd_obj_loop, slot = res_idx_to_i[i] # Renamed to cmd_obj_loop @@ -384,25 +383,12 @@ def get_output(self, cmds, decode='utf-8'): """ ret = [] - @retry_redis_op - def _execute_get_output_pipeline_part(pipeline_obj, part_name, log_context_extra=None): - self._pipeline_flush_log(pipeline_obj) - return pipeline_obj.execute() - with self.redis.pipeline() as p: for cmd in cmds: cmd._request_outputs(p) - host_ids_summary = sorted(list(set(c.host_id for c in cmds if hasattr(c, 'host_id')))) - log_extra_part1 = { - 'client_operation': 'get_output_part1_request', - 'num_cmds': len(cmds), - 
'host_ids_summary': str(host_ids_summary)[:100] - } - pipeline_results_part1 = _execute_get_output_pipeline_part( - p, "request_outputs", - log_context_extra=log_extra_part1 - ) + self._pipeline_flush_log(p) + pipeline_results_part1 = p.execute() p.reset() for i, (stdout_key, stderr_key) in enumerate(chunkify(pipeline_results_part1, 2)): @@ -417,15 +403,8 @@ def _execute_get_output_pipeline_part(pipeline_obj, part_name, log_context_extra ret.append((stdout_content, stderr_content)) if p.command_stack: - log_extra_part2 = { - 'client_operation': 'get_output_part2_trim', - 'num_cmds': len(cmds), - 'host_ids_summary': str(host_ids_summary)[:100] - } - _execute_get_output_pipeline_part( - p, "trim_outputs", - log_context_extra=log_extra_part2 - ) + self._pipeline_flush_log(p) + p.execute() return MultiObject(ret) def iter_output(self, cmds, sleep=1.0, decode='utf-8'): diff --git a/talker/reactor.py b/talker/reactor.py index a8cde53..3fe002d 100644 --- a/talker/reactor.py +++ b/talker/reactor.py @@ -5,15 +5,12 @@ from queue import Queue, Empty from threading import RLock, Event, Semaphore -import redis.exceptions - from easypy.concurrency import _check_exiting, concurrent, _run_with_exception_logging, raise_in_main_thread from easypy.timing import wait, Timer from easypy.units import MINUTE from talker.errors import NoResponseForRedisCommand from talker.config import _logger, _verbose_logger, REDIS_SOCKET_TIMEOUT -from talker.utils import retry_redis_op class TalkerReactor(): @@ -72,37 +69,12 @@ def _send_data(self, items): assert len(results) == len(items), "Our redis pipeline got out of sync?" 
- @retry_redis_op - def execute_single_with_retry(redis_client, cmd_name, args, kwargs, log_context_extra=None): - redis_func = getattr(redis_client, cmd_name) - return redis_func(*args, **kwargs) - - cmds_summary = ", ".join(item.cmd for item in items) - log_extra_base = { - 'reactor_operation': 'retry_single_command', - 'commands_summary': cmds_summary[:1000], - 'num_items': len(items) - } - for item, result in zip(items, results): if item.callback: item.callback() - final_result = result - - # Retry only failed items individually (do not resend successful ones) - if isinstance(result, redis.exceptions.RedisError): - current_log_context = {**log_extra_base, 'failed_cmd': item.cmd} - final_result = execute_single_with_retry( - self.talker.redis, - item.cmd, - item.args, - item.kwargs, - log_context_extra=current_log_context - ) - if item.event: # non-async - item.results.append(final_result) + item.results.append(result) item.event.set() finally: self._current_workers.release()