Adjusting the streaming batch size to 100 #271
Merged
Changes from all commits (2 commits)
95 changes: 36 additions & 59 deletions
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/embeddings/image_embed.py
```diff
@@ -1,71 +1,48 @@
-import torch
-from transformers import CLIPProcessor, CLIPModel, AutoImageProcessor, AutoModel
 from PIL import Image
 import io
 import base64
 
+import httpx
+from flo_utils.utils.log import logger
+
+from rag_ingestion.env import INFERENCE_SERVICE_URL
 from rag_ingestion.models.knowledge_base_embeddings import KnowledgeBaseEmbeddingObject
 
 
 class ImageEmbedding:
-    def __init__(self):
-        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
-        print(f'Initializing models on device: {self.device}')
-
-        # CLIP Model (Fixed: Added .to(self.device))
-        self.clip_model_name = 'openai/clip-vit-base-patch32'
-        self.model = (
-            CLIPModel.from_pretrained(self.clip_model_name).to(self.device).eval()
-        )
-        self.processor = CLIPProcessor.from_pretrained(self.clip_model_name)
+    """Image embeddings via the inference service (CLIP + DINO)."""
 
-        # DINO Model (No change needed for device_map="auto")
-        self.dino_model_name = 'facebook/dinov3-vitl16-pretrain-lvd1689m'
-        self.dino_processor = AutoImageProcessor.from_pretrained(self.dino_model_name)
-        self.dino_model = AutoModel.from_pretrained(
-            self.dino_model_name, device_map='auto', trust_remote_code=True
-        ).eval()
+    def __init__(self):
+        if not INFERENCE_SERVICE_URL:
+            raise ValueError(
+                'INFERENCE_SERVICE_URL must be set for image embedding API calls'
+            )
+        base = INFERENCE_SERVICE_URL.rstrip('/')
+        self._embed_url = f'{base}/inference/v1/query/embeddings'
+        logger.info(f'Image embedding endpoint: {self._embed_url}')
 
     def embed_image(self, file_content: bytes) -> KnowledgeBaseEmbeddingObject:
         image = Image.open(io.BytesIO(file_content))
         if image.mode != 'RGB':
             image = image.convert('RGB')
 
-        # CLIP Inputs (Fixed: Added .to(self.device))
-        inputs = self.processor(images=image, return_tensors='pt').to(self.device)
-
-        # --- CLIP EMBEDDING ---
-        with torch.no_grad():
-            image_features = self.model.get_image_features(**inputs)
-        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
-        embedding = image_features.squeeze().cpu().numpy().tolist()
-
-        # --- DINO EMBEDDING CALL ---
-        dino_embedding = self.embed_image_dino(file_content)
+        payload = {'image_data': base64.b64encode(file_content).decode('ascii')}
+        response = httpx.post(
+            self._embed_url,
+            json=payload,
+            timeout=httpx.Timeout(120.0, connect=30.0),
+        )
+        response.raise_for_status()
+        body = response.json()
+        embeddings = body.get('data', {}).get('response')
+        if not isinstance(embeddings, list) or len(embeddings) < 2:
+            raise ValueError(
+                f"Unexpected embedding response shape — expected list of at least 2 entries: {body!r}"
+            )
 
+        clip_entry, dino_entry = embeddings[0], embeddings[1]
+        if not isinstance(clip_entry, dict) or 'clip' not in clip_entry:
+            raise ValueError(f"Missing CLIP embedding in response entry: {clip_entry!r}")
+        if not isinstance(dino_entry, dict) or 'dino' not in dino_entry:
+            raise ValueError(f"Missing DINO embedding in response entry: {dino_entry!r}")
 
-        # Pass the DINO embedding to the correct field
         return KnowledgeBaseEmbeddingObject(
-            embedding_vector=embedding,
-            embedding_vector_1=dino_embedding,
+            embedding_vector=clip_entry['clip'],
+            embedding_vector_1=dino_entry['dino'],
             chunk_text='image data',
             chunk_index='chunk_0',
         )
-
-    @torch.inference_mode()
-    def embed_image_dino(self, file_content: bytes) -> list:
-        image = Image.open(io.BytesIO(file_content))
-        if image.mode != 'RGB':
-            image = image.convert('RGB')
-
-        inputs = self.dino_processor(images=image, return_tensors='pt')
-
-        target_device = self.dino_model.device
-        # Move inputs to the DINO model's device
-        inputs = {k: v.to(target_device) for k, v in inputs.items()}
-
-        outputs = self.dino_model(**inputs)
-
-        image_features = outputs.last_hidden_state[:, 0]
-
-        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
-        embedding = image_features.squeeze().cpu().numpy().tolist()
-
-        return embedding
```
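The new `embed_image` body hinges on the shape of the inference-service response: a JSON object with a `data.response` list whose first entry carries the `clip` vector and second the `dino` vector. A minimal, self-contained sketch of that contract (the helper names `build_payload` and `parse_embeddings` are illustrative, not part of the PR; only the payload key and response shape come from the diff):

```python
import base64


def build_payload(file_content: bytes) -> dict:
    # Mirrors the request body the new embed_image sends: base64-encoded image bytes.
    return {'image_data': base64.b64encode(file_content).decode('ascii')}


def parse_embeddings(body: dict) -> tuple[list, list]:
    # Mirrors the validation in the new embed_image: data.response must be a
    # list with a {'clip': ...} entry followed by a {'dino': ...} entry.
    embeddings = body.get('data', {}).get('response')
    if not isinstance(embeddings, list) or len(embeddings) < 2:
        raise ValueError(f'Unexpected embedding response shape: {body!r}')
    clip_entry, dino_entry = embeddings[0], embeddings[1]
    if not isinstance(clip_entry, dict) or 'clip' not in clip_entry:
        raise ValueError(f'Missing CLIP embedding in response entry: {clip_entry!r}')
    if not isinstance(dino_entry, dict) or 'dino' not in dino_entry:
        raise ValueError(f'Missing DINO embedding in response entry: {dino_entry!r}')
    return clip_entry['clip'], dino_entry['dino']


# Stubbed response matching the shape the new code expects:
body = {'data': {'response': [{'clip': [0.1, 0.2]}, {'dino': [0.3, 0.4]}]}}
clip_vec, dino_vec = parse_embeddings(body)
```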
Handle transient inference failures per item.
This remote call now sits inside the batch worker path, and `wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py:87-102` re-raises worker exceptions via `future.result()`. With the larger streaming batches, a single timeout/429/5xx here will fail the whole batch. Please add bounded retry/backoff for transient failures, or convert this into a per-item failure the caller can continue past.
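One way to address this, as a sketch rather than a prescription: wrap the remote call in a bounded exponential-backoff retry that only retries transient failures. Everything here is illustrative (the helper name, the status set, the injected `do_call`/`sleep` hooks); in the PR it would wrap the `httpx.post` call, with the client's timeout and 5xx errors mapped onto the transient case.

```python
import random
import time

# Status codes worth retrying; anything else is surfaced immediately.
TRANSIENT_STATUSES = {429, 500, 502, 503, 504}


class TransientHTTPError(Exception):
    """Stand-in for a timeout/HTTP-status error from the client (e.g. httpx)."""

    def __init__(self, status_code: int):
        super().__init__(status_code)
        self.status_code = status_code


def call_with_retry(do_call, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Run do_call(); retry transient failures with exponential backoff + jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return do_call()
        except TransientHTTPError as exc:
            if exc.status_code not in TRANSIENT_STATUSES or attempt == max_attempts:
                raise  # non-transient, or retries exhausted: let the caller decide
            delay = base_delay * (2 ** (attempt - 1)) * (1 + 0.1 * random.random())
            sleep(delay)
```

Catching the failure here (and returning a per-item sentinel instead of re-raising on the last attempt) would give the batch worker the "continue past one bad item" behavior the comment asks for.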
wavefront/server/background_jobs/rag_ingestion/rag_ingestion/processors/kb_storage_processor.py:87-102re-raises worker exceptions viafuture.result(). With the larger streaming batches, a single timeout/429/5xx here will fail the whole batch. Please add bounded retry/backoff for transient failures, or convert this into a per-item failure the caller can continue past.🤖 Prompt for AI Agents