pipelines/google/google_gemini.py — 204 changes: 117 additions & 87 deletions
@@ -4,7 +4,7 @@
author_url: https://github.com/owndev/
project_url: https://github.com/owndev/Open-WebUI-Functions
funding_url: https://github.com/sponsors/owndev
-version: 1.11.1
+version: 1.11.2
required_open_webui_version: 0.6.26
license: Apache License 2.0
description: Highly optimized Google Gemini pipeline with advanced image generation capabilities, intelligent compression, and streamlined processing workflows.
@@ -328,6 +328,9 @@ async def _gather_history_images(
) -> List[Dict[str, Any]]:
history_images: List[Dict[str, Any]] = []
for msg in messages:
+# Defensive check: msg can be None in some cases
+if not msg:
+continue
if msg is last_user_msg:
continue
if msg.get("role") not in {"user", "assistant"}:
@@ -434,7 +437,7 @@ async def _emit_image_stats(
ordered_stats: stats list in the exact order images will be sent (same length as combined image list)
reused_flags: parallel list indicating whether image originated from history
"""
-if not ordered_stats:
+if not ordered_stats or not __event_emitter__:
return
for idx, stat in enumerate(ordered_stats, start=1):
reused = reused_flags[idx - 1] if idx - 1 < len(reused_flags) else False
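The docstring above states that `ordered_stats` and `reused_flags` are parallel lists; the index-based bounds check additionally tolerates a short `reused_flags`. Assuming equal lengths, as the docstring promises, the index arithmetic could be avoided with `zip` (a sketch with toy values, not the PR's code):

ordered_stats = [{"bytes": 1024}, {"bytes": 2048}]
reused_flags = [False, True]

for idx, (stat, reused) in enumerate(zip(ordered_stats, reused_flags), start=1):
    # idx is 1-based to match the emitted image numbering.
    print(idx, stat, reused)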
@@ -555,17 +558,18 @@ async def _build_image_generation_contents(
}
for i in range(len(combined))
]
-await __event_emitter__(
-{
-"type": "status",
-"data": {
-"action": "image_reference_map",
-"description": f"{len(combined)} image(s) included (limit {self.valves.IMAGE_HISTORY_MAX_REFERENCES}).",
-"images": mapping,
-"done": True,
-},
-}
-)
+if __event_emitter__:
+await __event_emitter__(
+{
+"type": "status",
+"data": {
+"action": "image_reference_map",
+"description": f"{len(combined)} image(s) included (limit {self.valves.IMAGE_HISTORY_MAX_REFERENCES}).",
+"images": mapping,
+"done": True,
+},
+}
+)

# Build parts
parts: List[Dict[str, Any]] = []
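Guarding each call with `if __event_emitter__:` is the dominant fix in this PR; wrapping the emitter once would centralize the None check instead. A sketch under that assumption (`safe_emit` is illustrative, not part of the codebase):

from typing import Any, Awaitable, Callable, Dict, Optional

Emitter = Optional[Callable[[Dict[str, Any]], Awaitable[None]]]

async def safe_emit(emitter: Emitter, event: Dict[str, Any]) -> None:
    # No-op when no emitter is attached, e.g. for non-interactive invocations.
    if emitter:
        await emitter(event)

# Usage inside the pipeline methods:
# await safe_emit(__event_emitter__, {"type": "status", "data": {...}})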
@@ -1111,6 +1115,9 @@ def _prepare_content(
# Prepare contents for the API
contents = []
for message in messages:
+# Defensive check: message can be None in some cases
+if not message:
+continue
role = message.get("role")
if role == "system":
continue # Skip system messages, handled separately
@@ -1149,6 +1156,9 @@ def _process_multimodal_content(
parts = []

for item in content_list:
+# Defensive check: item can be None in some cases
+if not item:
+continue
if item.get("type") == "text":
parts.append({"text": item.get("text", "")})
elif item.get("type") == "image_url":
@@ -1248,6 +1258,9 @@ def _add_image(data_url: str):
# Structured multimodal array
if isinstance(content, list):
for item in content:
+# Defensive check: item can be None in some cases
+if not item:
+continue
if item.get("type") == "text":
txt = item.get("text", "")
text_segments.append(txt)
@@ -1535,7 +1548,8 @@ async def _fetch_file_as_base64(self, file_url: str) -> Optional[str]:
async with aiofiles.open(file_path, "rb") as fp:
raw = await fp.read()
enc = base64.b64encode(raw).decode()
-mime = file_obj.meta.get("content_type", "image/png")
+# Defensive check: file_obj.meta can be None
+mime = (file_obj.meta or {}).get("content_type", "image/png")
return f"data:{mime};base64,{enc}"
except Exception as e:
self.log.warning(f"Could not fetch file {file_url}: {e}")
@@ -1556,18 +1570,21 @@ async def _upload_image_with_status(
URL to uploaded image or data URL fallback
"""
try:
-await __event_emitter__(
-{
-"type": "status",
-"data": {
-"action": "image_upload",
-"description": "Uploading generated image to your library...",
-"done": False,
-},
-}
-)
+if __event_emitter__:
+await __event_emitter__(
+{
+"type": "status",
+"data": {
+"action": "image_upload",
+"description": "Uploading generated image to your library...",
+"done": False,
+},
+}
+)

-self.user = user = Users.get_user_by_id(__user__["id"])
+self.user = user = (
+Users.get_user_by_id(__user__["id"]) if __user__ else None
+)

# Convert image data to base64 string if needed
if isinstance(image_data, bytes):
@@ -1582,16 +1599,17 @@ async def _upload_image_with_status(
mime_type=mime_type,
)

-await __event_emitter__(
-{
-"type": "status",
-"data": {
-"action": "image_upload",
-"description": "Image uploaded successfully!",
-"done": True,
-},
-}
-)
+if __event_emitter__:
+await __event_emitter__(
+{
+"type": "status",
+"data": {
+"action": "image_upload",
+"description": "Image uploaded successfully!",
+"done": True,
+},
+}
+)

return image_url

@@ -1603,16 +1621,17 @@ async def _upload_image_with_status(
else:
image_data_b64 = str(image_data)

-await __event_emitter__(
-{
-"type": "status",
-"data": {
-"action": "image_upload",
-"description": "Using inline image (upload failed)",
-"done": True,
-},
-}
-)
+if __event_emitter__:
+await __event_emitter__(
+{
+"type": "status",
+"data": {
+"action": "image_upload",
+"description": "Using inline image (upload failed)",
+"done": True,
+},
+}
+)

return f"data:{mime_type};base64,{image_data_b64}"

@@ -1925,8 +1944,10 @@ def _configure_generation(
gen_config_params |= {"safety_settings": safety_settings}

# Add various tools to Gemini as required
-features = __metadata__.get("features", {})
-params = __metadata__.get("params", {})
+# Defensive check: __metadata__ can be None when no filters are installed
+metadata = __metadata__ or {}
+features = metadata.get("features", {})
+params = metadata.get("params", {})
tools = []

if features.get("google_search_tool", False):
@@ -2037,14 +2058,14 @@ async def _process_grounding_metadata(
grounding_supports.extend(metadata.grounding_supports)

# Add sources to the response
-if grounding_chunks:
+if grounding_chunks and __event_emitter__:
sources = self._format_grounding_chunks_as_sources(grounding_chunks)
await __event_emitter__(
{"type": "chat:completion", "data": {"sources": sources}}
)

# Add status specifying google queries used for grounding
-if web_search_queries:
+if web_search_queries and __event_emitter__:
await __event_emitter__(
{
"type": "status",
@@ -2171,15 +2192,16 @@ async def emit_chat_event(event_type: str, data: Dict[str, Any]) -> None:
self.log.warning(f"Failed to access content parts: {parts_error}")
if hasattr(chunk, "text") and chunk.text:
answer_chunks.append(chunk.text)
-await __event_emitter__(
-{
-"type": "chat:message:delta",
-"data": {
-"role": "assistant",
-"content": chunk.text,
-},
-}
-)
+if __event_emitter__:
+await __event_emitter__(
+{
+"type": "chat:message:delta",
+"data": {
+"role": "assistant",
+"content": chunk.text,
+},
+}
+)
continue

for part in parts:
Expand All @@ -2196,30 +2218,32 @@ async def emit_chat_event(event_type: str, data: Dict[str, Any]) -> None:
MAX_PREVIEW = 120
if len(preview) > MAX_PREVIEW:
preview = preview[:MAX_PREVIEW].rstrip() + "…"
-await __event_emitter__(
-{
-"type": "status",
-"data": {
-"action": "thinking",
-"description": f"Thinking… {preview}",
-"done": False,
-"hidden": False,
-},
-}
-)
+if __event_emitter__:
+await __event_emitter__(
+{
+"type": "status",
+"data": {
+"action": "thinking",
+"description": f"Thinking… {preview}",
+"done": False,
+"hidden": False,
+},
+}
+)

# Regular answer text
elif getattr(part, "text", None):
answer_chunks.append(part.text)
-await __event_emitter__(
-{
-"type": "chat:message:delta",
-"data": {
-"role": "assistant",
-"content": part.text,
-},
-}
-)
+if __event_emitter__:
+await __event_emitter__(
+{
+"type": "chat:message:delta",
+"data": {
+"role": "assistant",
+"content": part.text,
+},
+}
+)
except Exception as part_error:
# Log part processing errors but continue with the stream
self.log.warning(f"Error processing content part: {part_error}")
@@ -2271,12 +2295,17 @@ async def emit_chat_event(event_type: str, data: Dict[str, Any]) -> None:

if thought_chunks:
# Clear the thinking status without a summary in the status emitter
-await __event_emitter__(
-{
-"type": "status",
-"data": {"action": "thinking", "done": True, "hidden": True},
-}
-)
+if __event_emitter__:
+await __event_emitter__(
+{
+"type": "status",
+"data": {
+"action": "thinking",
+"done": True,
+"hidden": True,
+},
+}
+)

await emit_chat_event(
"chat:finish",
@@ -2401,7 +2430,8 @@ async def pipe(
request_id = id(body)
self.log.debug(f"Processing request {request_id}")
self.log.debug(f"User request body: {__user__}")
-self.user = Users.get_user_by_id(__user__["id"])
+# Defensive check: __user__ can be None in some contexts
+self.user = Users.get_user_by_id(__user__["id"]) if __user__ else None

try:
# Parse and validate model ID
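With this change `self.user` can legitimately be None, so any downstream attribute access needs its own guard. A typical consumer-side pattern under that assumption (illustrative; only `.id` is assumed to exist on the user model):

user_id = self.user.id if self.user else None  # guard attribute access the same way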
@@ -2509,7 +2539,7 @@ async def get_response():
start_ts = time.time()

# Send processing status for image generation
-if supports_image_generation:
+if supports_image_generation and __event_emitter__:
await __event_emitter__(
{
"type": "status",
@@ -2525,7 +2555,7 @@
self.log.debug(f"Request {request_id}: Got non-streaming response")

# Clear processing status for image generation
-if supports_image_generation:
+if supports_image_generation and __event_emitter__:
await __event_emitter__(
{
"type": "status",