diff --git a/pyproject.toml b/pyproject.toml index 657920e2c7..9b791f834c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,6 +145,7 @@ optional-dependencies.extensions = [ "llama-index-embeddings-google-genai>=0.3", "llama-index-readers-file>=0.4", "lxml>=5.3", + "openai>=2.20.0,<3.0.0", "pypika>=0.50", "toolbox-adk>=1,<2", ] @@ -207,7 +208,7 @@ optional-dependencies.test = [ "litellm>=1.83.7,<=1.83.14", "llama-index-readers-file>=0.4", "mcp>=1.24,<2", - "openai>=1.100.2", + "openai>=2.20.0,<3.0.0", "opentelemetry-exporter-gcp-logging>=1.9.0a0,<=1.12.0a0", "opentelemetry-exporter-gcp-monitoring>=1.9.0a0,<2", "opentelemetry-exporter-gcp-trace>=1.9,<2", diff --git a/src/google/adk/flows/llm_flows/contents.py b/src/google/adk/flows/llm_flows/contents.py index fab5afd2cd..f53911c678 100644 --- a/src/google/adk/flows/llm_flows/contents.py +++ b/src/google/adk/flows/llm_flows/contents.py @@ -52,16 +52,24 @@ async def run_async( ): preserve_function_call_ids = True else: - # Anthropic pairs tool_use/tool_result by id, so `adk-*` fallback - # ids must survive replay. + # Some non-Gemini providers pair tool calls/results by id, so `adk-*` + # fallback ids must survive replay. 
try: from ...models.anthropic_llm import AnthropicLlm except (ImportError, OSError): AnthropicLlm = None + try: + from ...labs.openai_responses import OpenAIResponsesLlm + except (ImportError, OSError): + OpenAIResponsesLlm = None if AnthropicLlm is not None and isinstance( canonical_model, AnthropicLlm ): preserve_function_call_ids = True + if OpenAIResponsesLlm is not None and isinstance( + canonical_model, OpenAIResponsesLlm + ): + preserve_function_call_ids = True # Preserve all contents that were added by instruction processor # (since llm_request.contents will be completely reassigned below) diff --git a/src/google/adk/labs/openai_responses/__init__.py b/src/google/adk/labs/openai_responses/__init__.py new file mode 100644 index 0000000000..1429c4938c --- /dev/null +++ b/src/google/adk/labs/openai_responses/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ._openai_responses_llm import AzureOpenAIResponsesLlm +from ._openai_responses_llm import OpenAIResponsesLlm + +__all__ = [ + 'AzureOpenAIResponsesLlm', + 'OpenAIResponsesLlm', +] diff --git a/src/google/adk/labs/openai_responses/_openai_responses_llm.py b/src/google/adk/labs/openai_responses/_openai_responses_llm.py new file mode 100644 index 0000000000..46171048fc --- /dev/null +++ b/src/google/adk/labs/openai_responses/_openai_responses_llm.py @@ -0,0 +1,1253 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""OpenAI Responses API integrations for GPT models.""" + +from __future__ import annotations + +import base64 +import copy +import inspect +import json +import logging +import os +import re +from functools import cached_property +from typing import Any, AsyncGenerator, Callable, Mapping, cast + +from google.genai import types +from pydantic import BaseModel, Field +from typing_extensions import override + +try: + from openai import AsyncOpenAI + from openai.types.responses import ( + EasyInputMessageParam, + FunctionToolParam, + Response, + ResponseFunctionToolCall, + ResponseFunctionToolCallParam, + ResponseInputContentParam, + ResponseInputFileParam, + ResponseInputImageParam, + ResponseInputItemParam, + ResponseInputTextParam, + ResponseOutputItem, + ResponseOutputMessage, + ResponseOutputRefusal, + ResponseOutputText, + ResponseReasoningItem, + ResponseReasoningItemParam, + ResponseStreamEvent, + ResponseUsage, + ToolParam, + ) + from openai.types.responses.response_input_item_param import FunctionCallOutput + from openai.types.shared_params.reasoning import Reasoning as OpenAIReasoning +except ImportError as e: + raise ImportError( + "The 'openai' package is not installed. Please install it with " + '`pip install openai` to use the OpenAI Responses API labs models.' + ) from e + +from ...models.base_llm import BaseLlm +from ...models.llm_request import LlmRequest +from ...models.llm_response import LlmResponse + +logger = logging.getLogger('google_adk.' 
+ __name__) + +__all__ = [ + 'AzureOpenAIResponsesLlm', + 'OpenAIResponsesLlm', +] + +_REFUSAL_PREFIX = 'OpenAI refusal: ' +_REASONING_NOT_GIVEN = object() + +_ResponsesInputItem = ResponseInputItemParam | EasyInputMessageParam + + +class _CallIdSanitizer: + """Maps invalid or missing function call IDs to stable Responses IDs.""" + + def __init__(self) -> None: + self._mapping: dict[str, str] = {} + self._next_fallback = 0 + + def sanitize(self, call_id: str | None) -> str: + if call_id and re.fullmatch(r'[a-zA-Z0-9_-]+', call_id): + return call_id + if not call_id: + fallback = f'call_adk_fallback_{self._next_fallback}' + self._next_fallback += 1 + return fallback + key = call_id + if key not in self._mapping: + self._mapping[key] = f'call_adk_fallback_{self._next_fallback}' + self._next_fallback += 1 + return self._mapping[key] + + +def _get_value(obj: object, key: str, default: Any = None) -> Any: + """Returns a value from either a mapping or an SDK object.""" + if obj is None: + return default + if isinstance(obj, Mapping): + return obj.get(key, default) + return getattr(obj, key, default) + + +def _to_dict(obj: object) -> dict[str, Any]: + """Returns a serializable dict for mappings and Pydantic SDK objects.""" + if obj is None: + return {} + if isinstance(obj, Mapping): + return dict(obj) + if hasattr(obj, 'model_dump'): + return obj.model_dump(exclude_none=True) + return { + key: value + for key, value in vars(obj).items() + if not key.startswith('_') and value is not None + } + + +def _serialize_json_value(value: object) -> str: + """Serializes tool output values into the string expected by Responses.""" + if value is None: + return '' + if isinstance(value, str): + return value + if isinstance(value, Mapping): + content = value.get('content') + if isinstance(content, list) and content: + content_items = [] + for item in content: + if isinstance(item, Mapping): + if item.get('type') == 'text' and 'text' in item: + content_items.append(str(item['text'])) + 
else: + content_items.append(str(dict(item))) + else: + content_items.append(str(item)) + return '\n'.join(content_items) + if isinstance(content, str) and content: + return content + if 'result' in value and value['result'] is not None: + result = value['result'] + if isinstance(result, str): + return result + return json.dumps(result) + return json.dumps(value) + + +def _signature_to_str(signature: bytes | str) -> str: + if isinstance(signature, bytes): + return signature.decode('utf-8') + return signature + + +def _loads_json_object(value: str | None) -> dict[str, Any]: + if not value: + return {} + try: + parsed = json.loads(value) + except json.JSONDecodeError: + logger.warning('Failed to parse Responses API function arguments as JSON.') + return {} + if isinstance(parsed, dict): + return parsed + return {} + + +def _serialize_system_instruction( + system_instruction: types.ContentUnion | None, +) -> str | None: + """Serializes ADK system instructions to Responses API instructions.""" + if not system_instruction: + return None + if isinstance(system_instruction, str): + return system_instruction + if isinstance(system_instruction, types.Part): + return system_instruction.text + if isinstance(system_instruction, types.Content): + return ''.join(part.text or '' for part in system_instruction.parts or []) + if isinstance(system_instruction, Mapping): + part = types.Part(**system_instruction) + return part.text + if isinstance(system_instruction, list): + return ''.join( + item + if isinstance(item, str) + else item.text or '' + if isinstance(item, types.Part) + else types.Part(**item).text or '' + for item in system_instruction + ) + return None + + +def _update_type_string(value: Any) -> None: + """Lowercases nested JSON schema type strings for OpenAI compatibility.""" + if isinstance(value, list): + for item in value: + _update_type_string(item) + return + + if not isinstance(value, dict): + return + + schema_type = value.get('type') + if 
isinstance(schema_type, str): + value['type'] = schema_type.lower() + + for child_value in value.values(): + if isinstance(child_value, (dict, list)): + _update_type_string(child_value) + + +def _enforce_strict_openai_schema(schema: dict[str, Any]) -> None: + """Recursively transforms a JSON schema for strict structured outputs.""" + if not isinstance(schema, dict): + return + if '$ref' in schema: + for key in list(schema.keys()): + if key != '$ref': + del schema[key] + return + if schema.get('type') == 'object' and 'properties' in schema: + schema['additionalProperties'] = False + schema['required'] = sorted(schema['properties'].keys()) + for defn in schema.get('$defs', {}).values(): + _enforce_strict_openai_schema(defn) + for prop in schema.get('properties', {}).values(): + _enforce_strict_openai_schema(prop) + for key in ('anyOf', 'oneOf', 'allOf'): + for item in schema.get(key, []): + _enforce_strict_openai_schema(item) + if 'items' in schema and isinstance(schema['items'], dict): + _enforce_strict_openai_schema(schema['items']) + + +def _schema_to_dict(schema: object) -> dict[str, Any]: + if isinstance(schema, types.Schema): + schema_dict = schema.model_dump(exclude_none=True, mode='json') + elif isinstance(schema, type) and issubclass(schema, BaseModel): + schema_dict = schema.model_json_schema() + elif isinstance(schema, BaseModel): + schema_dict = schema.__class__.model_json_schema() + elif isinstance(schema, Mapping): + schema_dict = copy.deepcopy(dict(schema)) + elif hasattr(schema, 'model_dump'): + schema_dict = schema.model_dump(exclude_none=True) + else: + schema_dict = {} + _update_type_string(schema_dict) + return schema_dict + + +def _response_text_config( + config: types.GenerateContentConfig, +) -> dict[str, Any] | None: + """Maps ADK structured output settings to Responses text config.""" + schema = config.response_schema or config.response_json_schema + if schema: + schema_dict = _schema_to_dict(schema) + if not schema_dict: + return None + 
schema_name = schema_dict.get('title') or getattr(schema, '__name__', None) + schema_name = schema_name or schema.__class__.__name__ + _enforce_strict_openai_schema(schema_dict) + return { + 'format': { + 'type': 'json_schema', + 'name': str(schema_name), + 'strict': True, + 'schema': schema_dict, + } + } + if config.response_mime_type == 'application/json': + return {'format': {'type': 'json_object'}} + return None + + +def _openai_reasoning_config( + config: types.GenerateContentConfig, +) -> OpenAIReasoning | None | object: + """Maps ADK thinking config to Responses reasoning config.""" + if not config.thinking_config: + return _REASONING_NOT_GIVEN + + thinking_level = config.thinking_config.thinking_level + if thinking_level: + effort = str(thinking_level.value).lower() + if effort == 'thinking_level_unspecified': + effort = 'medium' + return {'effort': effort, 'summary': 'concise'} + + thinking_budget = config.thinking_config.thinking_budget + if thinking_budget is None: + raise ValueError( + 'thinking_budget must be set explicitly when ThinkingConfig is' + ' provided without thinking_level for OpenAI Responses models. Use' + ' thinking_level for effort-based reasoning, 0 to disable reasoning,' + ' or -1 for medium reasoning.' + ) + if thinking_budget <= 0: + if thinking_budget < 0: + return {'effort': 'medium', 'summary': 'concise'} + return None + # OpenAI Responses reasoning is effort-based, not token-budget based. Positive + # ADK budgets request reasoning, so map them to a concrete medium effort with + # concise summaries instead of forwarding unsupported token budgets. 
+ return {'effort': 'medium', 'summary': 'concise'} + + +def _role_to_responses_role(role: str | None) -> str: + if role in ('model', 'assistant'): + return 'assistant' + if role in ('system', 'developer'): + return role + return 'user' + + +def _responses_content_type(role: str, part: types.Part) -> str: + if part.thought: + return 'summary_text' + if role == 'assistant': + return 'output_text' + return 'input_text' + + +def _text_part_to_response_content( + role: str, part: types.Part +) -> ResponseInputContentParam | dict[str, Any]: + content: ResponseInputContentParam | dict[str, Any] + if part.thought: + content = {'type': 'summary_text', 'text': part.text or ''} + elif role == 'assistant': + content = {'type': 'output_text', 'text': part.text or ''} + else: + content = ResponseInputTextParam(type='input_text', text=part.text or '') + return content + + +def _reasoning_item_from_part( + part: types.Part, index: int +) -> ResponseReasoningItemParam: + item: dict[str, Any] = { + 'id': f'rs_adk_thought_{index}', + 'type': 'reasoning', + } + if part.text: + item['summary'] = [{'type': 'summary_text', 'text': part.text}] + if part.thought_signature: + item['encrypted_content'] = _signature_to_str(part.thought_signature) + return cast(ResponseReasoningItemParam, item) + + +def _skip_replayed_reasoning_part(part: types.Part) -> None: + """Skips ADK thought replay that cannot be addressed in Responses input. + + Responses reasoning input items must reference real reasoning item IDs from a + prior response. ADK thought parts do not currently carry those IDs, and + synthetic IDs are rejected by the API. Continuity is handled through + previous_response_id when available. + """ + if part.thought_signature: + logger.debug( + 'Skipping replayed OpenAI Responses reasoning part with encrypted ' + 'content because no prior reasoning item id is available.' 
+ ) + else: + logger.debug( + 'Skipping replayed OpenAI Responses reasoning summary because no prior ' + 'reasoning item id is available.' + ) + + +def _inline_data_part_to_response_content( + part: types.Part, +) -> ResponseInputContentParam: + inline_data = part.inline_data + data = inline_data.data + if isinstance(data, bytes): + encoded = base64.b64encode(data).decode('utf-8') + else: + encoded = str(data) + mime_type = inline_data.mime_type or 'application/octet-stream' + if mime_type.startswith('image/'): + return ResponseInputImageParam( + type='input_image', + detail='auto', + image_url=f'data:{mime_type};base64,{encoded}', + ) + return ResponseInputFileParam( + type='input_file', + filename=inline_data.display_name or 'inline_data', + file_data=f'data:{mime_type};base64,{encoded}', + ) + + +def _file_data_part_to_response_content( + part: types.Part, +) -> ResponseInputContentParam: + file_data = part.file_data + file_uri = file_data.file_uri or '' + mime_type = file_data.mime_type or '' + if mime_type.startswith('image/'): + return ResponseInputImageParam( + type='input_image', detail='auto', image_url=file_uri + ) + if file_uri.startswith('file-'): + return ResponseInputFileParam(type='input_file', file_id=file_uri) + return ResponseInputFileParam(type='input_file', file_url=file_uri) + + +def _function_call_to_response_item( + function_call: types.FunctionCall, + sanitizer: _CallIdSanitizer, +) -> ResponseFunctionToolCallParam: + return ResponseFunctionToolCallParam( + type='function_call', + call_id=sanitizer.sanitize(function_call.id), + name=function_call.name or '', + arguments=json.dumps(function_call.args or {}), + ) + + +def _function_response_to_response_item( + function_response: types.FunctionResponse, + sanitizer: _CallIdSanitizer, +) -> FunctionCallOutput: + return FunctionCallOutput( + type='function_call_output', + call_id=sanitizer.sanitize(function_response.id), + output=_serialize_json_value(function_response.response), + ) + + +def 
_code_part_to_text(part: types.Part) -> str | None: + if part.executable_code: + return 'Code:```python\n' + part.executable_code.code + '\n```' + if part.code_execution_result: + return ( + 'Execution Result:```code_output\n' + + part.code_execution_result.output + + '\n```' + ) + return None + + +def _content_to_response_input_items( + content: types.Content, + sanitizer: _CallIdSanitizer | None = None, +) -> list[_ResponsesInputItem]: + """Converts ADK Content into Responses API input items.""" + role = _role_to_responses_role(content.role) + sanitizer = sanitizer or _CallIdSanitizer() + items: list[_ResponsesInputItem] = [] + message_parts: list[ResponseInputContentParam] = [] + + def flush_message_parts() -> None: + if message_parts: + items.append( + EasyInputMessageParam( + type='message', role=cast(Any, role), content=message_parts[:] + ) + ) + message_parts.clear() + + def append_assistant_text(text: str) -> None: + flush_message_parts() + items.append( + EasyInputMessageParam(type='message', role='assistant', content=text) + ) + + for index, part in enumerate(content.parts or []): + if part.function_response: + flush_message_parts() + items.append( + _function_response_to_response_item(part.function_response, sanitizer) + ) + elif part.function_call: + flush_message_parts() + items.append( + _function_call_to_response_item(part.function_call, sanitizer) + ) + elif part.thought and (part.text or part.thought_signature): + flush_message_parts() + _skip_replayed_reasoning_part(part) + elif part.text: + if role == 'assistant': + append_assistant_text(part.text) + else: + message_parts.append( + cast( + ResponseInputContentParam, + _text_part_to_response_content(role, part), + ) + ) + elif part.inline_data: + if role == 'assistant': + logger.warning( + 'Media data is not supported in Responses assistant turns.' 
def _function_declaration_to_response_tool(
    function_declaration: types.FunctionDeclaration,
) -> FunctionToolParam:
  """Builds a Responses function tool from an ADK FunctionDeclaration.

  The parameter schema comes from `parameters_json_schema` when set (deep
  copied, with type strings lowercased), otherwise from `parameters`,
  otherwise an empty object schema is used. The tool is always emitted with
  `strict=False`.

  Args:
    function_declaration: The declaration to convert.

  Returns:
    The function tool parameter for the Responses API.

  Raises:
    ValueError: If the declaration has no name.
  """
  tool_name = function_declaration.name
  if not tool_name:
    raise ValueError('FunctionDeclaration must have a name.')

  json_schema = function_declaration.parameters_json_schema
  typed_schema = function_declaration.parameters
  if json_schema:
    # Copy first so the caller's schema is not mutated by the lowercasing.
    parameters = copy.deepcopy(json_schema)
    _update_type_string(parameters)
  elif typed_schema:
    parameters = _schema_to_dict(typed_schema)
  else:
    parameters = {'type': 'object', 'properties': {}}

  # `required` from the typed schema wins whichever schema source was used.
  if typed_schema and typed_schema.required:
    parameters['required'] = typed_schema.required

  return FunctionToolParam(
      type='function',
      name=tool_name,
      description=function_declaration.description or '',
      parameters=parameters,
      strict=False,
  )
def _map_finish_reason(
    response: Response | Mapping[str, Any],
) -> types.FinishReason | None:
  """Derives an ADK finish reason from a Responses API response status.

  Args:
    response: The response, as an SDK object or a mapping.

  Returns:
    STOP for `completed`; OTHER for `failed` and `cancelled`; for
    `incomplete`, MAX_TOKENS when the incomplete reason is a token limit and
    OTHER otherwise; None for any other status.
  """
  status = _get_value(response, 'status')
  if status == 'completed':
    return types.FinishReason.STOP
  if status in ('failed', 'cancelled'):
    return types.FinishReason.OTHER
  if status != 'incomplete':
    return None

  reason = _get_value(_get_value(response, 'incomplete_details'), 'reason')
  hit_token_limit = reason in ('max_output_tokens', 'max_tokens')
  if hit_token_limit:
    return types.FinishReason.MAX_TOKENS
  return types.FinishReason.OTHER
def _reasoning_parts(
    item: ResponseReasoningItem | Mapping[str, Any],
) -> tuple[list[types.Part], dict[str, Any]]:
  """Converts a Responses reasoning item into ADK thought parts and metadata.

  Each non-empty text entry under `summary`, then under `content`, becomes a
  thought part. When the item carries `encrypted_content`, it is attached to
  every part as `thought_signature`; if there is no text at all, a single
  signature-only thought part is emitted.

  Args:
    item: The reasoning output item, as an SDK object or a mapping.

  Returns:
    A tuple of the thought parts and a metadata dict that may hold
    `encrypted_content` and `id`.
  """
  encrypted_content = _get_value(item, 'encrypted_content')
  signature = encrypted_content.encode('utf-8') if encrypted_content else None

  parts = []
  for field in ('summary', 'content'):
    for entry in _get_value(item, field, []) or []:
      text = _get_value(entry, 'text')
      if not text:
        continue
      thought = types.Part(text=text, thought=True)
      if signature is not None:
        thought.thought_signature = signature
      parts.append(thought)

  metadata: dict[str, Any] = {}
  if encrypted_content:
    metadata['encrypted_content'] = encrypted_content
    if not parts:
      # Keep the signature even though the item exposed no readable text.
      parts.append(types.Part(thought=True, thought_signature=signature))

  item_id = _get_value(item, 'id')
  if item_id:
    metadata['id'] = item_id
  return parts, metadata
def _response_to_llm_response(
    response: Response | Mapping[str, Any],
    *,
    include_response_metadata: bool = True,
) -> LlmResponse:
  """Builds an ADK LlmResponse from a Responses API response.

  Output items are classified first by SDK class and otherwise by their
  `type` field. Messages, function calls and reasoning items become parts;
  anything else is recorded as unmapped output.

  Args:
    response: The response, as an SDK object or a mapping.
    include_response_metadata: Whether to attach an `openai_response` entry
      (id, status, raw output, usage, reasoning, unmapped output) as custom
      metadata.

  Returns:
    The converted response. For any finish reason other than STOP, the error
    code is set to that finish reason and the error message to the
    JSON-encoded `error` or `incomplete_details`, when either is present.
  """
  parts: list[types.Part] = []
  output_metadata = []
  reasoning_metadata = []
  unmapped_output = []

  for item in _get_value(response, 'output', []) or []:
    item_type = _get_value(item, 'type')
    # SDK classes decide the kind; other objects fall back to `type`.
    if isinstance(item, ResponseOutputMessage):
      kind = 'message'
    elif isinstance(item, ResponseFunctionToolCall):
      kind = 'function_call'
    elif isinstance(item, ResponseReasoningItem):
      kind = 'reasoning'
    else:
      kind = item_type

    if kind == 'message':
      parts.extend(_message_content_parts(item))
    elif kind == 'function_call':
      parts.append(_function_call_part(item))
    elif kind == 'reasoning':
      thought_parts, item_metadata = _reasoning_parts(item)
      parts.extend(thought_parts)
      if item_metadata:
        reasoning_metadata.append(item_metadata)
    else:
      unmapped_output.append(_to_dict(item))

    if item_type:
      output_metadata.append(_to_dict(item))

  usage = _get_value(response, 'usage')
  custom_metadata = None
  if include_response_metadata:
    openai_response = {
        'id': _get_value(response, 'id'),
        'status': _get_value(response, 'status'),
        'output': output_metadata,
    }
    if usage:
      openai_response['usage'] = _to_dict(usage)
    if reasoning_metadata:
      openai_response['reasoning'] = reasoning_metadata
    if unmapped_output:
      openai_response['unmapped_output'] = unmapped_output
    custom_metadata = {'openai_response': openai_response}

  finish_reason = _map_finish_reason(response)
  content = types.Content(role='model', parts=parts) if parts else None
  llm_response = LlmResponse(
      content=content,
      usage_metadata=_usage_metadata(usage),
      finish_reason=finish_reason,
      model_version=_get_value(response, 'model'),
      interaction_id=_get_value(response, 'id'),
      custom_metadata=custom_metadata,
  )
  if finish_reason and finish_reason != types.FinishReason.STOP:
    error = _get_value(response, 'error') or _get_value(
        response, 'incomplete_details'
    )
    llm_response.error_code = finish_reason
    llm_response.error_message = json.dumps(_to_dict(error)) if error else None
  return llm_response
parts=[types.Part.from_text(text=delta)] + ), + partial=True, + model_version=self.model, + interaction_id=self.response_id, + ) + ) + elif event_type in ( + 'response.reasoning_summary_text.delta', + 'response.reasoning_text.delta', + ): + delta = _get_value(event, 'delta') or '' + self.reasoning_open = True + key = self._stream_output_key(event, 'reasoning') + item = self._ensure_output_item(key, 'reasoning') + self._append_indexed_text( + item, 'reasoning', event, delta, 'summary_index' + ) + responses.append( + LlmResponse( + content=types.Content( + role='model', parts=[types.Part(text=delta, thought=True)] + ), + partial=True, + model_version=self.model, + interaction_id=self.response_id, + ) + ) + elif event_type == 'response.output_item.added': + item = _get_value(event, 'item') + item_type = _get_value(item, 'type') + if item_type != 'reasoning': + responses.extend(self._close_reasoning_stream(event)) + key = self._stream_output_key(event, _get_value(item, 'call_id')) + self._ensure_output_item(key, item_type) + if item_type == 'function_call': + self._track_function_call_item(key, item) + elif event_type in ( + 'response.content_part.done', + 'response.output_text.done', + ): + responses.extend(self._close_reasoning_stream(event)) + key = self._stream_output_key(event, 'message') + item = self._ensure_output_item(key, 'message') + part = _get_value(event, 'part') + text = _get_value(event, 'text') or _get_value(part, 'text') or '' + if text: + self._set_indexed_text(item, 'text', event, text, 'content_index') + elif event_type in ( + 'response.reasoning_summary_text.done', + 'response.reasoning_text.done', + 'response.reasoning_summary_part.done', + ): + key = self._stream_output_key(event, 'reasoning') + item = self._ensure_output_item(key, 'reasoning') + part = _get_value(event, 'part') + text = _get_value(event, 'text') or _get_value(part, 'text') or '' + if text: + self._set_indexed_text(item, 'reasoning', event, text, 'summary_index') + 
responses.extend(self._close_reasoning_stream(event)) + elif event_type == 'response.function_call_arguments.delta': + responses.extend(self._close_reasoning_stream(event)) + key = self._stream_output_key(event, _get_value(event, 'call_id')) + self._ensure_output_item(key, 'function_call') + call = self.function_calls.setdefault( + key, + { + 'name': _get_value(event, 'name') or '', + 'call_id': _get_value(event, 'call_id'), + 'arguments': '', + }, + ) + call['arguments'] += _get_value(event, 'delta') or '' + elif event_type == 'response.function_call_arguments.done': + responses.extend(self._close_reasoning_stream(event)) + key = self._stream_output_key(event, _get_value(event, 'call_id')) + self._ensure_output_item(key, 'function_call') + call = self.function_calls.setdefault( + key, + { + 'name': _get_value(event, 'name') or '', + 'call_id': _get_value(event, 'call_id'), + 'arguments': '', + }, + ) + arguments = _get_value(event, 'arguments') + if arguments is not None: + call['arguments'] = arguments + elif event_type == 'response.output_item.done': + item = _get_value(event, 'item') + item_type = _get_value(item, 'type') + if item_type != 'reasoning': + responses.extend(self._close_reasoning_stream(event)) + key = self._stream_output_key(event, _get_value(item, 'call_id')) + output_item = self._ensure_output_item(key, item_type) + output_item['done_item'] = item + if item_type == 'function_call': + self._track_function_call_item(key, item) + elif event_type in ('response.completed', 'response.incomplete'): + self.response = _get_value(event, 'response') + response_usage = _get_value(self.response, 'usage') + if response_usage: + self.usage = response_usage + elif event_type in ('response.failed', 'error'): + self.failed = True + responses.append( + LlmResponse( + error_code=types.FinishReason.OTHER, + error_message=json.dumps(_to_dict(event)), + finish_reason=types.FinishReason.OTHER, + interaction_id=self.response_id, + ) + ) + return responses + + def 
def _ensure_output_item(
    self, key: int | str, item_type: str | None
) -> dict[str, Any]:
  """Return the accumulator record stored under `key`, creating it if new.

  A key seen for the first time gets an empty record and is appended to
  `self.output_order`. The record's `'type'` entry is written only when
  `item_type` is truthy and the record has no `'type'` yet, so the first
  type recorded for a key is never replaced.

  Args:
    key: Identifier of the streamed output item.
    item_type: Item type to record, or a falsy value to leave it unset.

  Returns:
    The mutable record held in `self.output_items` for `key`.
  """
  already_known = key in self.output_items
  if not already_known:
    self.output_order.append(key)
    self.output_items[key] = {}
  record = self.output_items[key]
  if item_type:
    # `setdefault` keeps an existing type and only fills a missing one.
    record.setdefault('type', item_type)
  return record
def _track_function_call_item(
    self, key: int | str, item: ResponseOutputItem | Mapping[str, Any]
) -> None:
  """Record a function-call output item in the accumulator.

  Ensures an output record of type `function_call` exists for `key`, then
  stores the call's name, call id and arguments in `self.function_calls`.

  Values read from `item` take precedence. When `item` omits a field, the
  value already accumulated for `key` (for example argument text gathered
  from earlier delta events) is kept rather than being reset to an empty
  value, so a sparse item cannot discard streamed data.

  Args:
    key: Identifier of the streamed output item.
    item: The function-call output item carried by the stream event.
  """
  self._ensure_output_item(key, 'function_call')
  previous = self.function_calls.get(key) or {}
  self.function_calls[key] = {
      'name': _get_value(item, 'name') or previous.get('name') or '',
      # Prefer a real call id (from the item or seen earlier) and fall back
      # to the item's own `id` only as a last resort.
      'call_id': (
          _get_value(item, 'call_id')
          or previous.get('call_id')
          or _get_value(item, 'id')
      ),
      'arguments': (
          _get_value(item, 'arguments') or previous.get('arguments') or ''
      ),
  }
def _function_call_part_from_accumulator(self, key: int | str) -> types.Part:
  """Build a function-call part from the call accumulated under `key`.

  The accumulated argument string is decoded with `_loads_json_object` and
  the stored call id is copied onto the part's function call.

  Args:
    key: Key of an entry in `self.function_calls`.

  Returns:
    A `types.Part` wrapping the function call.
  """
  accumulated = self.function_calls[key]
  call_name = accumulated.get('name')
  call_args = _loads_json_object(accumulated.get('arguments'))
  fc_part = types.Part.from_function_call(name=call_name, args=call_args)
  fc_part.function_call.id = accumulated.get('call_id')
  return fc_part
