refactor(queue): reduce complexity of _run_single_worker F(54)→C (#922) #927

Merged
axisrow merged 2 commits into main from fix/922-run-single-worker-complexity
Jun 20, 2026

@axisrow axisrow commented Jun 20, 2026

Owner

What and why

PR-4 of #922. CollectionQueue._run_single_worker (src/collection_queue.py) was radon F(54): a ~280-line worker loop that owned task dequeue, 4 pre-dispatch guards, kwargs assembly, completion handling, and 6 except branches.

Extracted methods

| Method | What | CC |
| --- | --- | --- |
| _validate_task_pre_dispatch | 4 skip guards (deleted/cancelled/scheduled/filtered + shutdown) → returns the channel or None | C(12) |
| _build_collect_kwargs | progress callback + cancel_event, gated on the signature | A(5) |
| _handle_collection_completion | cancelled-requeue/cancel vs COMPLETED+note | C(16) |
| _handle_collection_exception | 6 error handlers, dispatched by isinstance in the order of the original except branches → (keep_known_task_id, stop_after_no_clients) | B(8) |

_run_single_worker is now a "guards + pull + delegation" loop. radon cc: F(54) → C(17); the largest helper is C(16).

Behavior is unchanged

  • Same guard order, same exception precedence (the isinstance chain repeats the order of the except clauses), same terminal statuses and note texts.
  • _retried_tasks.discard on the no-exception path moved to the end of the try body, which is equivalent to the old else.
  • logger.exception is called from a handler that is itself invoked inside except Exception, so sys.exc_info() is still active and the traceback is logged as before.
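
The precedence claim can be illustrated with a small sketch. The exception classes and category strings below are hypothetical stand-ins, not the types handled in src/collection_queue.py; the point is only that an isinstance chain written in the same order as a stack of except clauses picks the same branch, including for subclasses.

```python
# Hypothetical exception hierarchy, for illustration only.
class RateLimitError(ConnectionError):
    """A more specific error that must be matched before its base class."""


def classify_with_except(exc: Exception) -> str:
    # Original style: a stack of except clauses, most specific first.
    try:
        raise exc
    except RateLimitError:
        return "rate_limited"
    except ConnectionError:
        return "reconnect"
    except Exception:
        return "generic"


def classify_with_isinstance(exc: Exception) -> str:
    # Refactored style: the same checks, in the same order, as an isinstance chain.
    if isinstance(exc, RateLimitError):
        return "rate_limited"
    if isinstance(exc, ConnectionError):
        return "reconnect"
    return "generic"


samples = [RateLimitError("slow down"), ConnectionError("lost"), ValueError("bad")]
results = [(classify_with_except(e), classify_with_isinstance(e)) for e in samples]
print(results)
```

Swapping the first two isinstance checks would route RateLimitError to "reconnect", which is why the order has to mirror the original clauses.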

Verification

  • ruff check src/collection_queue.py: clean
  • pytest -k collection_queue: 28 passed (all 5 exception types + generic + completion + guards + pause/resume + delayed requeue)

Refs #922. External behavior is unchanged.

🤖 Generated with Claude Code

`CollectionQueue._run_single_worker` was rank F(54): one ~280-line worker
loop owning task dequeue, four pre-dispatch guards, kwargs assembly,
completion bookkeeping and six exception handlers. Split the cohesive blocks
into methods, leaving the loop as guards + pull + delegation:

- `_validate_task_pre_dispatch` — the four skip guards (deleted/cancelled/
  scheduled/filtered + shutdown); returns the channel to collect or None
- `_build_collect_kwargs` — progress callback + signature-gated cancel_event
- `_handle_collection_completion` — cancelled-requeue/cancel vs COMPLETED+note
- `_handle_collection_exception` — the six failure handlers, dispatched by an
  isinstance chain that mirrors the original except-clause precedence; returns
  (keep_known_task_id, stop_after_no_clients) for the finally block

radon cc: `_run_single_worker` F(54) → C(17); largest helper C(16). No
behavior change — same guard order, same exception precedence, same terminal
statuses; the no-exception `_retried_tasks.discard` now runs at the end of the
try body (equivalent to the old else clause). Covered by
tests/test_collection_queue_db_pull.py (28 passed).

Refs #922.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

axisrow commented Jun 20, 2026

Owner Author

@claude @codex review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

claude Bot commented Jun 20, 2026


Claude finished @axisrow's task in 6m 53s


Code Review

  • Gather context
  • Read src/collection_queue.py and diff
  • Review for critical issues
  • Post findings

Summary

Reviewed src/collection_queue.py (+250/-221). The refactoring is structurally sound — guard order, exception precedence, and terminal state logic are faithfully preserved. One real regression found, one structural note.


Issue 1 — logger.exception silently drops tracebacks (logging regression)

File: src/collection_queue.py, lines 566 and 572 (also line 566 for the ConnectionError branch)

# _handle_collection_exception — called from:
except Exception as exc:
    keep_known_task_id, stop_after_no_clients = await self._handle_collection_exception(
        exc, task_id=task_id, ...
    )
# Inside _handle_collection_exception:
logger.exception("Collection failed for channel %d (reconnect failed)", channel.channel_id)
# ...
logger.exception("Collection failed for channel %d", channel.channel_id)

What breaks: Python coroutines have per-frame exception state (gi_exc_state). When CPython enters a new coroutine via send(), it swaps tstate->exc_info with the new coroutine's own (initially empty) exception state. This means sys.exc_info() inside _handle_collection_exception returns (None, None, None), not the exception from _run_single_worker's except block.

logger.exception(msg) is shorthand for logger.error(msg, exc_info=True) → calls sys.exc_info(). With (None, None, None) it emits NoneType: None with no stack trace — the error message is logged but the traceback is silently dropped.

In the original code the logger.exception calls were directly inside the except clauses of _run_single_worker (the same coroutine frame), where sys.exc_info() is always valid — even after an await — because the coroutine's own gi_exc_state was set when its except block was entered and is restored on resume.

Fix: pass the exception explicitly:

# Replace in _handle_collection_exception:
logger.exception("Collection failed for channel %d (reconnect failed)", channel.channel_id)
# → 
logger.error("Collection failed for channel %d (reconnect failed)", channel.channel_id, exc_info=exc)

logger.exception("Collection failed for channel %d", channel.channel_id)
# →
logger.error("Collection failed for channel %d", channel.channel_id, exc_info=exc)

exc is already a parameter of the method, so no additional plumbing is needed.
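
As a minimal sketch of what the exc_info=exc form does (the logger name and helper below are hypothetical, not taken from src/collection_queue.py): passing an exception instance makes the logging module use that exception's own traceback, so the call does not depend on sys.exc_info() at all and works even outside any except block.

```python
import io
import logging


def log_failure(logger: logging.Logger, exc: BaseException) -> None:
    # exc_info accepts an exception instance; logging reads its __traceback__.
    logger.error("Collection failed", exc_info=exc)


# Route output to an in-memory stream so it can be inspected.
stream = io.StringIO()
logger = logging.getLogger("exc_info_demo")  # hypothetical logger name
logger.setLevel(logging.ERROR)
logger.propagate = False
logger.addHandler(logging.StreamHandler(stream))

try:
    raise ValueError("boom")
except ValueError as caught:
    saved = caught  # keep a reference; the "as" name is cleared after the block

# Called outside the except block: no active exception, traceback still logged.
log_failure(logger, saved)
output = stream.getvalue()
print(output)
```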


Note — task_done() accounting (correct, worth documenting)

The split introduced an asymmetry: _validate_task_pre_dispatch calls self._queue.task_done() for each skip path internally and returns None, while the caller calls task_done() in the finally block for the normal path. This is correct — continue after a None return skips the try/finally entirely — but it's an easy footgun for future maintainers: the queue's task_done() contract now has two call sites. No action required; just worth a comment.
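
The invariant behind this note (exactly one task_done() per dequeued item) can be sketched with a plain asyncio.Queue. The validate helper, the skip set, and the item values are hypothetical and only mimic the shape described above: the helper acknowledges skipped items itself and returns None, while the caller acknowledges dispatched items in finally.

```python
import asyncio


def validate(queue: asyncio.Queue, item: int, skip: set):
    # Skip path: the helper owns task_done() and signals the skip with None.
    if item in skip:
        queue.task_done()
        return None
    return item


async def worker(queue: asyncio.Queue, skip: set, processed: list) -> None:
    while True:
        item = await queue.get()
        checked = validate(queue, item, skip)
        if checked is None:
            continue  # already acknowledged inside validate()
        try:
            processed.append(checked)
        finally:
            queue.task_done()  # dispatch path: the caller owns task_done()


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(i)
    processed: list = []
    task = asyncio.create_task(worker(queue, {1, 3}, processed))
    # join() returns only if every item was acknowledged exactly once.
    await asyncio.wait_for(queue.join(), timeout=1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return processed


processed = asyncio.run(main())
print(processed)
```

If either site were dropped, queue.join() would hang until the timeout; if an extra task_done() were added, asyncio.Queue would raise ValueError once the counter went below zero.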


No other critical issues

  • Guard order in _validate_task_pre_dispatch matches the original exactly
  • _known_task_ids cleanup behavior is preserved (including the intentional non-discard on task-deleted / cancelled skip paths)
  • Exception precedence in _handle_collection_exception (isinstance chain) mirrors the original except clause order
  • _retried_tasks.discard placement (end of try body vs original else) is semantically equivalent
  • asyncio.CancelledError (a BaseException, not Exception) still propagates uncaught through the new except Exception handler, same as before
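
The last point can be checked in isolation. This sketch assumes Python 3.8 or later, where asyncio.CancelledError derives from BaseException; the job coroutine and log labels are hypothetical.

```python
import asyncio


async def job(log: list) -> None:
    try:
        await asyncio.sleep(10)
    except Exception:
        # Not reached on cancellation: CancelledError is not an Exception.
        log.append("handled")
    finally:
        log.append("cleanup")


async def main() -> list:
    log: list = []
    task = asyncio.create_task(job(log))
    await asyncio.sleep(0)  # let the job start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log


log = asyncio.run(main())
print(log)
```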

@chatgpt-codex-connector


Codex Review: Didn't find any major issues. Keep it up!

Reviewed commit: 13be769e38

axisrow commented Jun 20, 2026

Owner Author

Review triage (cycle 1/3)

Thanks @claude @codex. Codex: no findings. On Claude's findings:

Issue 1 ("logger.exception loses the traceback in an awaited coroutine"): ❌ HALLUCINATION (verified empirically)

The reasoning about gi_exc_state is right, but the conclusion is not. For sys.exc_info(), CPython calls _PyErr_GetTopmostException, which walks the previous_item chain to the nearest active exception. The awaited coroutine's own exc_state is empty, but its previous_item points to the exc_state of the calling coroutine (_run_single_worker), where the caught exception sits inside except. So the traceback is not lost.

Minimal repro on the same Python 3.11.2:

import asyncio, sys, logging
logging.basicConfig(level=logging.DEBUG, format="%(levelname)s %(message)s")
logger = logging.getLogger("t")

async def handler():
    print("sys.exc_info():", sys.exc_info()[:2])
    logger.exception("from awaited coroutine")

async def main():
    try:
        raise ValueError("boom")
    except Exception:
        await handler()

asyncio.run(main())

Output:

ERROR from awaited coroutine
Traceback (most recent call last):
  File ".../repro.py", line 12, in main
    raise ValueError("boom")
ValueError: boom
sys.exc_info(): (<class 'ValueError'>, ValueError('boom'))

sys.exc_info() returned ValueError, and logger.exception logged the full traceback. This is exactly the pattern in our code (await self._handle_collection_exception(exc, ...) from except Exception), so there is no regression: logger.exception logs the traceback as before. No FIX needed.
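
The same check can be written as an assertion instead of reading console output. This is a sketch under the assumption that the CPython behavior shown in the repro holds on the interpreter in use; the Capture handler and logger name are hypothetical helpers, not part of the project's test suite.

```python
import asyncio
import logging


class Capture(logging.Handler):
    """Collects log records in memory so their exc_info can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


logger = logging.getLogger("awaited_exc_demo")  # hypothetical logger name
logger.propagate = False
capture = Capture()
logger.addHandler(capture)


async def handler() -> None:
    # No exception is passed in; logger.exception relies on sys.exc_info().
    logger.exception("from awaited coroutine")


async def main() -> None:
    try:
        raise ValueError("boom")
    except Exception:
        await handler()


asyncio.run(main())
record = capture.records[0]
print(record.exc_info[0], record.exc_info[1])
```

A record whose exc_info carried (None, None, None) would indicate the regression described in Issue 1.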

Note (two task_done() call sites): SKIP → I will add an explanatory comment

Agreed that the task_done() contract now lives in two places (inside _validate_task_pre_dispatch on the skip paths + in the finally of the main path). That is a footgun for future changes, although the behavior is correct. I will add a short comment in the final cleanup pass. No logic changes needed.

Claude's remaining points (guard order, exception precedence, placement of _retried_tasks.discard, CancelledError propagation) confirm equivalence, thanks.

claude Bot commented Jun 20, 2026


Claude finished @axisrow's task in 34s


Triage Acknowledgement — Cycle 1/3

  • Gather context
  • Review triage from axisrow
  • Respond

Issue 1 — logger.exception traceback loss → Confirmed HALLUCINATION

Your analysis is correct and I was wrong. The reasoning about gi_exc_state was accurate but I drew the wrong conclusion: CPython's _PyErr_GetTopmostException walks the previous_item chain of per-coroutine exception states, so when an awaited coroutine is entered from within an active except block, the caller's exception is reachable and sys.exc_info() returns it correctly.

Your minimal repro confirms this empirically. The logger.exception calls in _handle_collection_exception log full tracebacks exactly as they did before the refactor. No fix needed.


Note — task_done() double call-site

Understood — a brief inline comment noting the two-site contract will be sufficient. No logic changes required.


Overall

The refactor is correct. All behavioural equivalences hold; Issue 1 was a false finding. Ready to merge once the optional task_done() comment is added.

#922)

Review nitpick: the split moved the queue-item task_done() accounting into
two places — the skip paths inside _validate_task_pre_dispatch and the
dispatch-path finally block. Add a comment at the call site documenting the
invariant (exactly one task_done() per dequeued item) so future maintainers
don't add a second. No behavior change.

Refs #922.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@chatgpt-codex-connector


Summary

  • Added a maintainer comment documenting the task_done() ownership split: skip paths handled by _validate_task_pre_dispatch, normal collection paths handled by the surrounding finally block. src/collection_queue.py L294-L298
  • Committed the change with docs(queue): clarify task_done ownership (f0a96d8) and created a PR via make_pr.

Testing

  • ruff check src/collection_queue.py
  • ⚠️ pytest -k collection_queue (pytest rejected configured --dist=loadfile, likely because pytest-xdist is unavailable in this environment)

@axisrow axisrow merged commit 413e1d1 into main Jun 20, 2026
4 checks passed
@axisrow axisrow deleted the fix/922-run-single-worker-complexity branch June 20, 2026 02:03