Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

- Set max connections to 50 and enabled exponential-backoff retry for the Redis client
## [1.9.9] - 2025-09-28
- Changed client retry
## [1.9.8] - 2025-08-21
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
url="https://github.com/weka-io/talker",
license='BSD',
install_requires=[
'redis==3.3.7',
'redis==4.1.0',
'weka-easypy==0.3.1'
],
extras_require={
Expand Down
62 changes: 21 additions & 41 deletions talker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from redis import StrictRedis
import redis.exceptions
from redis.backoff import ExponentialBackoff
from redis.retry import Retry

from easypy.caching import cached_property, locking_cache
from easypy.collections import chunkify
Expand All @@ -21,17 +23,28 @@
REDIS_SOCKET_TIMEOUT, REDIS_SOCKET_CONNECT_TIMEOUT, REDIS_RETRY_ON_TIMEOUT,
REDIS_HEALTH_CHECK_INTERVAL, AGENT_ACK_TIMEOUT, _logger, _verbose_logger, IS_ALIVE_ACK_TIMEOUT, IS_ALIVE_TIMEOUT
)
from talker.utils import retry_redis_op

# Tuning knobs for the redis client's built-in command retry (used by get_redis).
REDIS_RETRY_BASE_SECONDS = 1  # ExponentialBackoff base: sleeps start from base * 2**failures
REDIS_RETRY_CAP_SECONDS = 10  # ExponentialBackoff cap: no single sleep exceeds this many seconds
REDIS_RETRY_MAX_ATTEMPTS = 7  # backoff sleeps total 54s: 2+4+8+10+10+10+10


@locking_cache
def get_redis(host, password, port):
    """Build a configured StrictRedis client, cached per (host, password, port).

    The @locking_cache decorator makes this a process-wide singleton per
    argument triple, so all callers for the same server share one client
    (and therefore one connection pool, bounded by max_connections).

    The client retries failed commands with exponential backoff; with
    base=1s, cap=10s and 7 retries the cumulative sleep is
    2+4+8+10+10+10+10 = 54s, i.e. just under one minute.

    :param host: redis server hostname or address
    :param password: redis AUTH password (or None)
    :param port: redis server port
    :return: a shared, retry-enabled StrictRedis instance
    """
    retry_policy = Retry(
        backoff=ExponentialBackoff(base=REDIS_RETRY_BASE_SECONDS, cap=REDIS_RETRY_CAP_SECONDS),
        retries=REDIS_RETRY_MAX_ATTEMPTS,
    )
    return StrictRedis(
        host=host, password=password, port=port,
        socket_timeout=REDIS_SOCKET_TIMEOUT,
        socket_connect_timeout=REDIS_SOCKET_CONNECT_TIMEOUT,
        retry=retry_policy,
        retry_on_timeout=REDIS_RETRY_ON_TIMEOUT,
        # Apply retry_policy to connection failures (redis-py adds TimeoutError
        # to this list itself when retry_on_timeout is set).
        retry_on_error=[redis.exceptions.ConnectionError],
        health_check_interval=REDIS_HEALTH_CHECK_INTERVAL,  # single kwarg — the diff residue duplicated it
        max_connections=50,
    )


Expand Down Expand Up @@ -177,11 +190,6 @@ def reset_server_error(self, host_id):
def poll(self, cmds):
results = {cmd: dict(ack=cmd.ack, retcode=cmd.retcode) for cmd in cmds}

@retry_redis_op
def _execute_poll_pipeline(pipeline_obj, log_context_extra=None):
self._pipeline_flush_log(pipeline_obj)
return pipeline_obj.execute()

with self.redis.pipeline() as p:
res_idx_to_i = {}
res_idx = 0
Expand All @@ -197,16 +205,8 @@ def _execute_poll_pipeline(pipeline_obj, log_context_extra=None):
res_idx += 1

if res_idx:
host_ids_summary = sorted(list(set(c.host_id for c in cmds if hasattr(c, 'host_id'))))
log_extra_for_retry = {
'client_operation': 'poll',
'num_cmds': len(cmds),
'host_ids_summary': str(host_ids_summary)[:100]
}
pipeline_results = _execute_poll_pipeline(
p,
log_context_extra=log_extra_for_retry
)
self._pipeline_flush_log(p)
pipeline_results = p.execute()

for (i, result) in enumerate(pipeline_results):
cmd_obj_loop, slot = res_idx_to_i[i] # Renamed to cmd_obj_loop
Expand Down Expand Up @@ -383,25 +383,12 @@ def get_output(self, cmds, decode='utf-8'):
"""
ret = []

@retry_redis_op
def _execute_get_output_pipeline_part(pipeline_obj, part_name, log_context_extra=None):
self._pipeline_flush_log(pipeline_obj)
return pipeline_obj.execute()

with self.redis.pipeline() as p:
for cmd in cmds:
cmd._request_outputs(p)

host_ids_summary = sorted(list(set(c.host_id for c in cmds if hasattr(c, 'host_id'))))
log_extra_part1 = {
'client_operation': 'get_output_part1_request',
'num_cmds': len(cmds),
'host_ids_summary': str(host_ids_summary)[:100]
}
pipeline_results_part1 = _execute_get_output_pipeline_part(
p, "request_outputs",
log_context_extra=log_extra_part1
)
self._pipeline_flush_log(p)
pipeline_results_part1 = p.execute()
p.reset()

for i, (stdout_key, stderr_key) in enumerate(chunkify(pipeline_results_part1, 2)):
Expand All @@ -416,15 +403,8 @@ def _execute_get_output_pipeline_part(pipeline_obj, part_name, log_context_extra
ret.append((stdout_content, stderr_content))

if p.command_stack:
log_extra_part2 = {
'client_operation': 'get_output_part2_trim',
'num_cmds': len(cmds),
'host_ids_summary': str(host_ids_summary)[:100]
}
_execute_get_output_pipeline_part(
p, "trim_outputs",
log_context_extra=log_extra_part2
)
self._pipeline_flush_log(p)
p.execute()
return MultiObject(ret)

def iter_output(self, cmds, sleep=1.0, decode='utf-8'):
Expand Down
30 changes: 1 addition & 29 deletions talker/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
from queue import Queue, Empty
from threading import RLock, Event, Semaphore

import redis.exceptions

from easypy.concurrency import _check_exiting, concurrent, _run_with_exception_logging, raise_in_main_thread
from easypy.timing import wait, Timer
from easypy.units import MINUTE

from talker.errors import NoResponseForRedisCommand
from talker.config import _logger, _verbose_logger, REDIS_SOCKET_TIMEOUT
from talker.utils import retry_redis_op


class TalkerReactor():
Expand Down Expand Up @@ -72,37 +69,12 @@ def _send_data(self, items):

assert len(results) == len(items), "Our redis pipeline got out of sync?"

@retry_redis_op
def execute_single_with_retry(redis_client, cmd_name, args, kwargs, log_context_extra=None):
redis_func = getattr(redis_client, cmd_name)
return redis_func(*args, **kwargs)

cmds_summary = ", ".join(item.cmd for item in items)
log_extra_base = {
'reactor_operation': 'retry_single_command',
'commands_summary': cmds_summary[:1000],
'num_items': len(items)
}

for item, result in zip(items, results):
if item.callback:
item.callback()

final_result = result

# Retry only failed items individually (do not resend successful ones)
if isinstance(result, redis.exceptions.RedisError):
current_log_context = {**log_extra_base, 'failed_cmd': item.cmd}
final_result = execute_single_with_retry(
self.talker.redis,
item.cmd,
item.args,
item.kwargs,
log_context_extra=current_log_context
)

if item.event: # non-async
item.results.append(final_result)
item.results.append(result)
item.event.set()
finally:
self._current_workers.release()
Expand Down