def _get_response_create_kwargs(
    self, llm_request: LlmRequest, *, stream: bool
) -> dict[str, Any]:
  """Assemble the keyword arguments passed to `responses.create`.

  The model (request model, else `self.model`), the converted input items
  and the `stream` flag are set first. A truthy serialized system
  instruction becomes `instructions`, and a truthy
  `previous_interaction_id` becomes `previous_response_id`. Request config
  and model-level options are applied next, `self.extra_request_args` is
  merged last so it overrides earlier keys, and finally every entry whose
  value is `None` is dropped.

  Args:
    llm_request: The ADK request to translate.
    stream: Value forwarded as the `stream` argument.

  Returns:
    A new dict containing only the arguments with non-`None` values.
  """
  request_config = llm_request.config
  create_args: dict[str, Any] = {}
  create_args['model'] = llm_request.model or self.model
  create_args['input'] = self._get_response_input(llm_request)
  create_args['stream'] = stream

  system_text = _serialize_system_instruction(
      request_config.system_instruction
  )
  if system_text:
    create_args['instructions'] = system_text
  previous_id = llm_request.previous_interaction_id
  if previous_id:
    create_args['previous_response_id'] = previous_id

  self._apply_config(request_config, create_args)
  self._apply_model_options(create_args)
  create_args.update(self.extra_request_args)

  # `None` marks an option that was not configured; leave it out entirely.
  cleaned: dict[str, Any] = {}
  for name, value in create_args.items():
    if value is not None:
      cleaned[name] = value
  return cleaned
def _apply_model_options(self, kwargs: dict[str, Any]) -> None:
  """Copy model-level option fields into the request `kwargs` in place.

  `store`, `include`, `parallel_tool_calls`, `truncation` and
  `service_tier` are always written from the model's attributes.
  `reasoning` is written only when `kwargs` has no `reasoning` key yet, so
  a value set earlier (even `None`) is left untouched.

  Args:
    kwargs: Request keyword arguments, mutated in place.
  """
  option_names = (
      'store',
      'include',
      'reasoning',
      'parallel_tool_calls',
      'truncation',
      'service_tier',
  )
  for option in option_names:
    if option == 'reasoning' and option in kwargs:
      # An earlier step already decided the reasoning config.
      continue
    kwargs[option] = getattr(self, option)
def _resolve_api_key(self) -> str | None:
  """Return the API key to use for the Azure endpoint.

  A callable `api_key` is invoked and its result returned. A non-callable
  value is returned as-is, falling back to the `AZURE_OPENAI_API_KEY`
  environment variable when it is unset or empty.

  Returns:
    The resolved key, or `None` when nothing is configured.

  Raises:
    TypeError: If a callable `api_key` returns an awaitable; only
      synchronous token providers are accepted here.
  """
  if not callable(self.api_key):
    return self.api_key or os.environ.get('AZURE_OPENAI_API_KEY')
  value = self.api_key()
  if inspect.isawaitable(value):
    if inspect.iscoroutine(value):
      # Close the rejected coroutine so discarding it does not emit a
      # "coroutine ... was never awaited" RuntimeWarning.
      value.close()
    raise TypeError('Azure token providers for this model must be sync.')
  return value
@pytest.mark.asyncio
async def test_adk_function_call_ids_preserved_for_openai_responses_model():
  """`adk-*` call ids stay on replayed parts for an OpenAIResponsesLlm agent."""
  agent = Agent(
      model=OpenAIResponsesLlm(model="gpt-5.5"),
      name="test_agent",
  )
  llm_request = LlmRequest(model="gpt-5.5")
  invocation_context = await testing_utils.create_invocation_context(
      agent=agent
  )

  function_call_id = "adk-test-call-id"
  tool_call = types.Part(
      function_call=types.FunctionCall(
          id=function_call_id,
          name="test_tool",
          args={"x": 1},
      )
  )
  tool_result = types.Part(
      function_response=types.FunctionResponse(
          id=function_call_id,
          name="test_tool",
          response={"result": 2},
      )
  )
  # Session history: user prompt, model tool call, then the tool's answer.
  user_turn = Event(
      invocation_id="inv1",
      author="user",
      content=types.UserContent("Call the tool"),
  )
  call_turn = Event(
      invocation_id="inv2",
      author="test_agent",
      content=types.Content(role="model", parts=[tool_call]),
  )
  result_turn = Event(
      invocation_id="inv3",
      author="test_agent",
      content=types.Content(role="user", parts=[tool_result]),
  )
  invocation_context.session.events = [user_turn, call_turn, result_turn]

  async for _ in contents.request_processor.run_async(
      invocation_context, llm_request
  ):
    pass

  replayed_call = llm_request.contents[1].parts[0].function_call
  assert replayed_call is not None
  assert replayed_call.id == function_call_id

  replayed_result = llm_request.contents[2].parts[0].function_response
  assert replayed_result is not None
  assert replayed_result.id == function_call_id
class _FakeAsyncStream:
  """Async-iterable test double that replays a fixed list of events."""

  def __init__(self, events: list[dict]):
    self._events = events

  def __aiter__(self):
    # Each `async for` gets its own generator over the stored events.
    replay = self._iter()
    return replay

  async def _iter(self):
    position = 0
    while position < len(self._events):
      yield self._events[position]
      position += 1
class _CaptureResponses:
  """Test double for a `responses` resource that records `create` kwargs."""

  def __init__(self, response):
    # No call has been captured until `create` runs.
    self.kwargs = None
    self.response = response

  async def create(self, **kwargs):
    """Store the keyword arguments and return the canned response."""
    captured = kwargs
    self.kwargs = captured
    return self.response
def test_content_mapping_preserves_model_tool_calls_and_reasoning():
  """A model turn replays its tool call and text; the thought part is dropped."""
  reasoning = types.Part(text='Need weather first.', thought=True)
  tool_call = types.Part.from_function_call(
      name='get_weather', args={'location': 'Paris'}
  )
  tool_call.function_call.id = 'call_123'
  reply = types.Part.from_text(text='Hi')
  content = types.Content(role='model', parts=[reasoning, tool_call, reply])

  items = _content_to_response_input_items(content)

  expected_call = {
      'type': 'function_call',
      'call_id': 'call_123',
      'name': 'get_weather',
      'arguments': '{"location": "Paris"}',
  }
  expected_message = {
      'type': 'message',
      'role': 'assistant',
      'content': 'Hi',
  }
  assert items == [expected_call, expected_message]
