Skip to content

Simplify ConsumerWorker to accept unified Queue and unify queue APIs#68

Open
extreme4all wants to merge 1 commit intorefactor/worker-with-event-queuefrom
codex/refactor-worker-for-event_queue-component
Open

Simplify ConsumerWorker to accept unified Queue and unify queue APIs#68
extreme4all wants to merge 1 commit intorefactor/worker-with-event-queuefrom
codex/refactor-worker-for-event_queue-component

Conversation

@extreme4all
Copy link
Contributor

Motivation

  • Reduce complexity by removing separate producer-queue branching in the consumer and relying on the unified Queue abstraction.
  • Narrow the worker configuration surface by using a fixed batch limit and delegating I/O semantics to the Queue implementation.
  • Align worker behavior and interfaces with the event queue error/commit semantics to make retries and error handling clearer.

Description

  • ConsumerWorker now requires queue: Queue[T], drops _producer_queue, and removes configurable max_messages/max_interval_ms in favor of a MAX_MESSAGES constant and direct calls to queue.get_one/get_many/put/commit in components/bot_detector/worker/core.py.
  • BaseWorker was simplified to centralize logging/context helpers and moved consumer/producer-specific logic into concrete ConsumerWorker and ProducerWorker classes; ProducerWorker now uses queue.put to send batches.
  • The queue API was adjusted so QueueProducer.put returns Optional[Exception] and QueueConsumer.commit is exposed, enabling workers to observe producer/commit errors (changes in components/bot_detector/event_queue/core/event_queue.py).
  • Worker exports and interfaces were updated in components/bot_detector/worker/__init__.py and components/bot_detector/worker/interface.py to introduce ConsumerWorkerInterface and ProducerWorkerInterface, and tests were updated to use a combined DummyQueue and new return/error semantics, including a new TestProducerWorker that verifies put is invoked.

Testing

  • No automated tests were run as part of this change; tests in test/components/bot_detector/worker/ were updated to match the new Queue-based API but have not been executed.

Codex Task

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: a5f74c3062

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 155 to +159
else:
failed_messages = await self._run_one()
await self._produce_failed_messages(failed_messages)
await self._consumer.commit()
commit_error = await self._queue.commit()
if commit_error:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Skip commit when no messages were consumed

ConsumerWorker now calls queue.commit() on every loop iteration, even after _run_one/_run_many returns early for an empty queue (or a fetch error). That is harmless for Kafka, but the in-memory backend’s commit() calls asyncio.Queue.task_done() without checking whether a get_nowait() actually happened. When get_one returns None or get_many returns an empty list, task_done() raises ValueError, and the worker will hit the _run_exception path and sleep 15s every loop. This is a regression for the in-memory queue: commit should be a no-op unless at least one message was consumed (or the in-memory commit should guard against empty).

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant