Skip to content

Adjusting the streaming batchsize to 100#271

Merged
vizsatiz merged 2 commits intodevelopfrom
fix/rag_streaming
Apr 7, 2026
Merged

Adjusting the streaming batchsize to 100#271
vizsatiz merged 2 commits intodevelopfrom
fix/rag_streaming

Conversation

@vizsatiz
Copy link
Copy Markdown
Member

@vizsatiz vizsatiz commented Apr 6, 2026

Summary by CodeRabbit

  • New Features

    • New environment variables: configurable inference service URL and streaming batch size.
    • Added adjustable wait-time parameter for message queue retrievals.
  • Refactor

    • Image embedding now uses the configured remote inference service.
    • Embedding generation made concurrent for improved throughput.

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 6, 2026

📝 Walkthrough

Walkthrough

Adds environment-driven inference URL and streaming batch size, threads wait/size configs through initialization, changes message-queue typing, replaces local image model inference with remote HTTP calls, and parallelizes embedding generation using a ThreadPoolExecutor.

Changes

Cohort / File(s) Summary
Configuration
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/env.py
Added INFERENCE_SERVICE_URL and STREAMING_BATCH_SIZE (int, default 100) environment-derived constants.
Initialization / CLI
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/main.py
Passes streaming_batch_size=STREAMING_BATCH_SIZE into RagStreamListener initialization.
Stream listener
wavefront/server/packages/flo_utils/flo_utils/streaming/stream_listner.py
Added wait_time_sec: int = 20 parameter and self.wait_time_sec; passes wait_time_sec into message queue receive calls.
Message queue types
wavefront/server/packages/flo_cloud/flo_cloud/_types/message_queue.py
Added explicit int type annotations for receive_messages(max_messages, wait_time_sec).
Concurrent embedding processing
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py
Replaced sequential embedding loop with ThreadPoolExecutor(max_workers=10) and as_completed(...); added private __embed_single_insight helper and adjusted typing/returns.
Remote image embeddings
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py
Removed local CLIP/DINO model loading and PyTorch logic; now requires INFERENCE_SERVICE_URL, POSTs base64 image to remote embeddings endpoint, parses JSON (data.response with clip/dino) and constructs KnowledgeBaseEmbeddingObject.

Sequence Diagram(s)

sequenceDiagram
    participant Stream as StreamListener
    participant MQ as MessageQueue/EventManager
    participant Rag as RagStreamListener
    participant KB as KBStorageProcessor
    participant Inf as InferenceService (remote HTTP)
    participant DB as KnowledgeBase / Storage

    Stream->>MQ: receive_messages(max_messages, wait_time_sec)
    MQ-->>Stream: messages
    Stream->>Rag: deliver messages
    Rag->>KB: process message -> produce insights
    KB->>Inf: POST base64 image -> /embeddings (concurrent via ThreadPool)
    Inf-->>KB: JSON response (clip, dino)
    KB->>DB: store embeddings/metadata
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

I nibble bytes and pack them light,
Batch by batch through threaded night,
I post my pics to clouds afar,
Return their vectors — hop, hoorah! 🥕✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Title check ⚠️ Warning The PR title mentions 'streaming batchsize to 100', but the changeset includes major architectural changes across 6 files: adding inference service URL configuration, implementing concurrent embedding generation with ThreadPoolExecutor, refactoring image embedding to use remote inference, and adding wait_time_sec parameters throughout the codebase. Update the title to reflect the actual scope of changes, such as 'Implement remote inference service integration and concurrent embedding processing' or provide a more accurate description of the primary architectural changes.
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/rag_streaming

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py (1)

87-107: ⚠️ Potential issue | 🟠 Major

Exception from any single future causes entire batch to fail, losing all completed embeddings.

If one future.result() raises an exception, the loop exits and embeddings already collected in embeddings list are discarded. Consider handling individual future failures gracefully.

Also, line 107 uses logger.info for error logging—should be logger.error or logger.exception.

🛠️ Proposed fix with per-future error handling
             with ThreadPoolExecutor(max_workers=10) as executor:
                 futures = {
                     executor.submit(self.__embed_single_insight, kb_insight): kb_insight
                     for kb_insight in insights
                 }
                 embeddings: List[EmbeddingsToStore] = []
                 for future in as_completed(futures):
