Simplify ConsumerWorker to accept unified Queue and unify queue APIs#68
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a5f74c3062
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| else: | ||
| failed_messages = await self._run_one() | ||
| await self._produce_failed_messages(failed_messages) | ||
| await self._consumer.commit() | ||
| commit_error = await self._queue.commit() | ||
| if commit_error: |
There was a problem hiding this comment.
Skip commit when no messages were consumed
ConsumerWorker now calls queue.commit() on every loop iteration, even after _run_one/_run_many returns early for an empty queue (or a fetch error). That is harmless for Kafka, but the in-memory backend’s commit() calls asyncio.Queue.task_done() without checking whether a get_nowait() actually happened. When get_one returns None or get_many returns an empty list, task_done() raises ValueError, and the worker will hit the _run_exception path and sleep 15s every loop. This is a regression for the in-memory queue: commit should be a no-op unless at least one message was consumed (or the in-memory commit should guard against empty).
Useful? React with 👍 / 👎.
Motivation
Queueabstraction.Queueimplementation.Description
ConsumerWorkernow requiresqueue: Queue[T], drops_producer_queue, and removes configurablemax_messages/max_interval_msin favor of aMAX_MESSAGESconstant and direct calls toqueue.get_one/get_many/put/commitincomponents/bot_detector/worker/core.py.BaseWorkerwas simplified to centralize logging/context helpers and moved consumer/producer-specific logic into concreteConsumerWorkerandProducerWorkerclasses;ProducerWorkernow usesqueue.putto send batches.QueueProducer.putreturnsOptional[Exception]andQueueConsumer.commitis exposed, enabling workers to observe producer/commit errors (changes incomponents/bot_detector/event_queue/core/event_queue.py).components/bot_detector/worker/__init__.pyandcomponents/bot_detector/worker/interface.pyto introduceConsumerWorkerInterfaceandProducerWorkerInterface, and tests were updated to use a combinedDummyQueueand new return/error semantics, including a newTestProducerWorkerthat verifiesputis invoked.Testing
test/components/bot_detector/worker/were updated to match the newQueue-based API but have not been executed.Codex Task