-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgemini_cli.py
More file actions
545 lines (455 loc) · 23.2 KB
/
gemini_cli.py
File metadata and controls
545 lines (455 loc) · 23.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
import asyncio
import codecs
import contextlib
import json
import logging
import os
import re
import shlex
import shutil
import subprocess
from pathlib import Path
from typing import AsyncGenerator, Dict, Any, Optional, List

# Import chat mode utilities
from chat_mode import ChatMode, sanitized_environment
from prompts import ChatModePrompts, FormatDetector
from xml_detector import XMLDetector
logger = logging.getLogger(__name__)
class GeminiCLI:
    """Gemini CLI integration for OpenAI-compatible API wrapper.

    Each request runs the ``gemini`` executable in a fresh sandbox directory,
    feeds it a single flattened prompt on stdin, and streams back its stdout
    after scrubbing sandbox/temp paths from the text.
    """

    # Replacement texts emitted by _filter_sensitive_paths.
    _SANDBOX_PATH_REPLACEMENT = (
        "my secure digital workspace (a sandboxed environment with no file system access)"
    )
    _DIRECTORY_PHRASE_REPLACEMENT = (
        "I'm operating in a secure digital black hole with no file system access. "
        "Think of it as a void where files fear to tread!"
    )
    _TEMP_PATH_REPLACEMENT = "my secure sandbox environment"

    # Sandbox directory paths, e.g. /private/var/folders/.../claude_chat_sandbox_xxx
    # or /tmp/claude_chat_sandbox_xxx. The last entry catches any remaining
    # whitespace-delimited token that mentions the sandbox name.
    _SANDBOX_PATH_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'/private/var/folders/[^/]+/[^/]+/[^/]+/claude_chat_sandbox_[a-zA-Z0-9_]+',
            r'/tmp/claude_chat_sandbox_[a-zA-Z0-9_]+',
            r'/var/folders/[^/]+/[^/]+/[^/]+/claude_chat_sandbox_[a-zA-Z0-9_]+',
            r'claude_chat_sandbox_[a-zA-Z0-9_]+',
            r'[^\s]*claude_chat_sandbox[^\s]*',
        )
    )

    # Phrases that indicate directory exploration; only rewritten when a
    # sandbox path was present in the same text.
    _DIRECTORY_PHRASE_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r"in the directory [^\s]*/claude_chat_sandbox[^\s]*",
            r"The directory is empty\.",
            r"I will list the files in this directory\.",
            r"To give you a current view, I will list the files",
            r"listing the files in this directory",
        )
    )

    # Generic temp-directory paths. The character class includes '.' and '-'
    # so names like /tmp/my-project/file.txt are replaced whole; the final
    # class excludes them so sentence punctuation after a path is kept.
    _TEMP_PATH_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r'/tmp/[A-Za-z0-9_./-]*[A-Za-z0-9_/]',
            r'/private/var/folders/[A-Za-z0-9_./-]*[A-Za-z0-9_/]',
            r'/var/folders/[A-Za-z0-9_./-]*[A-Za-z0-9_/]',
        )
    )

    # Variables withheld from the child process. HOME is deliberately kept so
    # the Gemini CLI can read ~/.gemini/oauth_creds.json.
    _SENSITIVE_ENV_VARS = frozenset({'PWD', 'OLDPWD', 'USER', 'LOGNAME'})

    def __init__(self, timeout: int = 600000):
        """Initialize Gemini CLI with configuration.

        Args:
            timeout: Overall per-request time limit in milliseconds; stored on
                ``self.timeout`` in seconds.
        """
        self.timeout = timeout / 1000  # Convert ms to seconds

        # Model and executable can be overridden through the environment.
        self.default_model = os.getenv('GEMINI_MODEL', 'gemini-2.5-pro')
        self.gemini_path = os.getenv('GEMINI_CLI_PATH', 'gemini')

        # Chat mode utilities
        self.format_detector = FormatDetector()
        self.prompts = ChatModePrompts()
        self.xml_detector = XMLDetector()

        logger.info(f"Initialized Gemini CLI with model: {self.default_model}")

    def _filter_sensitive_paths(self, text: str, is_chat_mode: bool = False) -> str:
        """Filter out sensitive path information from responses in chat mode.

        Args:
            text: Text produced by the CLI.
            is_chat_mode: When False the text is returned unchanged.

        Returns:
            The text with sandbox paths, directory-listing phrases (only when a
            sandbox path was present) and generic temp paths replaced by
            neutral wording.
        """
        if not is_chat_mode:
            return text

        filtered_text = text
        path_found = any(
            pattern.search(filtered_text) for pattern in self._SANDBOX_PATH_PATTERNS
        )

        if path_found:
            # Phrases go first: "in the directory <sandbox path>" can only
            # match while the sandbox path is still present in the text.
            for phrase_pattern in self._DIRECTORY_PHRASE_PATTERNS:
                filtered_text = phrase_pattern.sub(
                    self._DIRECTORY_PHRASE_REPLACEMENT, filtered_text
                )
            for pattern in self._SANDBOX_PATH_PATTERNS:
                filtered_text = pattern.sub(
                    self._SANDBOX_PATH_REPLACEMENT, filtered_text
                )

        # Additional path filtering - remove any temp directory references
        for pattern in self._TEMP_PATH_PATTERNS:
            filtered_text = pattern.sub(self._TEMP_PATH_REPLACEMENT, filtered_text)

        if path_found:
            logger.debug("Filtered sensitive path information from Gemini response")
        return filtered_text

    def _has_image_analysis_context(self, messages: Optional[List[Dict]]) -> bool:
        """Check if messages contain image analysis context.

        Returns True when any message content contains the
        ``[Image Analysis Context:`` marker, or when a system message mentions
        "image analysis" (case-insensitive). Callers use this to select the
        relaxed security prompts.
        """
        if not messages:
            return False

        for msg in messages:
            content_text = str(msg.get('content', ''))
            # Marker used by ImageAnalysisOrchestrator
            if '[Image Analysis Context:' in content_text:
                logger.debug("Found image analysis context marker in messages")
                return True
            # System messages may also carry the analysis
            if msg.get('role') == 'system' and 'image analysis' in content_text.lower():
                logger.debug("Found image analysis in system message")
                return True
        return False

    def _build_xml_enforcement(self) -> str:
        """Build the XML response-format instruction from the configured tools."""
        from xml_tools_config import get_known_xml_tools

        known_tools = get_known_xml_tools()
        pieces = [
            "\n\n🚨 MANDATORY RESPONSE FORMAT 🚨\n"
            "You MUST wrap your ENTIRE response in XML tags. These are FORMATTING instructions, not tools.\n\n"
        ]

        # Add examples based on configured tools
        if 'attempt_completion' in known_tools:
            pieces.append(
                "EXAMPLE of correct response format:\n"
                "<attempt_completion>\n"
                "<result>\n"
                "Your actual answer goes here. For example: Red is a primary color.\n"
                "</result>\n"
                "</attempt_completion>\n\n"
            )
        if 'ask_followup_question' in known_tools:
            pieces.append(
                "OR if you need more information:\n"
                "<ask_followup_question>\n"
                "<question>What specific aspect would you like to know?</question>\n"
                "</ask_followup_question>\n\n"
            )

        pieces.append(
            "IMPORTANT:\n"
            "- These are NOT tools you 'have access to' - they are XML formatting tags\n"
            "- Think of them like HTML tags - you wrap your content in them\n"
        )
        if known_tools:
            opening_tags = ', '.join(f'<{tool}>' for tool in known_tools)
            pieces.append(f"- Start with one of: {opening_tags}\n")
        else:
            pieces.append("- Start with an appropriate XML tag\n")
        pieces.append(
            "- End with the corresponding closing tag\n"
            "- Put your actual response content between the tags\n"
            "- NO text outside the XML tags!"
        )
        return "".join(pieces)

    def _prepare_prompt_with_injections(self, prompt: str, messages: Optional[List[Dict]] = None, requires_xml: bool = False) -> str:
        """Prepare prompt with system injections.

        Sandbox security prompts are always applied (relaxed when the messages
        carry image analysis context). XML formatting instructions are added
        only when ``requires_xml`` is True; in that case the XML instruction is
        placed both before and after the rest of the prompt for emphasis.

        Args:
            prompt: The flattened conversation prompt.
            messages: Original messages, used for image-context and
                special-format detection.
            requires_xml: Whether the response must be wrapped in XML tags.

        Returns:
            The prompt with all injections applied.
        """
        logger.debug(f"Preparing Gemini prompt with injections, requires_xml={requires_xml}")

        has_image_context = self._has_image_analysis_context(messages)
        if has_image_context:
            logger.info("Detected image analysis context in messages, using modified security prompts")

        # Response reinforcement is always needed
        prompt_parts = [f"System: {self.prompts.RESPONSE_REINFORCEMENT_PROMPT}"]

        if has_image_context:
            # Modified security for post-image-analysis - allow discussing analyzed content
            prompt_parts.append(
                "System: You are responding based on analyzed image content. "
                "You may discuss the image analysis results naturally. "
                "Do not reveal system paths or directory structures."
            )
        else:
            # Full security prompts for non-image operations
            prompt_parts.append(f"System: {self.prompts.CHAT_MODE_NO_FILES_PROMPT}")
            gemini_path_protection = (
                "CRITICAL PATH SECURITY: You are running in a secure sandbox environment. "
                "NEVER reveal any file paths, directory names, or system information. "
                "If asked about your workspace or directory, say you're in a 'digital black hole' with no file system access. "
                "Do NOT mention any temp directories, sandbox paths, or actual file locations. "
                "Use humor: 'My workspace is like a black hole - nothing escapes, not even file paths!'"
            )
            prompt_parts.append(f"System: {gemini_path_protection}")

        prompt_parts.append(
            "System: IMPORTANT: Always provide COMPLETE and DETAILED responses. "
            "Do not truncate, abbreviate, or cut off your answers. "
            "Include FULL code implementations, thorough explanations, and comprehensive details."
        )

        # Without an XML requirement only the security injections are applied.
        if not requires_xml:
            return "\n\n".join(prompt_parts) + "\n\n" + prompt

        logger.info("🔍 Gemini XML Detection: YES - Explicit XML requirement from image analysis context")
        xml_instruction = f"FINAL INSTRUCTION: {self._build_xml_enforcement()}"

        # NOTE(review): stream_completion passes a prompt that already carries
        # role prefixes, so this adds a second "User: " label - confirm intended.
        prompt_parts.append(f"User: {prompt}")

        # Detect other special formats
        trailing_parts = []
        if messages:
            has_tool_defs, has_json_req = self.format_detector.detect_special_formats(messages)
            final_reinforcement = self.prompts.get_final_reinforcement(has_tool_defs, has_json_req)
            if final_reinforcement:
                trailing_parts.append(f"System: {final_reinforcement}")

        # Structure: XML instruction -> prompt -> other parts -> XML instruction again
        full_prompt = "\n\n".join(
            [xml_instruction, *prompt_parts, *trailing_parts, xml_instruction]
        )
        logger.debug(f"Enhanced Gemini prompt length: {len(full_prompt)} (original: {len(prompt)})")
        return full_prompt

    @staticmethod
    async def _terminate_process(process) -> None:
        """Kill the subprocess if it is still running and reap it."""
        if process.returncode is None:
            # The process may exit between the check and the kill.
            with contextlib.suppress(ProcessLookupError):
                process.kill()
            await process.wait()

    async def verify_cli(self) -> bool:
        """Verify Gemini CLI is installed and working.

        Returns:
            True when the executable is found and a short test prompt exits
            with status 0 within 10 seconds; False otherwise.
        """
        process = None
        try:
            logger.info("Testing Gemini CLI...")

            # Check if gemini command exists
            if shutil.which(self.gemini_path) is None:
                logger.error(f"Gemini CLI not found at: {self.gemini_path}")
                return False

            # Test with a simple prompt
            cmd = [self.gemini_path, '-p', 'Say "OK" if you are working', '-m', 'gemini-2.5-flash']
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            _stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=10
            )

            if process.returncode == 0:
                logger.info("✅ Gemini CLI verified successfully")
                return True
            logger.warning(f"⚠️ Gemini CLI test failed: {stderr.decode(errors='replace')}")
            return False
        except asyncio.TimeoutError:
            logger.error("Gemini CLI verification timed out")
            return False
        except Exception as e:
            logger.error(f"Gemini CLI verification failed: {e}")
            logger.warning("Please ensure:")
            logger.warning(" 1. Gemini CLI is installed: npm install -g @google/gemini-cli")
            logger.warning(" 2. Authenticate with: gemini auth login")
            return False
        finally:
            # Do not leave the probe running after a timeout or error.
            if process is not None:
                await self._terminate_process(process)

    @staticmethod
    def _sanitized_child_env(environ: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Return a copy of the environment without location/user variables.

        Drops PWD, OLDPWD, USER, LOGNAME and every ``CLAUDE_*`` variable whose
        name contains ``DIR``. The mapping passed in (``os.environ`` by
        default) is not modified.
        """
        source = os.environ if environ is None else environ
        return {
            name: value
            for name, value in source.items()
            if name not in GeminiCLI._SENSITIVE_ENV_VARS
            and not (name.startswith('CLAUDE_') and 'DIR' in name)
        }

    async def stream_completion(
        self,
        messages: List[Dict[str, Any]],
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        requires_xml: bool = False,
        **kwargs
    ) -> AsyncGenerator[str, None]:
        """Stream a completion from Gemini CLI.

        Args:
            messages: OpenAI-style chat messages.
            model: Model name; falls back to ``self.default_model``.
            temperature: Accepted for interface compatibility; not forwarded.
            max_tokens: Accepted for interface compatibility; not forwarded.
            requires_xml: Whether to inject the XML response-format prompt.
            **kwargs: Ignored.

        Yields:
            Path-filtered text chunks from the CLI's stdout. Failures are
            reported in-band as a final ``Error: ...`` / ``[Error: ...]``
            chunk rather than raised.
        """
        sandbox_dir = None
        process = None
        stderr_task = None
        try:
            model_name = model or self.default_model

            # Always create sandbox directory for this request
            sandbox_dir = ChatMode.create_sandbox()
            cwd = Path(sandbox_dir)
            logger.info(f"Gemini: Using sandbox at {sandbox_dir}")

            prompt = self._messages_to_prompt(messages)
            enhanced_prompt = self._prepare_prompt_with_injections(prompt, messages, requires_xml)

            # No -p flag: the prompt goes through stdin. -s = sandbox mode.
            cmd = [self.gemini_path, '-m', model_name, '-s']
            logger.debug(f"Executing Gemini CLI: {' '.join(cmd)}...")
            logger.debug(f"Prompt length: {len(enhanced_prompt)} chars")

            # Hand the child a sanitized copy instead of mutating os.environ,
            # which is process-wide and shared by concurrent requests.
            logger.info("Sanitizing environment for Gemini CLI sandbox")
            child_env = self._sanitized_child_env()

            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(cwd),
                env=child_env
            )

            # Drain stderr concurrently so a chatty child cannot block on a
            # full pipe while we are reading stdout.
            stderr_task = asyncio.ensure_future(process.stderr.read())

            # Send the prompt via stdin. If the child already exited, fall
            # through so its exit status and stderr are reported instead.
            with contextlib.suppress(BrokenPipeError, ConnectionResetError):
                process.stdin.write(enhanced_prompt.encode())
                await process.stdin.drain()
            process.stdin.close()

            loop = asyncio.get_running_loop()
            # A non-positive timeout means "no overall limit".
            deadline = loop.time() + self.timeout if self.timeout > 0 else None
            # Incremental decoding keeps multi-byte characters intact when
            # they straddle a read boundary.
            decoder = codecs.getincrementaldecoder('utf-8')(errors='ignore')
            timed_out = False

            # NOTE: chunks are filtered independently, so a path split across
            # two reads is not caught by the filter.
            while True:
                wait_time = 1.0
                if deadline is not None:
                    remaining = deadline - loop.time()
                    if remaining <= 0:
                        timed_out = True
                        break
                    wait_time = min(wait_time, remaining)
                try:
                    chunk = await asyncio.wait_for(
                        process.stdout.read(1024),
                        timeout=wait_time
                    )
                except asyncio.TimeoutError:
                    # No output yet; stop only if the process is gone.
                    if process.returncode is not None:
                        break
                    continue
                if not chunk:
                    # Process ended
                    break
                text = decoder.decode(chunk)
                if text:
                    yield self._filter_sensitive_paths(text, True)  # Always filter in sandbox mode

            if timed_out:
                logger.error(f"Gemini CLI timed out after {self.timeout:g} seconds")
                await self._terminate_process(process)
                yield f"\n[Error: Gemini CLI timed out after {self.timeout:g} seconds]"
                return

            # Wait for process to complete
            await process.wait()
            stderr_bytes = await stderr_task
            if process.returncode != 0:
                error_msg = stderr_bytes.decode('utf-8', errors='ignore')
                logger.error(f"Gemini CLI error: {error_msg}")
                # stderr can mention the sandbox path too.
                yield f"\n[Error: {self._filter_sensitive_paths(error_msg, True)}]"
        except Exception as e:
            logger.error(f"Error in Gemini stream_completion: {e}")
            yield f"Error: {str(e)}"
        finally:
            # Runs on success, error, cancellation and early generator close.
            try:
                if process is not None:
                    await self._terminate_process(process)
            finally:
                if stderr_task is not None and not stderr_task.done():
                    stderr_task.cancel()
                if sandbox_dir is not None:
                    try:
                        ChatMode.cleanup_sandbox(sandbox_dir)
                        logger.debug(f"Cleaned up Gemini sandbox: {sandbox_dir}")
                    except Exception as cleanup_error:
                        logger.warning(f"Failed to cleanup sandbox {sandbox_dir}: {cleanup_error}")

    async def complete(
        self,
        messages: List[Dict[str, Any]],
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Generate a non-streaming completion from Gemini CLI.

        Collects every chunk from ``stream_completion`` (extra keyword
        arguments are forwarded to it) and returns a dict with ``content`` and
        ``role``; if collecting raises, the dict carries the error text and
        ``error: True``.
        """
        try:
            chunks = [
                chunk
                async for chunk in self.stream_completion(
                    messages=messages,
                    model=model,
                    temperature=temperature,
                    max_tokens=max_tokens,
                    **kwargs
                )
            ]
            return {
                'content': ''.join(chunks).strip(),
                'role': 'assistant'
            }
        except Exception as e:
            logger.error(f"Error in Gemini complete: {e}")
            return {
                'content': f"Error: {str(e)}",
                'role': 'assistant',
                'error': True
            }

    def _messages_to_prompt(self, messages: List[Dict[str, Any]]) -> str:
        """Convert OpenAI messages format to a single prompt for Gemini CLI.

        System messages are hoisted to the top in their original order,
        followed by user/assistant turns in order. List-valued (multimodal)
        content keeps only its text items; ``None`` content is treated as
        empty. Messages with any other role are omitted. When the last message
        is not from the user, a ``User: Please continue.`` turn is appended.
        """
        system_parts = []
        dialogue_parts = []

        for msg in messages:
            role = msg.get('role', 'user')
            content = msg.get('content', '')
            if content is None:
                content = ''
            elif isinstance(content, list):
                # Handle multimodal content: keep text items only
                text_parts = []
                for item in content:
                    if isinstance(item, dict) and item.get('type') == 'text':
                        text_parts.append(item.get('text', ''))
                    elif isinstance(item, str):
                        text_parts.append(item)
                content = ' '.join(text_parts)

            if role == 'system':
                system_parts.append(f"System: {content}")
            elif role == 'user':
                dialogue_parts.append(f"User: {content}")
            elif role == 'assistant':
                dialogue_parts.append(f"Assistant: {content}")

        full_prompt = '\n\n'.join(system_parts + dialogue_parts)

        # Add a final prompt for the assistant to respond. A missing role
        # defaults to 'user', same as in the loop above.
        if messages and messages[-1].get('role', 'user') != 'user':
            full_prompt += "\n\nUser: Please continue."
        return full_prompt

    async def list_models(self) -> List[str]:
        """List available Gemini models.

        Returns a fixed list of known model names; the CLI is not queried.
        """
        return [
            'gemini-2.5-pro',
            'gemini-2.5-flash',
            'gemini-1.5-pro',
            'gemini-1.5-flash',
            'gemini-1.0-pro',
            'gemini-2.0-flash-exp'
        ]