Adjusting the streaming batch size to 100 #271
📝 Walkthrough

Adds environment-driven inference URL and streaming batch size, threads wait/size configs through initialization, changes message-queue typing, replaces local image model inference with remote HTTP calls, and parallelizes embedding generation using a ThreadPoolExecutor.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Stream as StreamListener
    participant MQ as MessageQueue/EventManager
    participant Rag as RagStreamListener
    participant KB as KBStorageProcessor
    participant Inf as InferenceService (remote HTTP)
    participant DB as KnowledgeBase / Storage

    Stream->>MQ: receive_messages(max_messages, wait_time_sec)
    MQ-->>Stream: messages
    Stream->>Rag: deliver messages
    Rag->>KB: process message -> produce insights
    KB->>Inf: POST base64 image -> /embeddings (concurrent via ThreadPool)
    Inf-->>KB: JSON response (clip, dino)
    KB->>DB: store embeddings/metadata
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py (1)
87-107: ⚠️ Potential issue | 🟠 Major

Exception from any single future causes the entire batch to fail, losing all completed embeddings.

If one `future.result()` raises an exception, the loop exits and embeddings already collected in the `embeddings` list are discarded. Consider handling individual future failures gracefully.

Also, line 107 uses `logger.info` for error logging; it should be `logger.error` or `logger.exception`.

🛠️ Proposed fix with per-future error handling
```diff
             with ThreadPoolExecutor(max_workers=10) as executor:
                 futures = {
                     executor.submit(self.__embed_single_insight, kb_insight): kb_insight
                     for kb_insight in insights
                 }
                 embeddings: List[EmbeddingsToStore] = []
                 for future in as_completed(futures):
-                    docs, doc_id, kb_id, document_type = future.result()
-                    embeddings.append(
-                        EmbeddingsToStore(
-                            kb_embeddings=docs,
-                            doc_id=doc_id,
-                            kb_id=kb_id,
-                            file_type=document_type,
+                    try:
+                        docs, doc_id, kb_id, document_type = future.result()
+                        embeddings.append(
+                            EmbeddingsToStore(
+                                kb_embeddings=docs,
+                                doc_id=doc_id,
+                                kb_id=kb_id,
+                                file_type=document_type,
+                            )
                         )
-                    )
+                    except Exception as e:
+                        kb_insight = futures[future]
+                        logger.error(f'Failed to embed document {kb_insight.insights.doc_id}: {e}')
             self.kb_rag_storage.upload_embedding_with_retry(embeddings=embeddings)
             logger.info('Embeddings are stored in the db')
         except Exception as err:
-            logger.info(f'The error message captured is {err}')
+            logger.error(f'The error message captured is {err}', exc_info=True)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py` around lines 87 - 107, The current ThreadPoolExecutor loop uses future.result() directly which lets any exception from __embed_single_insight abort the loop and discard already-collected EmbeddingsToStore entries; change the handling to catch exceptions per-future (wrap future.result() in try/except), log failures with logger.error or logger.exception (not logger.info), continue processing other futures so successful embeddings accumulate, and only call self.kb_rag_storage.upload_embedding_with_retry(embeddings=embeddings) after iterating all futures; keep references to __embed_single_insight, EmbeddingsToStore, and kb_rag_storage.upload_embedding_with_retry when applying the fix.
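The per-future pattern in the fix above can be sketched standalone; `embed_one` and `embed_all` are hypothetical stand-ins for `__embed_single_insight` and the batch loop, not the repository's code:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def embed_one(item: str) -> str:
    # Hypothetical stand-in for __embed_single_insight; one item fails on purpose.
    if item == "bad":
        raise ValueError("embedding failed")
    return item.upper()

def embed_all(items: list[str]) -> tuple[list[str], list[str]]:
    """Collect successes and record per-item failures instead of aborting the batch."""
    results: list[str] = []
    failures: list[str] = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(embed_one, item): item for item in items}
        for future in as_completed(futures):
            try:
                results.append(future.result())
            except Exception:
                failures.append(futures[future])  # record and keep going
    return results, failures

results, failures = embed_all(["a", "bad", "b"])
```

Because `future.result()` is wrapped per future, the one failing item ends up in `failures` while the two completed embeddings survive.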
🧹 Nitpick comments (1)
wavefront/server/packages/flo_utils/flo_utils/streaming/stream_listner.py (1)
75-78: Be aware of semantic differences across cloud providers.

Per the Azure Storage Queue implementation (see `flo_cloud/azure/storage_queue.py`), `wait_time_sec` is repurposed as `visibility_timeout` rather than a long-polling timeout. Azure Storage Queue doesn't support long-polling and returns immediately.

This means Azure deployments will poll more frequently than AWS/GCP. The 5-second sleep on empty responses (line 82) mitigates this, but behavior may still differ. Consider documenting this or adding provider-specific tuning.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/packages/flo_utils/flo_utils/streaming/stream_listner.py` around lines 75 - 78, The call to self.event_manager.receive_messages(...) uses self.wait_time_sec which Azure Storage Queue treats as visibility_timeout (no long-polling), so Azure will poll differently; update the code to be provider-aware by checking the event manager type or a provider flag (e.g., inspect self.event_manager.provider or isinstance(self.event_manager, AzureStorageQueueClient)) and set a provider-specific poll parameter (use a separate streaming_poll_interval or set wait_time_sec differently for Azure) and/or implement a stronger backoff when responses are empty (replace fixed 5-second sleep with configurable/exponential backoff tied to the provider); also add a short doc comment describing the provider semantic difference referencing self.event_manager.receive_messages, self.wait_time_sec, self.streaming_batch_size and the empty-response sleep.
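One way to implement the suggested backoff on empty responses, as a sketch; `receive`, `base_sleep`, and `max_sleep` are illustrative names, not the `flo_utils` API:

```python
import time

def poll_with_backoff(receive, base_sleep=0.5, max_sleep=30.0):
    """Yield message batches from a queue; double the idle sleep on empty
    responses and reset it as soon as messages arrive."""
    sleep = base_sleep
    while True:
        messages = receive()
        if messages:
            sleep = base_sleep  # activity: reset the backoff
            yield messages
        else:
            time.sleep(sleep)
            sleep = min(sleep * 2, max_sleep)  # cap the idle wait
```

This replaces a fixed 5-second sleep with an adaptive one, which matters most on providers like Azure Storage Queue where an empty `receive_messages` returns immediately instead of long-polling.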
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 21d12660-8318-49e4-9efa-0384c5974675
📒 Files selected for processing (5)
- wavefront/server/background_jobs/rag_ingestion/rag_ingestion/env.py
- wavefront/server/background_jobs/rag_ingestion/rag_ingestion/main.py
- wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py
- wavefront/server/packages/flo_cloud/flo_cloud/_types/message_queue.py
- wavefront/server/packages/flo_utils/flo_utils/streaming/stream_listner.py
Actionable comments posted: 1
🧹 Nitpick comments (2)
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py (2)
31-45: Prefer key-based extraction over fixed positions.

The current parser assumes CLIP is always first and DINO second. Building a `{model_name: vector}` map from the whole list will make this resilient to response reordering or extra model entries.

♻️ Proposed refactor

```diff
-        clip_entry, dino_entry = embeddings[0], embeddings[1]
-        if not isinstance(clip_entry, dict) or 'clip' not in clip_entry:
-            raise ValueError(f"Missing CLIP embedding in response entry: {clip_entry!r}")
-        if not isinstance(dino_entry, dict) or 'dino' not in dino_entry:
-            raise ValueError(f"Missing DINO embedding in response entry: {dino_entry!r}")
+        embedding_map = {}
+        for entry in embeddings:
+            if isinstance(entry, dict):
+                for key in ('clip', 'dino'):
+                    if key in entry:
+                        embedding_map[key] = entry[key]
+
+        if 'clip' not in embedding_map or 'dino' not in embedding_map:
+            raise ValueError(
+                f"Missing CLIP/DINO embeddings in response payload: {embeddings!r}"
+            )

         return KnowledgeBaseEmbeddingObject(
-            embedding_vector=clip_entry['clip'],
-            embedding_vector_1=dino_entry['dino'],
+            embedding_vector=embedding_map['clip'],
+            embedding_vector_1=embedding_map['dino'],
             chunk_text='image data',
             chunk_index='chunk_0',
         )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py` around lines 31 - 45, The parser currently assumes CLIP and DINO vectors are at fixed positions (embeddings[0], embeddings[1]) which breaks if response order changes; instead iterate the list `embeddings` to build a map of model_name→vector (e.g., parsing each entry dict for keys like 'clip' or 'dino'), validate that both 'clip' and 'dino' keys exist in that map, and then pass map['clip'] and map['dino'] into the `KnowledgeBaseEmbeddingObject` constructor (replace uses of `clip_entry`, `dino_entry` and positional indexing with lookups into the built map and raise a clear ValueError if either embedding is missing or malformed).
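The key-based parsing suggested above can be tried as a self-contained sketch; the response shape (a list of per-model dict entries) follows the description in this review, and the function name is illustrative:

```python
def extract_embeddings(embeddings: list) -> dict:
    """Build a model_name -> vector map instead of relying on list positions."""
    embedding_map = {}
    for entry in embeddings:
        if isinstance(entry, dict):
            for key in ("clip", "dino"):
                if key in entry:
                    embedding_map[key] = entry[key]
    missing = {"clip", "dino"} - embedding_map.keys()
    if missing:
        raise ValueError(f"Missing embeddings in response: {sorted(missing)}")
    return embedding_map

# Order-insensitive: works even if the service reorders the model entries.
vectors = extract_embeddings([{"dino": [0.2, 0.4]}, {"clip": [0.1, 0.3]}])
```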
13-20: Consider lazy-initializing `ImageEmbedding`.

`wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py:28-38` constructs this class eagerly, while `wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py:58-70` only uses it for `DocumentType.IMAGE`. Deferring this setup until the image path is hit would keep PDF/TEXT ingestion independent of image-service configuration and make non-image tests/workers lighter.
Verify each finding against the current code and only fix it if needed. In `@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py` around lines 13 - 20, The ImageEmbedding constructor currently validates INFERENCE_SERVICE_URL and builds _embed_url eagerly; change the code so ImageEmbedding is created lazily only when DocumentType.IMAGE is processed. In practice, remove the eager instantiation in kb_storage_processor (where ImageEmbedding() is constructed during setup) and instead instantiate ImageEmbedding at the point you handle DocumentType.IMAGE (or add a lazy getter/ factory like get_image_embedder() used on first image document). Ensure the __init__ of ImageEmbedding no longer runs at import time for non-image flows (or keep validation but only call it when instantiating on-demand) so PDF/TEXT ingestion and tests that never hit DocumentType.IMAGE don't require the image service config.
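A minimal sketch of the lazy construction suggested here, using `functools.cached_property`; `ImageEmbedderStub` and `Processor` are illustrative stand-ins, not the actual classes:

```python
from functools import cached_property

class ImageEmbedderStub:
    """Illustrative stand-in for ImageEmbedding; counts constructions."""
    constructed = 0

    def __init__(self):
        ImageEmbedderStub.constructed += 1

class Processor:
    @cached_property
    def image_embedder(self) -> ImageEmbedderStub:
        # Built (and its config validated) only on first access, so PDF/TEXT
        # flows never require the image-service settings.
        return ImageEmbedderStub()

processor = Processor()
# Nothing is constructed until the image path first touches processor.image_embedder.
```

`cached_property` also guarantees subsequent accesses reuse the same instance, matching the current eager-singleton behavior once the image path is hit.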
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e84171f1-f8fa-405d-8dae-de64e17b4c06
📒 Files selected for processing (2)
- wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py
- wavefront/server/background_jobs/rag_ingestion/rag_ingestion/env.py
✅ Files skipped from review due to trivial changes (1)
- wavefront/server/background_jobs/rag_ingestion/rag_ingestion/env.py
```python
response = httpx.post(
    self._embed_url,
    json=payload,
    timeout=httpx.Timeout(120.0, connect=30.0),
)
response.raise_for_status()
```
Handle transient inference failures per item.
This remote call now sits inside the batch worker path, and wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py:87-102 re-raises worker exceptions via future.result(). With the larger streaming batches, a single timeout/429/5xx here will fail the whole batch. Please add bounded retry/backoff for transient failures, or convert this into a per-item failure the caller can continue past.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py`
around lines 24 - 29, The HTTP call in image_embed.py (the httpx.post to
self._embed_url followed by response.raise_for_status) must not raise and fail
the whole batch on transient errors; implement a bounded retry with exponential
backoff for transient errors (timeouts, 429, 5xx) around the httpx.post, and if
retries still fail convert the result into a per-item failure instead of raising
— e.g., return/append an error marker or partial result for that image so the
caller (kb_storage_processor using future.result()) can continue processing
other items. Ensure you only retry idempotent/transient conditions, cap retries
and total wait, and use the function wrapping the post (the embedding method
that contains self._embed_url and response.raise_for_status) to surface per-item
errors rather than throwing.
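A bounded-retry sketch for the transient cases listed above (timeouts, 429, 5xx); the `post` callable and the retry limits are illustrative assumptions, not the actual `image_embed.py` API, and real code would catch `httpx.TimeoutException` instead of the builtin shown here:

```python
import time

TRANSIENT_STATUSES = {429, 500, 502, 503, 504}

def post_with_retry(post, max_attempts=3, base_delay=0.5):
    """Retry only transient failures with capped exponential backoff.
    Returns (response, None) on success, or (None, error) so the caller can
    record a per-item failure instead of failing the whole batch."""
    delay = base_delay
    last_err = None
    for attempt in range(max_attempts):
        try:
            response = post()
            if response.status_code in TRANSIENT_STATUSES:
                last_err = RuntimeError(f"HTTP {response.status_code}")
            else:
                return response, None  # success, or a non-retryable status
        except TimeoutError as err:  # httpx.TimeoutException in real code
            last_err = err
        if attempt < max_attempts - 1:
            time.sleep(delay)
            delay *= 2
    return None, last_err
```

Returning an `(result, error)` pair instead of raising lets the `kb_storage_processor` batch loop skip the failed image and continue collecting the other embeddings.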
Summary by CodeRabbit
New Features
Refactor