def test_content_mapping_sanitizes_function_call_ids_per_request():
  """An invalid id maps to one shared fallback; missing ids get distinct ones."""
  bad_id_call = types.Part.from_function_call(name='tool', args={})
  bad_id_call.function_call.id = 'invalid id!'
  bad_id_response = types.Part(
      function_response=types.FunctionResponse(
          id='invalid id!', name='tool', response={'result': 'ok'}
      )
  )
  no_id_calls = [
      types.Part.from_function_call(name='tool', args={}) for _ in range(2)
  ]
  content = types.Content(
      role='model',
      parts=[bad_id_call, bad_id_response, *no_id_calls],
  )
  request = LlmRequest(contents=[content])

  items = OpenAIResponsesLlm()._get_response_input(request)

  # The call and its response share an id; the id-less calls do not collide.
  observed = [entry['call_id'] for entry in items[:4]]
  assert observed == [
      'call_adk_fallback_0',
      'call_adk_fallback_0',
      'call_adk_fallback_1',
      'call_adk_fallback_2',
  ]
def test_image_and_file_parts_use_responses_content_types():
  """User media parts map to `input_image` / `input_file` content blocks."""
  inline_image = types.Part(
      inline_data=types.Blob(data=b'image', mime_type='image/png')
  )
  inline_text_file = types.Part(
      inline_data=types.Blob(
          data=b'hello', mime_type='text/plain', display_name='a.txt'
      )
  )
  uploaded_file = types.Part(
      file_data=types.FileData(
          file_uri='file-abc', mime_type='application/pdf'
      )
  )
  remote_pdf = types.Part(
      file_data=types.FileData(
          file_uri='https://example.com/doc.pdf',
          mime_type='application/pdf',
      )
  )
  remote_image = types.Part(
      file_data=types.FileData(
          file_uri='https://example.com/image.png',
          mime_type='image/png',
      )
  )
  content = types.Content(
      role='user',
      parts=[
          inline_image,
          inline_text_file,
          uploaded_file,
          remote_pdf,
          remote_image,
      ],
  )

  items = _content_to_response_input_items(content)

  first_block = items[0]['content'][0]
  assert first_block['type'] == 'input_image'
  assert first_block['image_url'].startswith('data:image/png;base64,')
  assert items[0]['content'][1] == {
      'type': 'input_file',
      'filename': 'a.txt',
      'file_data': 'data:text/plain;base64,aGVsbG8=',
  }
  assert items[0]['content'][2] == {
      'type': 'input_file',
      'file_id': 'file-abc',
  }
  assert items[0]['content'][3] == {
      'type': 'input_file',
      'file_url': 'https://example.com/doc.pdf',
  }
  assert items[0]['content'][4] == {
      'type': 'input_image',
      'detail': 'auto',
      'image_url': 'https://example.com/image.png',
  }
def test_function_declaration_uses_responses_tool_shape():
  """A declaration converts to a flat Responses function tool dict."""
  input_schema = {
      'type': 'OBJECT',
      'properties': {'query': {'type': 'STRING'}},
  }
  declaration = types.FunctionDeclaration(
      name='search',
      description='Search docs',
      parameters_json_schema=input_schema,
  )

  tool = _function_declaration_to_response_tool(declaration)

  # Type names are expected in lower case in the converted schema.
  expected_parameters = {
      'type': 'object',
      'properties': {'query': {'type': 'string'}},
  }
  expected_tool = {
      'type': 'function',
      'name': 'search',
      'description': 'Search docs',
      'parameters': expected_parameters,
      'strict': False,
  }
  assert tool == expected_tool
def test_thinking_config_zero_budget_disables_reasoning_override():
  """With thinking_budget=0 the model-level reasoning option is not sent."""
  llm = OpenAIResponsesLlm(model='gpt-5', reasoning={'effort': 'medium'})
  user_turn = types.Content(
      role='user', parts=[types.Part.from_text(text='Hi')]
  )
  no_thinking = types.ThinkingConfig(thinking_budget=0)
  llm_request = LlmRequest(
      contents=[user_turn],
      config=types.GenerateContentConfig(thinking_config=no_thinking),
  )

  kwargs = llm._get_response_create_kwargs(llm_request, stream=False)

  assert 'reasoning' not in kwargs
def test_thinking_config_none_budget_raises():
  """An empty ThinkingConfig (no level, no budget) is rejected."""
  llm = OpenAIResponsesLlm(model='gpt-5')
  user_turn = types.Content(
      role='user', parts=[types.Part.from_text(text='Hi')]
  )
  empty_thinking = types.ThinkingConfig()
  llm_request = LlmRequest(
      contents=[user_turn],
      config=types.GenerateContentConfig(thinking_config=empty_thinking),
  )

  expected_message = 'thinking_budget must be set explicitly'
  with pytest.raises(ValueError, match=expected_message):
    llm._get_response_create_kwargs(llm_request, stream=False)
become ADK text, thought, and function parts.""" + response = { + 'id': 'resp_123', + 'model': 'gpt-5', + 'status': 'completed', + 'usage': { + 'input_tokens': 11, + 'output_tokens': 7, + 'total_tokens': 18, + 'input_tokens_details': {'cached_tokens': 3}, + 'output_tokens_details': {'reasoning_tokens': 4}, + }, + 'output': [ + { + 'type': 'reasoning', + 'id': 'rs_1', + 'summary': [{'type': 'summary_text', 'text': 'Think.'}], + 'encrypted_content': 'encrypted', + }, + { + 'type': 'message', + 'role': 'assistant', + 'content': [{'type': 'output_text', 'text': 'Calling a tool.'}], + }, + { + 'type': 'function_call', + 'call_id': 'call_123', + 'name': 'get_weather', + 'arguments': '{"location": "Paris"}', + }, + ], + } + + llm_response = _response_to_llm_response(response) + + assert llm_response.interaction_id == 'resp_123' + assert llm_response.model_version == 'gpt-5' + assert llm_response.finish_reason == types.FinishReason.STOP + assert llm_response.usage_metadata.prompt_token_count == 11 + assert llm_response.usage_metadata.candidates_token_count == 7 + assert llm_response.usage_metadata.total_token_count == 18 + assert llm_response.usage_metadata.cached_content_token_count == 3 + assert llm_response.usage_metadata.thoughts_token_count == 4 + assert llm_response.content.parts[0].thought is True + assert llm_response.content.parts[0].text == 'Think.' + assert llm_response.content.parts[0].thought_signature == b'encrypted' + assert llm_response.content.parts[1].text == 'Calling a tool.' 
+ function_call = llm_response.content.parts[2].function_call + assert function_call.id == 'call_123' + assert function_call.name == 'get_weather' + assert function_call.args == {'location': 'Paris'} + assert llm_response.custom_metadata['openai_response']['reasoning'] == [ + {'encrypted_content': 'encrypted', 'id': 'rs_1'} + ] + + +def test_response_parsing_accepts_openai_sdk_response_types(): + """OpenAI SDK Response objects are parsed through typed paths.""" + response = Response( + id='resp_typed', + created_at=1.0, + model='gpt-5', + object='response', + output=[ + ResponseReasoningItem( + id='rs_typed', + type='reasoning', + summary=[Summary(type='summary_text', text='Typed thought.')], + encrypted_content='encrypted_typed', + ), + ResponseOutputMessage( + id='msg_typed', + type='message', + role='assistant', + status='completed', + content=[ + ResponseOutputText( + type='output_text', text='Typed hello.', annotations=[] + ) + ], + ), + ResponseFunctionToolCall( + type='function_call', + call_id='call_typed', + name='get_weather', + arguments='{"city": "Tokyo"}', + ), + ], + parallel_tool_calls=True, + tool_choice='auto', + tools=[], + status='completed', + usage=ResponseUsage( + input_tokens=3, + input_tokens_details=InputTokensDetails(cached_tokens=1), + output_tokens=5, + output_tokens_details=OutputTokensDetails(reasoning_tokens=2), + total_tokens=8, + ), + ) + + llm_response = _response_to_llm_response(response) + + assert llm_response.interaction_id == 'resp_typed' + assert llm_response.content.parts[0].thought is True + assert llm_response.content.parts[0].text == 'Typed thought.' + assert llm_response.content.parts[0].thought_signature == b'encrypted_typed' + assert llm_response.content.parts[1].text == 'Typed hello.' 
+ assert llm_response.content.parts[2].function_call.id == 'call_typed' + assert llm_response.content.parts[2].function_call.args == {'city': 'Tokyo'} + assert llm_response.usage_metadata.total_token_count == 8 + assert llm_response.custom_metadata['openai_response']['reasoning'] == [ + {'encrypted_content': 'encrypted_typed', 'id': 'rs_typed'} + ] + + +def test_response_parsing_preserves_redacted_reasoning(): + """Encrypted-only reasoning becomes a signature-only thought part.""" + response = { + 'id': 'resp_123', + 'model': 'gpt-5', + 'status': 'completed', + 'output': [ + { + 'type': 'reasoning', + 'id': 'rs_1', + 'encrypted_content': 'encrypted_only', + }, + ], + } + + llm_response = _response_to_llm_response(response) + + part = llm_response.content.parts[0] + assert part.thought is True + assert part.text is None + assert part.thought_signature == b'encrypted_only' + + +@pytest.mark.asyncio +async def test_generate_content_async_calls_responses_create(): + """Non-streaming generation calls responses.create and parses the result.""" + response = { + 'id': 'resp_123', + 'model': 'gpt-5', + 'status': 'completed', + 'output': [{ + 'type': 'message', + 'role': 'assistant', + 'content': [{'type': 'output_text', 'text': 'Hello'}], + }], + } + client = _CaptureClient(response) + llm = OpenAIResponsesLlm(model='gpt-5') + llm.__dict__['_openai_client'] = client + llm_request = LlmRequest( + contents=[ + types.Content(role='user', parts=[types.Part.from_text(text='Hi')]) + ] + ) + + responses = [item async for item in llm.generate_content_async(llm_request)] + + assert client.responses.kwargs['model'] == 'gpt-5' + assert client.responses.kwargs['stream'] is False + assert responses[0].content.parts[0].text == 'Hello' + assert responses[0].interaction_id == 'resp_123' + + +@pytest.mark.asyncio +async def test_generate_content_async_can_skip_response_metadata(): + """Response metadata can be omitted from LlmResponse.custom_metadata.""" + response = { + 'id': 'resp_123', + 
'model': 'gpt-5', + 'status': 'completed', + 'usage': { + 'input_tokens': 1, + 'output_tokens': 2, + 'total_tokens': 3, + }, + 'output': [{ + 'type': 'message', + 'role': 'assistant', + 'content': [{'type': 'output_text', 'text': 'Hello'}], + }], + } + client = _CaptureClient(response) + llm = OpenAIResponsesLlm(model='gpt-5', include_response_metadata=False) + llm.__dict__['_openai_client'] = client + llm_request = LlmRequest( + contents=[ + types.Content(role='user', parts=[types.Part.from_text(text='Hi')]) + ] + ) + + responses = [item async for item in llm.generate_content_async(llm_request)] + + assert responses[0].custom_metadata is None + assert responses[0].usage_metadata.total_token_count == 3 + + +@pytest.mark.asyncio +async def test_streaming_generation_yields_partials_and_final_response(): + """Streaming generation yields text/thought deltas and a final response.""" + stream = _FakeAsyncStream([ + { + 'type': 'response.created', + 'response': {'id': 'resp_stream', 'model': 'gpt-5'}, + }, + {'type': 'response.reasoning_summary_text.delta', 'delta': 'Think'}, + {'type': 'response.output_text.delta', 'delta': 'Hel'}, + {'type': 'response.output_text.delta', 'delta': 'lo'}, + { + 'type': 'response.completed', + 'response': { + 'id': 'resp_stream', + 'model': 'gpt-5', + 'status': 'completed', + 'output': [ + { + 'type': 'reasoning', + 'summary': [{'type': 'summary_text', 'text': 'Think'}], + }, + { + 'type': 'message', + 'role': 'assistant', + 'content': [{'type': 'output_text', 'text': 'Hello'}], + }, + ], + }, + }, + ]) + client = _CaptureClient(stream) + llm = OpenAIResponsesLlm(model='gpt-5') + llm.__dict__['_openai_client'] = client + llm_request = LlmRequest( + contents=[ + types.Content(role='user', parts=[types.Part.from_text(text='Hi')]) + ] + ) + + responses = [ + item + async for item in llm.generate_content_async(llm_request, stream=True) + ] + + assert client.responses.kwargs['stream'] is True + assert responses[0].partial is True + assert 
responses[0].content.parts[0].thought is True + assert responses[0].content.parts[0].text == 'Think' + assert responses[1].partial is True + assert responses[1].content is None + assert responses[1].custom_metadata == { + 'openai_response': { + 'stream_event': { + 'type': 'response.output_text.delta', + 'reasoning_done': True, + } + } + } + assert responses[2].content.parts[0].text == 'Hel' + assert responses[3].content.parts[0].text == 'lo' + assert responses[4].partial is None + assert responses[4].content.parts[0].thought is True + assert responses[4].content.parts[1].text == 'Hello' + + +@pytest.mark.asyncio +async def test_streaming_generation_can_skip_response_metadata(): + """Metadata-only stream boundary events are omitted when metadata is off.""" + stream = _FakeAsyncStream([ + { + 'type': 'response.created', + 'response': {'id': 'resp_stream', 'model': 'gpt-5'}, + }, + {'type': 'response.reasoning_summary_text.delta', 'delta': 'Think'}, + {'type': 'response.output_text.delta', 'delta': 'Hello'}, + { + 'type': 'response.completed', + 'response': { + 'id': 'resp_stream', + 'model': 'gpt-5', + 'status': 'completed', + 'output': [ + { + 'type': 'reasoning', + 'summary': [{'type': 'summary_text', 'text': 'Think'}], + }, + { + 'type': 'message', + 'role': 'assistant', + 'content': [{'type': 'output_text', 'text': 'Hello'}], + }, + ], + }, + }, + ]) + client = _CaptureClient(stream) + llm = OpenAIResponsesLlm(model='gpt-5', include_response_metadata=False) + llm.__dict__['_openai_client'] = client + llm_request = LlmRequest( + contents=[ + types.Content(role='user', parts=[types.Part.from_text(text='Hi')]) + ] + ) + + responses = [ + item + async for item in llm.generate_content_async(llm_request, stream=True) + ] + + assert [response.custom_metadata for response in responses] == [ + None, + None, + None, + ] + assert responses[0].content.parts[0].thought is True + assert responses[1].content.parts[0].text == 'Hello' + assert responses[2].partial is None + + 
@pytest.mark.asyncio
async def test_streaming_generation_fallback_preserves_output_item_order():
  """Without `response.completed`, the last response keeps item order.

  The fake stream interleaves two reasoning items with two message items and
  never sends a completed event. The last yielded response must expose the
  four parts in output order, and two stream events must be flagged as
  reasoning boundaries.
  """
  events = [
      {
          'type': 'response.created',
          'response': {'id': 'resp_stream', 'model': 'gpt-5'},
      },
      # Output item 0: first reasoning item.
      {
          'type': 'response.output_item.added',
          'output_index': 0,
          'item': {'id': 'rs_1', 'type': 'reasoning', 'summary': []},
      },
      {
          'type': 'response.reasoning_summary_text.delta',
          'output_index': 0,
          'summary_index': 0,
          'delta': 'Think',
      },
      {
          'type': 'response.reasoning_summary_text.done',
          'output_index': 0,
          'summary_index': 0,
          'text': 'Think',
      },
      # Output item 1: first message, streamed in two text deltas.
      {
          'type': 'response.output_item.added',
          'output_index': 1,
          'item': {'id': 'msg_1', 'type': 'message', 'content': []},
      },
      {
          'type': 'response.output_text.delta',
          'output_index': 1,
          'content_index': 0,
          'delta': 'Hel',
      },
      {
          'type': 'response.output_text.delta',
          'output_index': 1,
          'content_index': 0,
          'delta': 'lo',
      },
      # Output item 2: second reasoning item (no `.done` event follows it).
      {
          'type': 'response.output_item.added',
          'output_index': 2,
          'item': {'id': 'rs_2', 'type': 'reasoning', 'summary': []},
      },
      {
          'type': 'response.reasoning_summary_text.delta',
          'output_index': 2,
          'summary_index': 0,
          'delta': 'Again',
      },
      # Output item 3: second message.
      {
          'type': 'response.output_item.added',
          'output_index': 3,
          'item': {'id': 'msg_2', 'type': 'message', 'content': []},
      },
      {
          'type': 'response.output_text.delta',
          'output_index': 3,
          'content_index': 0,
          'delta': 'Bye',
      },
  ]
  fake_client = _CaptureClient(_FakeAsyncStream(events))
  model = OpenAIResponsesLlm(model='gpt-5')
  # Put the fake client straight into the instance dict under the attribute
  # name the model reads.
  model.__dict__['_openai_client'] = fake_client
  user_turn = types.Content(
      role='user', parts=[types.Part.from_text(text='Hi')]
  )
  request = LlmRequest(contents=[user_turn])

  emitted = []
  async for chunk in model.generate_content_async(request, stream=True):
    emitted.append(chunk)

  last = emitted[-1]
  assert last.partial is False
  observed_parts = [(part.text, part.thought) for part in last.content.parts]
  expected_parts = [
      ('Think', True),
      ('Hello', None),
      ('Again', True),
      ('Bye', None),
  ]
  assert observed_parts == expected_parts

  # Collect the stream-event metadata of every response flagged as a
  # reasoning boundary, then compare the event types that produced them.
  boundary_events = []
  for chunk in emitted:
    metadata = chunk.custom_metadata
    if not metadata:
      continue
    stream_event = metadata['openai_response']['stream_event']
    if stream_event['reasoning_done']:
      boundary_events.append(stream_event)
  expected_boundary_types = [
      'response.reasoning_summary_text.done',
      'response.output_item.added',
  ]
  assert [event['type'] for event in boundary_events] == (
      expected_boundary_types
  )


@pytest.mark.asyncio
async def test_streaming_generation_aggregates_function_call_without_completed_event():
  """Argument deltas with no completed event yield one function call."""
  pending_call = {
      'type': 'function_call',
      'call_id': 'call_123',
      'name': 'get_weather',
      'arguments': '',
  }
  events = [
      {
          'type': 'response.output_item.added',
          'output_index': 0,
          'item': pending_call,
      },
      # The JSON arguments arrive split across two deltas.
      {
          'type': 'response.function_call_arguments.delta',
          'output_index': 0,
          'delta': '{"location"',
      },
      {
          'type': 'response.function_call_arguments.delta',
          'output_index': 0,
          'delta': ': "Paris"}',
      },
  ]
  fake_client = _CaptureClient(_FakeAsyncStream(events))
  model = OpenAIResponsesLlm(model='gpt-5')
  model.__dict__['_openai_client'] = fake_client
  user_turn = types.Content(
      role='user', parts=[types.Part.from_text(text='Hi')]
  )
  request = LlmRequest(contents=[user_turn])

  emitted = []
  async for chunk in model.generate_content_async(request, stream=True):
    emitted.append(chunk)

  assert len(emitted) == 1
  only_response = emitted[0]
  assert only_response.finish_reason == types.FinishReason.STOP
  call = only_response.content.parts[0].function_call
  assert call.id == 'call_123'
  assert call.name == 'get_weather'
  assert call.args == {'location': 'Paris'}


@pytest.mark.asyncio
async def test_streaming_generation_uses_function_arguments_done_event():
  """A `function_call_arguments.done` event supplies the call arguments."""
  pending_call = {
      'type': 'function_call',
      'call_id': 'call_123',
      'name': 'get_weather',
      'arguments': '',
  }
  events = [
      {
          'type': 'response.output_item.added',
          'output_index': 0,
          'item': pending_call,
      },
      # No deltas: the complete argument string comes in the done event.
      {
          'type': 'response.function_call_arguments.done',
          'output_index': 0,
          'arguments': '{"location": "Paris"}',
      },
  ]
  fake_client = _CaptureClient(_FakeAsyncStream(events))
  model = OpenAIResponsesLlm(model='gpt-5')
  model.__dict__['_openai_client'] = fake_client
  user_turn = types.Content(
      role='user', parts=[types.Part.from_text(text='Hi')]
  )
  request = LlmRequest(contents=[user_turn])

  emitted = []
  async for chunk in model.generate_content_async(request, stream=True):
    emitted.append(chunk)

  call = emitted[0].content.parts[0].function_call
  assert call.id == 'call_123'
  assert call.args == {'location': 'Paris'}


@pytest.mark.asyncio
async def test_streaming_generation_failed_event_is_terminal():
  """After `response.failed`, only the partial and the failure are yielded."""
  events = [
      {'type': 'response.output_text.delta', 'delta': 'partial'},
      {'type': 'response.failed', 'response': {'id': 'resp_123'}},
  ]
  fake_client = _CaptureClient(_FakeAsyncStream(events))
  model = OpenAIResponsesLlm(model='gpt-5')
  model.__dict__['_openai_client'] = fake_client
  user_turn = types.Content(
      role='user', parts=[types.Part.from_text(text='Hi')]
  )
  request = LlmRequest(contents=[user_turn])

  emitted = []
  async for chunk in model.generate_content_async(request, stream=True):
    emitted.append(chunk)

  # Exactly two responses: the text delta, then the failure. A third one
  # would mean a fallback final response was emitted after the failure.
  assert len(emitted) == 2
  text_delta = emitted[0]
  failure = emitted[1]
  assert text_delta.partial is True
  assert failure.finish_reason == types.FinishReason.OTHER
  assert failure.error_code == types.FinishReason.OTHER


def test_azure_client_uses_openai_v1_base_url():
  """The Azure variant builds its client against `<endpoint>/openai/v1/`."""
  patch_target = (
      'google.adk.labs.openai_responses._openai_responses_llm.AsyncOpenAI'
  )
  with mock.patch(patch_target) as async_openai_cls:
    azure_llm = AzureOpenAIResponsesLlm(
        model='deployment',
        azure_endpoint='https://example.openai.azure.com/',
        api_key='key',
    )

    # NOTE(review): reading `_openai_client` presumably triggers the lazy
    # construction of the patched client class - confirm against the model.
    _ = azure_llm._openai_client

  async_openai_cls.assert_called_once_with(
      api_key='key',
      base_url='https://example.openai.azure.com/openai/v1/',
  )