diff --git a/pipelines/google/google_gemini.py b/pipelines/google/google_gemini.py index 8b0e7fb..e8213a6 100644 --- a/pipelines/google/google_gemini.py +++ b/pipelines/google/google_gemini.py @@ -4,7 +4,7 @@ author_url: https://github.com/owndev/ project_url: https://github.com/owndev/Open-WebUI-Functions funding_url: https://github.com/sponsors/owndev -version: 1.11.1 +version: 1.11.2 required_open_webui_version: 0.6.26 license: Apache License 2.0 description: Highly optimized Google Gemini pipeline with advanced image generation capabilities, intelligent compression, and streamlined processing workflows. @@ -328,6 +328,9 @@ async def _gather_history_images( ) -> List[Dict[str, Any]]: history_images: List[Dict[str, Any]] = [] for msg in messages: + # Defensive check: msg can be None in some cases + if not msg: + continue if msg is last_user_msg: continue if msg.get("role") not in {"user", "assistant"}: @@ -434,7 +437,7 @@ async def _emit_image_stats( ordered_stats: stats list in the exact order images will be sent (same length as combined image list) reused_flags: parallel list indicating whether image originated from history """ - if not ordered_stats: + if not ordered_stats or not __event_emitter__: return for idx, stat in enumerate(ordered_stats, start=1): reused = reused_flags[idx - 1] if idx - 1 < len(reused_flags) else False @@ -555,17 +558,18 @@ async def _build_image_generation_contents( } for i in range(len(combined)) ] - await __event_emitter__( - { - "type": "status", - "data": { - "action": "image_reference_map", - "description": f"{len(combined)} image(s) included (limit {self.valves.IMAGE_HISTORY_MAX_REFERENCES}).", - "images": mapping, - "done": True, - }, - } - ) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "action": "image_reference_map", + "description": f"{len(combined)} image(s) included (limit {self.valves.IMAGE_HISTORY_MAX_REFERENCES}).", + "images": mapping, + "done": True, + }, + } + ) # Build parts parts: List[Dict[str, Any]] = [] @@ -1111,6 +1115,9 @@ def _prepare_content( # Prepare contents for the API contents = [] for message in messages: + # Defensive check: message can be None in some cases + if not message: + continue role = message.get("role") if role == "system": continue # Skip system messages, handled separately @@ -1149,6 +1156,9 @@ def _process_multimodal_content( parts = [] for item in content_list: + # Defensive check: item can be None in some cases + if not item: + continue if item.get("type") == "text": parts.append({"text": item.get("text", "")}) elif item.get("type") == "image_url": @@ -1248,6 +1258,9 @@ def _add_image(data_url: str): # Structured multimodal array if isinstance(content, list): for item in content: + # Defensive check: item can be None in some cases + if not item: + continue if item.get("type") == "text": txt = item.get("text", "") text_segments.append(txt) @@ -1535,7 +1548,8 @@ async def _fetch_file_as_base64(self, file_url: str) -> Optional[str]: async with aiofiles.open(file_path, "rb") as fp: raw = await fp.read() enc = base64.b64encode(raw).decode() - mime = file_obj.meta.get("content_type", "image/png") + # Defensive check: file_obj.meta can be None + mime = (file_obj.meta or {}).get("content_type", "image/png") return f"data:{mime};base64,{enc}" except Exception as e: self.log.warning(f"Could not fetch file {file_url}: {e}") @@ -1556,18 +1570,21 @@ async def _upload_image_with_status( URL to uploaded image or data URL fallback """ try: - await __event_emitter__( - { - "type": "status", - "data": { - "action": "image_upload", - "description": "Uploading generated image to your library...", - "done": False, - }, - } - ) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "action": "image_upload", + "description": "Uploading generated image to your library...", + "done": False, + }, + } + ) - self.user = user = Users.get_user_by_id(__user__["id"]) + self.user = user = ( + Users.get_user_by_id(__user__["id"]) if __user__ else None + ) # Convert image data to base64 string if needed if isinstance(image_data, bytes): @@ -1582,16 +1599,17 @@ async def _upload_image_with_status( mime_type=mime_type, ) - await __event_emitter__( - { - "type": "status", - "data": { - "action": "image_upload", - "description": "Image uploaded successfully!", - "done": True, - }, - } - ) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "action": "image_upload", + "description": "Image uploaded successfully!", + "done": True, + }, + } + ) return image_url @@ -1603,16 +1621,17 @@ async def _upload_image_with_status( else: image_data_b64 = str(image_data) - await __event_emitter__( - { - "type": "status", - "data": { - "action": "image_upload", - "description": "Using inline image (upload failed)", - "done": True, - }, - } - ) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "action": "image_upload", + "description": "Using inline image (upload failed)", + "done": True, + }, + } + ) return f"data:{mime_type};base64,{image_data_b64}" @@ -1925,8 +1944,10 @@ def _configure_generation( gen_config_params |= {"safety_settings": safety_settings} # Add various tools to Gemini as required - features = __metadata__.get("features", {}) - params = __metadata__.get("params", {}) + # Defensive check: __metadata__ can be None when no filters are installed + metadata = __metadata__ or {} + features = metadata.get("features", {}) + params = metadata.get("params", {}) tools = [] if features.get("google_search_tool", False): @@ -2037,14 +2058,14 @@ async def _process_grounding_metadata( grounding_supports.extend(metadata.grounding_supports) # Add sources to the response - if grounding_chunks: + if grounding_chunks and __event_emitter__: sources = self._format_grounding_chunks_as_sources(grounding_chunks) await __event_emitter__( {"type": "chat:completion", "data": {"sources": sources}} ) # Add status specifying google queries used for grounding - if web_search_queries: + if web_search_queries and __event_emitter__: await __event_emitter__( { "type": "status", @@ -2171,15 +2192,16 @@ async def emit_chat_event(event_type: str, data: Dict[str, Any]) -> None: self.log.warning(f"Failed to access content parts: {parts_error}") if hasattr(chunk, "text") and chunk.text: answer_chunks.append(chunk.text) - await __event_emitter__( - { - "type": "chat:message:delta", - "data": { - "role": "assistant", - "content": chunk.text, - }, - } - ) + if __event_emitter__: + await __event_emitter__( + { + "type": "chat:message:delta", + "data": { + "role": "assistant", + "content": chunk.text, + }, + } + ) continue for part in parts: @@ -2196,30 +2218,32 @@ async def emit_chat_event(event_type: str, data: Dict[str, Any]) -> None: MAX_PREVIEW = 120 if len(preview) > MAX_PREVIEW: preview = preview[:MAX_PREVIEW].rstrip() + "…" - await __event_emitter__( - { - "type": "status", - "data": { - "action": "thinking", - "description": f"Thinking… {preview}", - "done": False, - "hidden": False, - }, - } - ) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "action": "thinking", + "description": f"Thinking… {preview}", + "done": False, + "hidden": False, + }, + } + ) # Regular answer text elif getattr(part, "text", None): answer_chunks.append(part.text) - await __event_emitter__( - { - "type": "chat:message:delta", - "data": { - "role": "assistant", - "content": part.text, - }, - } - ) + if __event_emitter__: + await __event_emitter__( + { + "type": "chat:message:delta", + "data": { + "role": "assistant", + "content": part.text, + }, + } + ) except Exception as part_error: # Log part processing errors but continue with the stream self.log.warning(f"Error processing content part: {part_error}") @@ -2271,12 +2295,17 @@ async def emit_chat_event(event_type: str, data: Dict[str, Any]) -> None: if thought_chunks: # Clear the thinking status without a summary in the status emitter - await __event_emitter__( - { - "type": "status", - "data": {"action": "thinking", "done": True, "hidden": True}, - } - ) + if __event_emitter__: + await __event_emitter__( + { + "type": "status", + "data": { + "action": "thinking", + "done": True, + "hidden": True, + }, + } + ) await emit_chat_event( "chat:finish", @@ -2401,7 +2430,8 @@ async def pipe( request_id = id(body) self.log.debug(f"Processing request {request_id}") self.log.debug(f"User request body: {__user__}") - self.user = Users.get_user_by_id(__user__["id"]) + # Defensive check: __user__ can be None in some contexts + self.user = Users.get_user_by_id(__user__["id"]) if __user__ else None try: # Parse and validate model ID @@ -2509,7 +2539,7 @@ async def get_response(): start_ts = time.time() # Send processing status for image generation - if supports_image_generation: + if supports_image_generation and __event_emitter__: await __event_emitter__( { "type": "status", @@ -2525,7 +2555,7 @@ async def get_response(): self.log.debug(f"Request {request_id}: Got non-streaming response") # Clear processing status for image generation - if supports_image_generation: + if supports_image_generation and __event_emitter__: await __event_emitter__( { "type": "status",