-                    docs, doc_id, kb_id, document_type = future.result()
-                    embeddings.append(
-                        EmbeddingsToStore(
-                            kb_embeddings=docs,
-                            doc_id=doc_id,
-                            kb_id=kb_id,
-                            file_type=document_type,
+                    try:
+                        docs, doc_id, kb_id, document_type = future.result()
+                        embeddings.append(
+                            EmbeddingsToStore(
+                                kb_embeddings=docs,
+                                doc_id=doc_id,
+                                kb_id=kb_id,
+                                file_type=document_type,
+                            )
                         )
-                    )
+                    except Exception as e:
+                        kb_insight = futures[future]
+                        logger.error(f'Failed to embed document {kb_insight.insights.doc_id}: {e}')

             self.kb_rag_storage.upload_embedding_with_retry(embeddings=embeddings)
             logger.info('Embeddings are stored in the db')
         except Exception as err:
-            logger.info(f'The error message captured is {err}')
+            logger.error(f'The error message captured is {err}', exc_info=True)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py`
around lines 87 - 107, The current ThreadPoolExecutor loop uses future.result()
directly which lets any exception from __embed_single_insight abort the loop and
discard already-collected EmbeddingsToStore entries; change the handling to
catch exceptions per-future (wrap future.result() in try/except), log failures
with logger.error or logger.exception (not logger.info), continue processing
other futures so successful embeddings accumulate, and only call
self.kb_rag_storage.upload_embedding_with_retry(embeddings=embeddings) after
iterating all futures; keep references to __embed_single_insight,
EmbeddingsToStore, and kb_rag_storage.upload_embedding_with_retry when applying
the fix.
🧹 Nitpick comments (1)
wavefront/server/packages/flo_utils/flo_utils/streaming/stream_listner.py (1)

75-78: Be aware of semantic differences across cloud providers.

Per the Azure Storage Queue implementation (see flo_cloud/azure/storage_queue.py), wait_time_sec is repurposed as visibility_timeout rather than a long-polling timeout. Azure Storage Queue doesn't support long-polling and returns immediately.

This means Azure deployments will poll more frequently than AWS/GCP. The 5-second sleep on empty responses (line 82) mitigates this, but behavior may still differ. Consider documenting this or adding provider-specific tuning.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/packages/flo_utils/flo_utils/streaming/stream_listner.py`
around lines 75 - 78, The call to self.event_manager.receive_messages(...) uses
self.wait_time_sec which Azure Storage Queue treats as visibility_timeout (no
long-polling), so Azure will poll differently; update the code to be
provider-aware by checking the event manager type or a provider flag (e.g.,
inspect self.event_manager.provider or isinstance(self.event_manager,
AzureStorageQueueClient)) and set a provider-specific poll parameter (use a
separate streaming_poll_interval or set wait_time_sec differently for Azure)
and/or implement a stronger backoff when responses are empty (replace fixed
5-second sleep with configurable/exponential backoff tied to the provider); also
add a short doc comment describing the provider semantic difference referencing
self.event_manager.receive_messages, self.wait_time_sec,
self.streaming_batch_size and the empty-response sleep.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In
`@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py`:
- Around line 87-107: The current ThreadPoolExecutor loop uses future.result()
directly which lets any exception from __embed_single_insight abort the loop and
discard already-collected EmbeddingsToStore entries; change the handling to
catch exceptions per-future (wrap future.result() in try/except), log failures
with logger.error or logger.exception (not logger.info), continue processing
other futures so successful embeddings accumulate, and only call
self.kb_rag_storage.upload_embedding_with_retry(embeddings=embeddings) after
iterating all futures; keep references to __embed_single_insight,
EmbeddingsToStore, and kb_rag_storage.upload_embedding_with_retry when applying
the fix.

---

Nitpick comments:
In `@wavefront/server/packages/flo_utils/flo_utils/streaming/stream_listner.py`:
- Around line 75-78: The call to self.event_manager.receive_messages(...) uses
self.wait_time_sec which Azure Storage Queue treats as visibility_timeout (no
long-polling), so Azure will poll differently; update the code to be
provider-aware by checking the event manager type or a provider flag (e.g.,
inspect self.event_manager.provider or isinstance(self.event_manager,
AzureStorageQueueClient)) and set a provider-specific poll parameter (use a
separate streaming_poll_interval or set wait_time_sec differently for Azure)
and/or implement a stronger backoff when responses are empty (replace fixed
5-second sleep with configurable/exponential backoff tied to the provider); also
add a short doc comment describing the provider semantic difference referencing
self.event_manager.receive_messages, self.wait_time_sec,
self.streaming_batch_size and the empty-response sleep.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 21d12660-8318-49e4-9efa-0384c5974675

📥 Commits

Reviewing files that changed from the base of the PR and between 7a6e647 and ba82226.

📒 Files selected for processing (5)
  • wavefront/server/background_jobs/rag_ingestion/rag_ingestion/env.py
  • wavefront/server/background_jobs/rag_ingestion/rag_ingestion/main.py
  • wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py
  • wavefront/server/packages/flo_cloud/flo_cloud/_types/message_queue.py
  • wavefront/server/packages/flo_utils/flo_utils/streaming/stream_listner.py

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py (2)

31-45: Prefer key-based extraction over fixed positions.

The current parser assumes CLIP is always first and DINO second. Building a {model_name: vector} map from the whole list will make this resilient to response reordering or extra model entries.

♻️ Proposed refactor
-        clip_entry, dino_entry = embeddings[0], embeddings[1]
-        if not isinstance(clip_entry, dict) or 'clip' not in clip_entry:
-            raise ValueError(f"Missing CLIP embedding in response entry: {clip_entry!r}")
-        if not isinstance(dino_entry, dict) or 'dino' not in dino_entry:
-            raise ValueError(f"Missing DINO embedding in response entry: {dino_entry!r}")
+        embedding_map = {}
+        for entry in embeddings:
+            if isinstance(entry, dict):
+                for key in ('clip', 'dino'):
+                    if key in entry:
+                        embedding_map[key] = entry[key]
+
+        if 'clip' not in embedding_map or 'dino' not in embedding_map:
+            raise ValueError(
+                f"Missing CLIP/DINO embeddings in response payload: {embeddings!r}"
+            )
 
         return KnowledgeBaseEmbeddingObject(
-            embedding_vector=clip_entry['clip'],
-            embedding_vector_1=dino_entry['dino'],
+            embedding_vector=embedding_map['clip'],
+            embedding_vector_1=embedding_map['dino'],
             chunk_text='image data',
             chunk_index='chunk_0',
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py`
around lines 31 - 45, The parser currently assumes CLIP and DINO vectors are at
fixed positions (embeddings[0], embeddings[1]) which breaks if response order
changes; instead iterate the list `embeddings` to build a map of
model_name→vector (e.g., parsing each entry dict for keys like 'clip' or
'dino'), validate that both 'clip' and 'dino' keys exist in that map, and then
pass map['clip'] and map['dino'] into the `KnowledgeBaseEmbeddingObject`
constructor (replace uses of `clip_entry`, `dino_entry` and positional indexing
with lookups into the built map and raise a clear ValueError if either embedding
is missing or malformed).

13-20: Consider lazy-initializing ImageEmbedding.

wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py:28-38 constructs this class eagerly, while wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py:58-70 only uses it for DocumentType.IMAGE. Deferring this setup until the image path is hit would keep PDF/TEXT ingestion independent of image-service configuration and make non-image tests/workers lighter.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py`
around lines 13 - 20, The ImageEmbedding constructor currently validates
INFERENCE_SERVICE_URL and builds _embed_url eagerly; change the code so
ImageEmbedding is created lazily only when DocumentType.IMAGE is processed. In
practice, remove the eager instantiation in kb_storage_processor (where
ImageEmbedding() is constructed during setup) and instead instantiate
ImageEmbedding at the point you handle DocumentType.IMAGE (or add a lazy getter/
factory like get_image_embedder() used on first image document). Ensure the
__init__ of ImageEmbedding no longer runs at import time for non-image flows (or
keep validation but only call it when instantiating on-demand) so PDF/TEXT
ingestion and tests that never hit DocumentType.IMAGE don't require the image
service config.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py`:
- Around line 24-29: The HTTP call in image_embed.py (the httpx.post to
self._embed_url followed by response.raise_for_status) must not raise and fail
the whole batch on transient errors; implement a bounded retry with exponential
backoff for transient errors (timeouts, 429, 5xx) around the httpx.post, and if
retries still fail convert the result into a per-item failure instead of raising
— e.g., return/append an error marker or partial result for that image so the
caller (kb_storage_processor using future.result()) can continue processing
other items. Ensure you only retry idempotent/transient conditions, cap retries
and total wait, and use the function wrapping the post (the embedding method
that contains self._embed_url and response.raise_for_status) to surface per-item
errors rather than throwing.

---

Nitpick comments:
In
`@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py`:
- Around line 31-45: The parser currently assumes CLIP and DINO vectors are at
fixed positions (embeddings[0], embeddings[1]) which breaks if response order
changes; instead iterate the list `embeddings` to build a map of
model_name→vector (e.g., parsing each entry dict for keys like 'clip' or
'dino'), validate that both 'clip' and 'dino' keys exist in that map, and then
pass map['clip'] and map['dino'] into the `KnowledgeBaseEmbeddingObject`
constructor (replace uses of `clip_entry`, `dino_entry` and positional indexing
with lookups into the built map and raise a clear ValueError if either embedding
is missing or malformed).
- Around line 13-20: The ImageEmbedding constructor currently validates
INFERENCE_SERVICE_URL and builds _embed_url eagerly; change the code so
ImageEmbedding is created lazily only when DocumentType.IMAGE is processed. In
practice, remove the eager instantiation in kb_storage_processor (where
ImageEmbedding() is constructed during setup) and instead instantiate
ImageEmbedding at the point you handle DocumentType.IMAGE (or add a lazy getter/
factory like get_image_embedder() used on first image document). Ensure the
__init__ of ImageEmbedding no longer runs at import time for non-image flows (or
keep validation but only call it when instantiating on-demand) so PDF/TEXT
ingestion and tests that never hit DocumentType.IMAGE don't require the image
service config.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e84171f1-f8fa-405d-8dae-de64e17b4c06

📥 Commits

Reviewing files that changed from the base of the PR and between ba82226 and afec2ab.

📒 Files selected for processing (2)
  • wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py
  • wavefront/server/background_jobs/rag_ingestion/rag_ingestion/env.py
✅ Files skipped from review due to trivial changes (1)
  • wavefront/server/background_jobs/rag_ingestion/rag_ingestion/env.py

Comment on lines +24 to +29
response = httpx.post(
self._embed_url,
json=payload,
timeout=httpx.Timeout(120.0, connect=30.0),
)
response.raise_for_status()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle transient inference failures per item.

This remote call now sits inside the batch worker path, and wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py:87-102 re-raises worker exceptions via future.result(). With the larger streaming batches, a single timeout/429/5xx here will fail the whole batch. Please add bounded retry/backoff for transient failures, or convert this into a per-item failure the caller can continue past.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py`
around lines 24 - 29, The HTTP call in image_embed.py (the httpx.post to
self._embed_url followed by response.raise_for_status) must not raise and fail
the whole batch on transient errors; implement a bounded retry with exponential
backoff for transient errors (timeouts, 429, 5xx) around the httpx.post, and if
retries still fail convert the result into a per-item failure instead of raising
— e.g., return/append an error marker or partial result for that image so the
caller (kb_storage_processor using future.result()) can continue processing
other items. Ensure you only retry idempotent/transient conditions, cap retries
and total wait, and use the function wrapping the post (the embedding method
that contains self._embed_url and response.raise_for_status) to surface per-item
errors rather than throwing.

@vizsatiz vizsatiz merged commit 6f18242 into develop Apr 7, 2026
10 checks passed
@vizsatiz vizsatiz deleted the fix/rag_streaming branch April 7, 2026 13:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant