diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py index ed8171ade..1c57f18f0 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py @@ -699,27 +699,23 @@ def _batch_claim_pending_messages( results = [] try: results = pipe.execute() - except Exception as e: - err_msg = str(e).lower() - if "nogroup" in err_msg or "no such key" in err_msg: - # Fallback: attempt sequential xautoclaim for robustness - for stream_key, need_count, label in claims_spec: - try: - self._ensure_consumer_group(stream_key=stream_key) - res = self._redis_conn.xautoclaim( - name=stream_key, - groupname=self.consumer_group, - consumername=self.consumer_name, - min_idle_time=self.orchestrator.get_task_idle_min(task_label=label), - start_id="0-0", - count=need_count, - justid=False, - ) - results.append(res) - except Exception: - continue - else: - logger.error(f"Pipeline xautoclaim failed: {e}") + except Exception: + # Fallback: attempt sequential xautoclaim for robustness + for stream_key, need_count, label in claims_spec: + try: + self._ensure_consumer_group(stream_key=stream_key) + res = self._redis_conn.xautoclaim( + name=stream_key, + groupname=self.consumer_group, + consumername=self.consumer_name, + min_idle_time=self.orchestrator.get_task_idle_min(task_label=label), + start_id="0-0", + count=need_count, + justid=False, + ) + results.append(res) + except Exception: + continue claimed_pairs: list[tuple[str, list[tuple[str, dict]]]] = [] for (stream_key, _need_count, _label), claimed_result in zip(