diff --git a/scripts/a2a/debugger.py b/scripts/a2a/debugger.py index 2bfbfc79..d06b38ed 100644 --- a/scripts/a2a/debugger.py +++ b/scripts/a2a/debugger.py @@ -177,16 +177,22 @@ def build_message_stream_payload( request_id: str, message_id: str, images: Any = None, + iac_code_model: str | None = None, ) -> dict[str, Any]: parts = [] if prompt: parts.append({"text": prompt}) parts.extend(_normalize_image_parts(images)) + iac_code_metadata = {"cwd": cwd} + if iac_code_model: + stripped_model = iac_code_model.strip() + if stripped_model: + iac_code_metadata["iac_code_model"] = stripped_model message: dict[str, Any] = { "messageId": message_id, "role": "ROLE_USER", "parts": parts, - "metadata": {"iac_code": {"cwd": cwd}}, + "metadata": {"iac_code": iac_code_metadata}, } if context_id: message["contextId"] = context_id @@ -1386,6 +1392,9 @@ def render_index_html(config: DebuggerConfig) -> str: + @@ -1583,6 +1592,7 @@ def render_index_html(config: DebuggerConfig) -> str: return { serverUrl: byId("server-url").value.trim(), cwd: byId("cwd").value.trim(), + iacCodeModel: byId("iac-code-model").value.trim(), contextId: byId("context-id").value.trim(), taskId: byId("task-id").value.trim(), activeTaskId: byId("active-task-id").value.trim(), @@ -4271,6 +4281,7 @@ def render_index_html(config: DebuggerConfig) -> str: const payload = { serverUrl: controls.serverUrl, cwd: controls.cwd, + iacCodeModel: controls.iacCodeModel, contextId: controls.contextId || state.contextId, taskId: streamTaskIdForControls(controls), prompt: controls.prompt @@ -4542,7 +4553,7 @@ def render_index_html(config: DebuggerConfig) -> str: if (subtitle) { subtitle.textContent = "Read-only exported debugger snapshot"; } - ["server-url", "cwd", "context-id", "task-id", "active-task-id", "prompt"].forEach((id) => { + ["server-url", "cwd", "iac-code-model", "context-id", "task-id", "active-task-id", "prompt"].forEach((id) => { const element = byId(id); if (element) { element.readOnly = true; @@ -4586,10 +4597,10 @@ def render_index_html(config: DebuggerConfig) -> str: body.setAttribute("data-export-mode", "true"); } clone.querySelectorAll("#debug-export-data").forEach((node) => node.remove()); - ["server-url", "cwd", "context-id", "task-id", "active-task-id", "prompt"].forEach((id) => { + ["server-url", "cwd", "iac-code-model", "context-id", "task-id", "active-task-id", "prompt"].forEach((id) => { copyControlValueToClone(clone, id); }); - ["server-url", "cwd", "context-id", "task-id", "active-task-id", "prompt"].forEach((id) => { + ["server-url", "cwd", "iac-code-model", "context-id", "task-id", "active-task-id", "prompt"].forEach((id) => { const element = clone.querySelector(`#${cssEscape(id)}`); if (element) { element.setAttribute("readonly", "readonly"); @@ -4840,6 +4851,7 @@ def _open_sse_stream(server_url: str, payload: dict[str, Any], timeout: float = def _message_stream_body(body: dict[str, Any]) -> tuple[str, dict[str, Any]]: server_url = normalize_server_url(str(body.get("serverUrl", ""))) cwd = str(body.get("cwd", "")) + iac_code_model = str(body.get("iacCodeModel") or body.get("iac_code_model") or "") prompt = str(body.get("prompt", "")) context_id = str(body.get("contextId", "")) task_id = str(body.get("taskId", "")) @@ -4855,6 +4867,7 @@ def _message_stream_body(body: dict[str, Any]) -> tuple[str, dict[str, Any]]: request_id=str(uuid.uuid4()), message_id=str(uuid.uuid4()), images=body.get("images"), + iac_code_model=iac_code_model, ) return server_url, payload diff --git a/scripts/a2a/selling_console_web/app.js b/scripts/a2a/selling_console_web/app.js index 01105ded..484ec910 100644 --- a/scripts/a2a/selling_console_web/app.js +++ b/scripts/a2a/selling_console_web/app.js @@ -130,6 +130,7 @@ defaults: stateDefaults, serverUrl: stateDefaults.serverUrl || "", cwd: stateDefaults.cwd || "", + iacCodeModel: stateDefaults.iacCodeModel || "", contextId: "", pipelineTaskId: "", activeTaskId: "", @@ -1354,6 +1355,7 @@ return { serverUrl: source.serverUrl || "", cwd: source.cwd || "", + iacCodeModel: source.iacCodeModel || "", contextId: source.contextId || "", taskId: source.normalHandoffReady ? "" : source.activeTaskId || source.pipelineTaskId || "", prompt: prompt || "", @@ -2583,24 +2585,32 @@ const state = ensureState(); const serverInput = byId("server-url"); const cwdInput = byId("cwd"); + const modelInput = byId("iac-code-model"); if (serverInput && "value" in serverInput && !serverInput.value && state.serverUrl) { serverInput.value = state.serverUrl; } if (cwdInput && "value" in cwdInput && !cwdInput.value && state.cwd) { cwdInput.value = state.cwd; } + if (modelInput && "value" in modelInput && !modelInput.value && state.iacCodeModel) { + modelInput.value = state.iacCodeModel; + } } function syncStateFromConnectionControls() { const state = ensureState(); const serverInput = byId("server-url"); const cwdInput = byId("cwd"); + const modelInput = byId("iac-code-model"); if (serverInput && "value" in serverInput) { state.serverUrl = String(serverInput.value || "").trim(); } if (cwdInput && "value" in cwdInput) { state.cwd = String(cwdInput.value || "").trim(); } + if (modelInput && "value" in modelInput) { + state.iacCodeModel = String(modelInput.value || "").trim(); + } return state; } @@ -3842,6 +3852,7 @@ const fields = [ ["serverUrl", "Server URL", state.serverUrl || ""], ["cwd", "CWD", state.cwd || ""], + ["iacCodeModel", "Model", state.iacCodeModel || ""], ["contextId", "Context ID", state.contextId || "未获取"], ["pipelineTaskId", "Pipeline Task", state.pipelineTaskId || "未获取"], ["activeTaskId", "Active Task", state.activeTaskId || "未获取"], @@ -4220,6 +4231,7 @@ } const serverInput = byId("server-url"); const cwdInput = byId("cwd"); + const modelInput = byId("iac-code-model"); const sendButton = byId("send-button"); const composer = byId("composer-input"); const healthButton = byId("health-button"); @@ -4234,6 +4246,7 @@ addListener(serverInput, "input", syncStateFromConnectionControls); addListener(cwdInput, "input", syncStateFromConnectionControls); + addListener(modelInput, "input", syncStateFromConnectionControls); addListener(sendButton, "click", sendComposerMessage); addListener(healthButton, "click", healthCheck); addListener(fetchStateButton, "click", fetchState); @@ -4246,7 +4259,15 @@ } }); controller.bound = Boolean( - serverInput || cwdInput || sendButton || composer || healthButton || fetchStateButton || cancelButton || debugDrawer + serverInput || + cwdInput || + modelInput || + sendButton || + composer || + healthButton || + fetchStateButton || + cancelButton || + debugDrawer ); } diff --git a/scripts/a2a/selling_console_web/index.html b/scripts/a2a/selling_console_web/index.html index 63148bfa..f4ee104c 100644 --- a/scripts/a2a/selling_console_web/index.html +++ b/scripts/a2a/selling_console_web/index.html @@ -135,6 +135,10 @@

高可用标准方案

CWD +
diff --git a/src/iac_code/a2a/client.py b/src/iac_code/a2a/client.py index 32743ee2..e4d892cb 100644 --- a/src/iac_code/a2a/client.py +++ b/src/iac_code/a2a/client.py @@ -132,6 +132,7 @@ async def send_message( cwd: str, context_id: str | None = None, model: str | None = None, + iac_code_api_key: str | None = None, ) -> A2AClientResponse: payload = self._message_payload( method="SendMessage", @@ -139,6 +140,7 @@ async def send_message( cwd=cwd, context_id=context_id, model=model, + iac_code_api_key=iac_code_api_key, ) transport = self._make_transport_client(url) response = await transport.send(payload) @@ -152,6 +154,7 @@ async def stream_message( cwd: str, context_id: str | None = None, model: str | None = None, + iac_code_api_key: str | None = None, ) -> AsyncIterator[dict[str, Any]]: payload = self._message_payload( method="SendStreamingMessage", @@ -159,6 +162,7 @@ async def stream_message( cwd=cwd, context_id=context_id, model=model, + iac_code_api_key=iac_code_api_key, ) transport = self._make_transport_client(url) async for event in transport.stream(payload): @@ -354,12 +358,17 @@ def _message_payload( cwd: str, context_id: str | None, model: str | None, + iac_code_api_key: str | None, ) -> dict[str, Any]: iac_code_metadata = {"cwd": cwd} if model: stripped_model = model.strip() if stripped_model: iac_code_metadata["iac_code_model"] = stripped_model + if iac_code_api_key: + stripped_api_key = iac_code_api_key.strip() + if stripped_api_key: + iac_code_metadata["iac_code_api_key"] = stripped_api_key message: dict[str, Any] = { "messageId": str(uuid.uuid4()), "role": "ROLE_USER", diff --git a/src/iac_code/a2a/events.py b/src/iac_code/a2a/events.py index a63bcf7b..a2a90717 100644 --- a/src/iac_code/a2a/events.py +++ b/src/iac_code/a2a/events.py @@ -40,6 +40,21 @@ _METADATA_MAX_DEPTH = 32 logger = logging.getLogger(__name__) A2APermissionResolver: TypeAlias = Callable[[PermissionRequestEvent], "bool | Awaitable[bool]"] +IAC_CODE_SESSION_ID_METADATA_KEY = "iacCodeSessionId" + + +def iac_code_session_metadata(session_id: str) -> dict[str, Any]: + return {"iac_code": {IAC_CODE_SESSION_ID_METADATA_KEY: session_id}} + + +def with_iac_code_session_metadata(metadata: dict[str, Any] | None, session_id: str | None) -> dict[str, Any] | None: + if not session_id: + return metadata + merged = dict(metadata or {}) + iac_code = dict(merged.get("iac_code") or {}) + iac_code[IAC_CODE_SESSION_ID_METADATA_KEY] = session_id + merged["iac_code"] = iac_code + return merged def _truncate(value: Any, *, _depth: int = 0) -> Any: @@ -144,12 +159,14 @@ async def _enqueue_status( state: int, message: Message | None = None, metadata: dict[str, Any] | None = None, + iac_code_session_id: str | None = None, ) -> None: update = TaskStatusUpdateEvent( task_id=task_id, context_id=context_id, status=TaskStatus(state=TaskState.Name(state), message=message), ) + metadata = with_iac_code_session_metadata(metadata, iac_code_session_id) if metadata is not None: ParseDict(metadata, update.metadata) await event_queue.enqueue_event(update) @@ -165,6 +182,7 @@ async def publish_stream_event( permission_resolver: A2APermissionResolver | None = None, auto_approve_permissions: bool = False, exposure_types: Any = None, + iac_code_session_id: str | None = None, ) -> str | None: enabled_exposure_types = normalize_a2a_exposure_types(exposure_types) @@ -177,6 +195,7 @@ async def publish_stream_event( context_id=context_id, state=TaskState.TASK_STATE_WORKING, message=_agent_text_message(task_id=task_id, context_id=context_id, text=event.text), + iac_code_session_id=iac_code_session_id, ) return event.text @@ -189,6 +208,7 @@ async def publish_stream_event( context_id=context_id, state=TaskState.TASK_STATE_WORKING, metadata={"iac_code": {"thinking": {"type": "raw_thinking", "text": _truncate(event.text)}}}, + iac_code_session_id=iac_code_session_id, ) return None @@ -201,6 +221,7 @@ async def publish_stream_event( context_id=context_id, state=TaskState.TASK_STATE_WORKING, metadata={"iac_code": {"tool": {"status": "started", "toolUseId": event.tool_use_id, "name": event.name}}}, + iac_code_session_id=iac_code_session_id, ) return None @@ -221,6 +242,7 @@ async def publish_stream_event( } } }, + iac_code_session_id=iac_code_session_id, ) return None @@ -242,6 +264,7 @@ async def publish_stream_event( } } }, + iac_code_session_id=iac_code_session_id, ) return None @@ -270,6 +293,7 @@ async def publish_stream_event( context_id=context_id, state=TaskState.TASK_STATE_WORKING, metadata={"iac_code": {"tool": tool_metadata}}, + iac_code_session_id=iac_code_session_id, ) return None @@ -297,6 +321,7 @@ async def publish_stream_event( } } }, + iac_code_session_id=iac_code_session_id, ) return None @@ -315,6 +340,7 @@ async def publish_stream_event( } } }, + iac_code_session_id=iac_code_session_id, ) return None @@ -336,6 +362,7 @@ async def publish_stream_event( state=state, message=_agent_text_message(task_id=task_id, context_id=context_id, text=text), metadata={"iac_code": {"error": error_metadata}}, + iac_code_session_id=iac_code_session_id, ) return None diff --git a/src/iac_code/a2a/executor.py b/src/iac_code/a2a/executor.py index f50710c9..9b394ac2 100644 --- a/src/iac_code/a2a/executor.py +++ b/src/iac_code/a2a/executor.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import contextlib import json import logging import os @@ -15,9 +14,14 @@ from a2a.server.events import EventQueue from a2a.types import Message, Role, Task, TaskState, TaskStatus, TaskStatusUpdateEvent from a2a.utils.errors import InvalidParamsError -from google.protobuf.json_format import MessageToDict +from google.protobuf.json_format import MessageToDict, ParseDict -from iac_code.a2a.events import make_text_part, publish_stream_event +from iac_code.a2a.events import ( + iac_code_session_metadata, + make_text_part, + publish_stream_event, + with_iac_code_session_metadata, +) from iac_code.a2a.exposure import normalize_a2a_exposure_types from iac_code.a2a.metrics import A2AMetrics, NoOpA2AMetrics from iac_code.a2a.parts import ( @@ -33,6 +37,12 @@ from iac_code.a2a.pipeline_paths import existing_a2a_pipeline_dir_for_session from iac_code.a2a.pipeline_snapshot import A2APipelineSnapshotStore, reduce_pipeline_events from iac_code.a2a.pipeline_stream import PipelineA2AEventPublisher +from iac_code.a2a.runtime_overrides import ( + a2a_request_context, + configure_runtime_model, + credentials_with_metadata_api_key, + refresh_runtime_cloud_tools, +) from iac_code.a2a.task_store import A2ATaskStore from iac_code.a2a.types import ( TASK_STATE_CANCELED, @@ -62,9 +72,8 @@ from iac_code.pipeline.engine.user_input import PipelineUserInput, normalize_pipeline_user_input from iac_code.services.agent_factory import AgentFactoryOptions, create_agent_runtime from iac_code.services.capabilities.multimodal import is_model_multimodal -from iac_code.services.providers.aliyun import DEFAULT_REGION, AliyunCredential, use_aliyun_credential +from iac_code.services.providers.aliyun import DEFAULT_REGION, AliyunCredential from iac_code.services.session_storage import SessionStorage -from iac_code.services.telemetry import use_session_id, use_user_id from iac_code.types.stream_events import TextDeltaEvent from iac_code.utils.file_security import atomic_write_text, ensure_private_dir, ensure_private_file from iac_code.utils.public_errors import public_exception_summary, sanitize_public_text @@ -798,6 +807,7 @@ async def publish_initial_task_if_missing() -> None: cwd = self._resolve_cwd(metadata) user_id = self._resolve_user_id(metadata) metadata_model = self._resolve_model(metadata) + metadata_api_key = self._resolve_api_key(metadata) model = metadata_model or self._model aliyun_credential = self._resolve_aliyun_credential(metadata) pipeline_mode = get_run_mode() == RunMode.PIPELINE @@ -888,6 +898,10 @@ async def publish_initial_task_if_missing() -> None: permission_resolver=self._permission_resolver, auto_approve_permissions=self._auto_approve_permissions, thinking_exposure_types=self._thinking_exposure_types, + user_id=user_id, + aliyun_credential=aliyun_credential, + model_from_metadata=metadata_model is not None, + metadata_api_key=metadata_api_key, ) await pipeline_executor.execute( context=context, @@ -918,10 +932,7 @@ def runtime_factory(session_id: str) -> Any: ) try: - aliyun_credential_ctx = ( - use_aliyun_credential(aliyun_credential) if aliyun_credential else contextlib.nullcontext() - ) - with aliyun_credential_ctx: + with a2a_request_context(user_id=user_id, aliyun_credential=aliyun_credential): ctx = await self._task_store.get_or_create_context( context_id=context_id, cwd=cwd, @@ -956,6 +967,7 @@ def runtime_factory(session_id: str) -> Any: context_id=context_id, state=TaskState.TASK_STATE_FAILED, text=_("Task is already working."), + session_id=ctx.session_id, ) self._task_store.mirror_task(task) await self._notify_terminal_task(task_id=task.task_id, context_id=task.context_id, state=task.state) @@ -973,6 +985,7 @@ def runtime_factory(session_id: str) -> Any: context_id=context_id, state=TaskState.TASK_STATE_FAILED, text=_("Task is already working."), + session_id=ctx.session_id, ) self._task_store.mirror_task(task) await self._notify_terminal_task(task_id=task.task_id, context_id=task.context_id, state=task.state) @@ -994,20 +1007,27 @@ def runtime_factory(session_id: str) -> Any: task_id=task_id, context_id=context_id, state=TaskState.TASK_STATE_SUBMITTED, + session_id=ctx.session_id, ) await self._publish_status( event_queue, task_id=task_id, context_id=context_id, state=TaskState.TASK_STATE_WORKING, + session_id=ctx.session_id, ) - user_id_ctx = use_user_id(user_id) if user_id else contextlib.nullcontext() - aliyun_credential_ctx = ( - use_aliyun_credential(aliyun_credential) if aliyun_credential else contextlib.nullcontext() - ) - with use_session_id(ctx.session_id), user_id_ctx, aliyun_credential_ctx: - self._configure_runtime_model(runtime, model, from_metadata=metadata_model is not None) - self._refresh_runtime_cloud_tools(runtime) + with a2a_request_context( + session_id=ctx.session_id, + user_id=user_id, + aliyun_credential=aliyun_credential, + ): + configure_runtime_model( + runtime, + model, + from_metadata=metadata_model is not None, + metadata_api_key=metadata_api_key, + ) + refresh_runtime_cloud_tools(runtime) cleanup_ledger = _cleanup_ledger_for_a2a_normal_chat(cwd=cwd, session_id=ctx.session_id) _prune_completed_cleanup_prompt_from_runtime(runtime, cleanup_ledger) cleanup_publisher = None @@ -1039,6 +1059,7 @@ def runtime_factory(session_id: str) -> Any: permission_resolver=self._permission_resolver, auto_approve_permissions=self._auto_approve_permissions, exposure_types=self._thinking_exposure_types, + iac_code_session_id=ctx.session_id, ) if text_chunk: task.output_text.append(text_chunk) @@ -1048,6 +1069,7 @@ def runtime_factory(session_id: str) -> Any: task_id=task_id, context_id=context_id, state=TaskState.TASK_STATE_INPUT_REQUIRED, + session_id=ctx.session_id, ) self._task_store.mirror_task(task) await self._notify_terminal_task(task_id=task.task_id, context_id=task.context_id, state=task.state) @@ -1060,6 +1082,7 @@ def runtime_factory(session_id: str) -> Any: context_id=context_id, state=TaskState.TASK_STATE_CANCELED, text=_("Task canceled."), + session_id=ctx.session_id, ) self._task_store.mirror_task(task) await self._notify_terminal_task(task_id=task.task_id, context_id=task.context_id, state=task.state) @@ -1073,6 +1096,7 @@ def runtime_factory(session_id: str) -> Any: context_id=context_id, state=TaskState.TASK_STATE_INPUT_REQUIRED, text="A temporary error occurred. Please retry.", + session_id=ctx.session_id, ) self._task_store.mirror_task(task) await self._notify_terminal_task(task_id=task.task_id, context_id=task.context_id, state=task.state) @@ -1086,6 +1110,7 @@ def runtime_factory(session_id: str) -> Any: context_id=context_id, state=TaskState.TASK_STATE_FAILED, text=self._sanitize_error(exc), + session_id=ctx.session_id, ) self._task_store.mirror_task(task) await self._notify_terminal_task(task_id=task.task_id, context_id=task.context_id, state=task.state) @@ -1184,6 +1209,19 @@ def _resolve_model(self, metadata: Any | None) -> str | None: return raw_model.strip() return None + def _resolve_api_key(self, metadata: Any | None) -> str | None: + if metadata is not None and hasattr(metadata, "DESCRIPTOR"): + metadata = MessageToDict(metadata, preserving_proto_field_name=False) + if not isinstance(metadata, Mapping): + return None + raw_iac_meta = metadata.get("iac_code") + if not isinstance(raw_iac_meta, Mapping): + return None + raw_api_key = raw_iac_meta.get("iac_code_api_key") + if isinstance(raw_api_key, str) and raw_api_key.strip(): + return raw_api_key.strip() + return None + def _resolve_aliyun_credential(self, metadata: Any | None) -> AliyunCredential | None: if metadata is not None and hasattr(metadata, "DESCRIPTOR"): metadata = MessageToDict(metadata, preserving_proto_field_name=False) @@ -1356,6 +1394,8 @@ async def _publish_status( context_id: str, state: int, text: str | None = None, + metadata: dict[str, Any] | None = None, + session_id: str | None = None, ) -> None: message = None if text: @@ -1368,7 +1408,11 @@ async def _publish_status( ) status = TaskStatus(state=TaskState.Name(state), message=message) status.timestamp.GetCurrentTime() - await event_queue.enqueue_event(TaskStatusUpdateEvent(task_id=task_id, context_id=context_id, status=status)) + update = TaskStatusUpdateEvent(task_id=task_id, context_id=context_id, status=status) + metadata = with_iac_code_session_metadata(metadata, session_id) + if metadata is not None: + ParseDict(metadata, update.metadata) + await event_queue.enqueue_event(update) async def _publish_initial_task( self, @@ -1377,48 +1421,52 @@ async def _publish_initial_task( task_id: str, context_id: str, context: RequestContext, + session_id: str | None = None, ) -> None: task = Task( id=task_id, context_id=context_id, status=TaskStatus(state=TaskState.Name(TaskState.TASK_STATE_SUBMITTED)), ) + if session_id: + ParseDict(iac_code_session_metadata(session_id), task.metadata) message = getattr(context, "message", None) if isinstance(message, Message): task.history.append(message) await event_queue.enqueue_event(task) def _refresh_runtime_cloud_tools(self, runtime: Any) -> None: - refresh_cloud_tools = getattr(runtime, "refresh_cloud_tools", None) - if callable(refresh_cloud_tools): - refresh_cloud_tools() - return - tool_registry = getattr(runtime, "tool_registry", None) - if tool_registry is None: - return - from iac_code.services.cloud_credentials import CloudCredentials - from iac_code.tools.cloud.registry import register_cloud_tools - - register_cloud_tools(tool_registry, CloudCredentials()) - - def _configure_runtime_model(self, runtime: Any, model: str, *, from_metadata: bool) -> None: - provider_manager = getattr(runtime, "provider_manager", None) - reconfigure = getattr(provider_manager, "reconfigure", None) - if not callable(reconfigure): - return - was_metadata_model = bool(getattr(runtime, "_iac_code_a2a_metadata_model_applied", False)) - if not from_metadata and not was_metadata_model: - return + refresh_runtime_cloud_tools(runtime) - from iac_code.config import load_credentials + def _configure_runtime_model( + self, + runtime: Any, + model: str, + *, + from_metadata: bool, + metadata_api_key: str | None = None, + ) -> None: + configure_runtime_model( + runtime, + model, + from_metadata=from_metadata, + metadata_api_key=metadata_api_key, + ) - provider_key_override = getattr(provider_manager, "_provider_key_override", None) - base_url_override = getattr(provider_manager, "_base_url_override", None) - credentials = getattr(provider_manager, "_credentials", None) - if not isinstance(credentials, dict) or provider_key_override is None: - credentials = load_credentials(model=model) - reconfigure(model, credentials, provider_key_override, base_url_override) - setattr(runtime, "_iac_code_a2a_metadata_model_applied", from_metadata) + def _credentials_with_metadata_api_key( + self, + *, + model: str, + credentials: dict[str, str], + provider_key_override: str | None, + metadata_api_key: str, + ) -> dict[str, str]: + return credentials_with_metadata_api_key( + model=model, + credentials=credentials, + provider_key_override=provider_key_override, + metadata_api_key=metadata_api_key, + ) async def _notify_terminal_task(self, *, task_id: str, context_id: str, state: str) -> None: if self._push_notifier is None: diff --git a/src/iac_code/a2a/pipeline_events.py b/src/iac_code/a2a/pipeline_events.py index ff011887..024969ef 100644 --- a/src/iac_code/a2a/pipeline_events.py +++ b/src/iac_code/a2a/pipeline_events.py @@ -106,6 +106,7 @@ class PipelineA2AContext: task_id: str context_id: str pipeline_name: str + iac_code_session_id: str | None = None parent_step_order: list[str] = field(default_factory=list) candidate_step_order: list[str] = field(default_factory=list) emit_stack_events: bool = False @@ -716,7 +717,7 @@ def _envelope( created_at: str | None = None, ) -> dict[str, Any]: self._sequence += 1 - return { + envelope = { "schemaVersion": PIPELINE_METADATA_SCHEMA_VERSION, "extensionUri": PIPELINE_EVENTS_EXTENSION_URI, "eventId": f"evt-{uuid.uuid4().hex}", @@ -731,6 +732,9 @@ def _envelope( "status": status, "data": data, } + if self._context.iac_code_session_id is not None: + envelope["iacCodeSessionId"] = self._context.iac_code_session_id + return envelope def _parent_step_coordinate(self, step_id: str, data: dict[str, Any] | None = None) -> dict[str, Any]: data = data or {} diff --git a/src/iac_code/a2a/pipeline_executor.py b/src/iac_code/a2a/pipeline_executor.py index eef50b42..bdb8a38a 100644 --- a/src/iac_code/a2a/pipeline_executor.py +++ b/src/iac_code/a2a/pipeline_executor.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import contextlib import inspect import logging from collections.abc import AsyncIterator, Callable @@ -22,6 +23,11 @@ ) from iac_code.a2a.pipeline_snapshot import A2APipelineSnapshotStore, reduce_pipeline_events from iac_code.a2a.pipeline_stream import PipelineA2AEventPublisher +from iac_code.a2a.runtime_overrides import ( + a2a_request_context, + configure_runtime_model, + refresh_runtime_cloud_tools, +) from iac_code.a2a.types import ( TASK_STATE_CANCELED, TASK_STATE_COMPLETED, @@ -41,8 +47,8 @@ from iac_code.pipeline.engine.session import PipelineSession from iac_code.pipeline.engine.user_input import PipelineUserInput, normalize_pipeline_user_input from iac_code.services.agent_factory import AgentFactoryOptions, create_agent_runtime +from iac_code.services.providers.aliyun import AliyunCredential from iac_code.services.session_storage import SessionStorage -from iac_code.services.telemetry import use_session_id from iac_code.types.stream_events import AskUserQuestionEvent, SubPipelineStreamEvent from iac_code.utils.public_errors import sanitize_public_text @@ -164,6 +170,10 @@ def __init__( permission_resolver: Any | None, auto_approve_permissions: bool, thinking_exposure_types: Any, + user_id: str | None = None, + aliyun_credential: AliyunCredential | None = None, + model_from_metadata: bool = False, + metadata_api_key: str | None = None, ) -> None: self._task_store = task_store self._model = model @@ -173,6 +183,10 @@ def __init__( self._permission_resolver = permission_resolver self._auto_approve_permissions = auto_approve_permissions self._thinking_exposure_types = thinking_exposure_types + self._user_id = user_id + self._aliyun_credential = aliyun_credential + self._model_from_metadata = model_from_metadata + self._metadata_api_key = metadata_api_key async def execute( self, @@ -196,11 +210,12 @@ def runtime_factory(session_id: str) -> Any: return create_agent_runtime(AgentFactoryOptions(model=self._model, session_id=session_id, cwd=cwd)) try: - ctx = await self._task_store.get_or_create_context( - context_id=context_id, - cwd=cwd, - runtime_factory=runtime_factory, - ) + with self._request_context(): + ctx = await self._task_store.get_or_create_context( + context_id=context_id, + cwd=cwd, + runtime_factory=runtime_factory, + ) except Exception as exc: await self._publish_exception_status( event_queue, @@ -217,16 +232,17 @@ def runtime_factory(session_id: str) -> Any: if ctx.active_task_id is not None: preserve_active_task = _is_active_task_record(task, ctx.active_task_id) if _is_active_task_request(task, task_id, ctx.active_task_id): - routed = await self._route_active_pipeline_interrupt( - event_queue, - task=task, - ctx=ctx, - task_id=task_id, - context_id=context_id, - cwd=cwd, - pipeline_input=pipeline_input, - preserve_task_record=preserve_active_task, - ) + with self._request_context(session_id=ctx.session_id): + routed = await self._route_active_pipeline_interrupt( + event_queue, + task=task, + ctx=ctx, + task_id=task_id, + context_id=context_id, + cwd=cwd, + pipeline_input=pipeline_input, + preserve_task_record=preserve_active_task, + ) if routed: return await self._fail_already_active( @@ -252,55 +268,68 @@ def runtime_factory(session_id: str) -> Any: pipeline = None publisher: PipelineA2AEventPublisher | None = None try: - pipeline_runtime = self._pipeline_runtime_from_context(ctx.runtime, session_id=ctx.session_id, cwd=cwd) - agent_runtime = pipeline_runtime.agent_runtime - pipeline = self._create_pipeline( - session_id=ctx.session_id, - cwd=cwd, - runtime=agent_runtime, - session_storage=session_storage, - ) - self._set_pipeline_telemetry_correlation(pipeline, task_id=task_id, context_id=context_id) - publisher = self._publisher( - event_queue=event_queue, - pipeline=pipeline, - task_id=task_id, - context_id=context_id, - session_id=ctx.session_id, - cwd=cwd, - ) - pipeline_runtime = A2APipelineRuntime( - agent_runtime=agent_runtime, - pipeline=pipeline, - publisher=publisher, - ) - ctx.runtime = pipeline_runtime - self._task_store.mirror_context(ctx) - - def fresh_pipeline_factory() -> Any: - fresh_pipeline = self._create_pipeline( + with self._request_context(session_id=ctx.session_id): + pipeline_runtime = self._pipeline_runtime_from_context( + ctx.runtime, + session_id=ctx.session_id, + cwd=cwd, + ) + agent_runtime = pipeline_runtime.agent_runtime + configure_runtime_model( + agent_runtime, + self._model, + from_metadata=self._model_from_metadata, + metadata_api_key=self._metadata_api_key, + ) + if self._aliyun_credential is not None: + refresh_runtime_cloud_tools(agent_runtime) + pipeline = self._create_pipeline( session_id=ctx.session_id, cwd=cwd, runtime=agent_runtime, session_storage=session_storage, - resume_from_sidecar=False, ) - self._set_pipeline_telemetry_correlation( - fresh_pipeline, + self._set_pipeline_telemetry_correlation(pipeline, task_id=task_id, context_id=context_id) + publisher = self._publisher( + event_queue=event_queue, + pipeline=pipeline, task_id=task_id, context_id=context_id, + session_id=ctx.session_id, + cwd=cwd, ) - return fresh_pipeline + pipeline_runtime = A2APipelineRuntime( + agent_runtime=agent_runtime, + pipeline=pipeline, + publisher=publisher, + ) + ctx.runtime = pipeline_runtime + self._task_store.mirror_context(ctx) - selected = self._select_stream( - pipeline, - prompt, - pipeline_input=pipeline_input, - publisher=publisher, - task_id=task_id, - context_id=context_id, - fresh_pipeline_factory=fresh_pipeline_factory, - ) + def fresh_pipeline_factory() -> Any: + fresh_pipeline = self._create_pipeline( + session_id=ctx.session_id, + cwd=cwd, + runtime=agent_runtime, + session_storage=session_storage, + resume_from_sidecar=False, + ) + self._set_pipeline_telemetry_correlation( + fresh_pipeline, + task_id=task_id, + context_id=context_id, + ) + return fresh_pipeline + + selected = self._select_stream( + pipeline, + prompt, + pipeline_input=pipeline_input, + publisher=publisher, + task_id=task_id, + context_id=context_id, + fresh_pipeline_factory=fresh_pipeline_factory, + ) if selected.pipeline is not pipeline: pipeline = selected.pipeline pipeline_runtime.pipeline = pipeline @@ -313,7 +342,7 @@ def fresh_pipeline_factory() -> Any: self._task_store.mirror_task(task) self._task_store.mirror_context(ctx) stream_had_events = False - with use_session_id(ctx.session_id): + with self._request_context(session_id=ctx.session_id): while True: stream_result = await self._consume_stream_until_restart( stream=stream, @@ -413,6 +442,13 @@ def fresh_pipeline_factory() -> Any: finally: lock.release() + def _request_context(self, *, session_id: str | None = None) -> contextlib.AbstractContextManager[None]: + return a2a_request_context( + session_id=session_id, + user_id=self._user_id, + aliyun_credential=self._aliyun_credential, + ) + def _pipeline_runtime_from_context(self, runtime: Any, *, session_id: str, cwd: str) -> A2APipelineRuntime: if isinstance(runtime, A2APipelineRuntime): return runtime @@ -622,7 +658,7 @@ async def _continue_active_pause_confirmation( stream = pipeline.continue_from_sidecar() task.state = TASK_STATE_WORKING self._task_store.mirror_task(task) - with use_session_id(session_id): + with self._request_context(session_id=session_id): while True: stream_result = await self._consume_stream_until_restart( stream=stream, @@ -800,6 +836,7 @@ def _publisher( task_id=task_id, context_id=context_id, pipeline_name=getattr(pipeline, "pipeline_name", get_pipeline_name()), + iac_code_session_id=session_id, parent_step_order=_pipeline_parent_step_order(pipeline), candidate_step_order=_pipeline_candidate_step_order(pipeline), emit_stack_events=bool(getattr(pipeline, "emit_stack_events", False)), @@ -1520,6 +1557,7 @@ def cancel_waiting_input_task_from_sidecar( task_id=task_id, context_id=context_id, pipeline_name=pipeline_name, + iac_code_session_id=session_id, ) translator = PipelineEventTranslator(context) translator.hydrate_from_events(events) diff --git a/src/iac_code/a2a/runtime_overrides.py b/src/iac_code/a2a/runtime_overrides.py new file mode 100644 index 00000000..983d86da --- /dev/null +++ b/src/iac_code/a2a/runtime_overrides.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +import contextlib +from collections.abc import Iterator +from typing import Any + +from iac_code.services.providers.aliyun import AliyunCredential, use_aliyun_credential +from iac_code.services.telemetry import use_session_id, use_user_id + + +@contextlib.contextmanager +def a2a_request_context( + *, + session_id: str | None = None, + user_id: str | None = None, + aliyun_credential: AliyunCredential | None = None, +) -> Iterator[None]: + with contextlib.ExitStack() as stack: + if session_id: + stack.enter_context(use_session_id(session_id)) + if user_id: + stack.enter_context(use_user_id(user_id)) + if aliyun_credential is not None: + stack.enter_context(use_aliyun_credential(aliyun_credential)) + yield + + +def refresh_runtime_cloud_tools(runtime: Any) -> None: + refresh_cloud_tools = getattr(runtime, "refresh_cloud_tools", None) + if callable(refresh_cloud_tools): + refresh_cloud_tools() + return + tool_registry = getattr(runtime, "tool_registry", None) + if tool_registry is None: + return + + from iac_code.services.cloud_credentials import CloudCredentials + from iac_code.tools.cloud.registry import register_cloud_tools + + register_cloud_tools(tool_registry, CloudCredentials()) + + +def configure_runtime_model( + runtime: Any, + model: str, + *, + from_metadata: bool, + metadata_api_key: str | None = None, +) -> None: + provider_manager = getattr(runtime, "provider_manager", None) + reconfigure = getattr(provider_manager, "reconfigure", None) + if not callable(reconfigure): + return + was_metadata_model = bool(getattr(runtime, "_iac_code_a2a_metadata_model_applied", False)) + has_metadata_api_key = metadata_api_key is not None + was_metadata_api_key = bool(getattr(runtime, "_iac_code_a2a_metadata_api_key_applied", False)) + if not from_metadata and not was_metadata_model and not has_metadata_api_key and not was_metadata_api_key: + return + + from iac_code.config import load_credentials + + provider_key_override = getattr(provider_manager, "_provider_key_override", None) + base_url_override = getattr(provider_manager, "_base_url_override", None) + credentials = getattr(provider_manager, "_credentials", None) + if ( + not isinstance(credentials, dict) + or provider_key_override is None + or has_metadata_api_key + or was_metadata_api_key + ): + credentials = load_credentials(model=model) + if metadata_api_key is not None: + credentials = credentials_with_metadata_api_key( + model=model, + credentials=credentials, + provider_key_override=provider_key_override, + metadata_api_key=metadata_api_key, + ) + reconfigure(model, credentials, provider_key_override, base_url_override) + setattr(runtime, "_iac_code_a2a_metadata_model_applied", from_metadata) + setattr(runtime, "_iac_code_a2a_metadata_api_key_applied", has_metadata_api_key) + + +def credentials_with_metadata_api_key( + *, + model: str, + credentials: dict[str, str], + provider_key_override: str | None, + metadata_api_key: str, +) -> dict[str, str]: + provider_key = provider_key_override + if provider_key is None: + try: + from iac_code.providers.manager import _detect_provider_name + + provider_key = _detect_provider_name(model) + except ValueError: + return credentials + + from iac_code.config import _KEY_NAME_TO_CRED_SLOT + + slot = _KEY_NAME_TO_CRED_SLOT.get(provider_key) + if not slot: + return credentials + updated = dict(credentials) + updated[slot] = metadata_api_key + return updated diff --git a/src/iac_code/a2a/task_store.py b/src/iac_code/a2a/task_store.py index 5c0227aa..43dfad3d 100644 --- a/src/iac_code/a2a/task_store.py +++ b/src/iac_code/a2a/task_store.py @@ -14,7 +14,9 @@ from a2a.server.tasks.inmemory_task_store import resolve_user_scope as default_owner_resolver from a2a.types import ListTasksRequest, ListTasksResponse, Message, Part, Role, Task, TaskState, TaskStatus from a2a.utils.errors import InvalidParamsError +from google.protobuf.json_format import MessageToDict, ParseDict +from iac_code.a2a.events import with_iac_code_session_metadata from iac_code.a2a.metrics import A2AMetrics, NoOpA2AMetrics from iac_code.a2a.persistence import A2AContextSnapshot, A2APersistenceStore, A2ATaskSnapshot from iac_code.a2a.types import ( @@ -74,6 +76,7 @@ async def save(self, task: Task, context: ServerCallContext | None = None) -> No owner = self._owner(context) task_id = validate_protocol_id(task.id) async with self._mutation_lock: + self._attach_context_metadata(task) owner_tasks = self._sdk_tasks.setdefault(owner, {}) previous = owner_tasks.get(task_id) if previous is not None: @@ -98,6 +101,15 @@ async def save(self, task: Task, context: ServerCallContext | None = None) -> No record.touch() self._mirror_task(record) + def _attach_context_metadata(self, task: Task) -> None: + context = self._contexts.get(task.context_id) + if context is None: + return + metadata = MessageToDict(task.metadata, preserving_proto_field_name=False) if task.metadata.fields else {} + metadata = with_iac_code_session_metadata(metadata, context.session_id) + if metadata is not None: + ParseDict(metadata, task.metadata) + async def delete(self, task_id: str, context: ServerCallContext | None = None) -> None: owner = self._owner(context) task_id = validate_protocol_id(task_id) diff --git a/src/iac_code/cli/main.py b/src/iac_code/cli/main.py index 3a60839e..cb9d34c7 100644 --- a/src/iac_code/cli/main.py +++ b/src/iac_code/cli/main.py @@ -841,6 +841,11 @@ def a2a_call( "--iac-code-model", help=_("Model metadata to send with this A2A request"), ), + iac_code_api_key: str = typer.Option( + "", + "--iac-code-api-key", + help=_("API key metadata to send with this A2A request"), + ), token: str = typer.Option("", "--token", help=_("Bearer token for A2A HTTP requests")), basic_username: str = typer.Option("", "--basic-username", help=_("Basic auth username for A2A HTTP requests")), basic_password: str = typer.Option("", "--basic-password", help=_("Basic auth password for A2A HTTP requests")), @@ -877,6 +882,7 @@ def a2a_call( cwd = _current_logical_cwd() context_id = _a2a_config_value(ctx, config, "context_id", context_id) iac_code_model = _a2a_config_value(ctx, config, "iac_code_model", iac_code_model) + iac_code_api_key = _a2a_config_value(ctx, config, "iac_code_api_key", iac_code_api_key) timeout = _a2a_config_value(ctx, config, "timeout", timeout) stream = _a2a_config_value(ctx, config, "stream", stream) auth_options = _a2a_client_auth_options( @@ -914,6 +920,7 @@ def a2a_call( cwd=cwd, context_id=context_id or None, model=iac_code_model or None, + iac_code_api_key=iac_code_api_key or None, token=auth_options["token"] or None, basic_username=auth_options["basic_username"] or None, basic_password=auth_options["basic_password"] or None, @@ -1578,6 +1585,7 @@ async def _run_a2a_call( cwd: str, context_id: str | None, model: str | None, + iac_code_api_key: str | None, token: str | None, basic_username: str | None, basic_password: str | None, @@ -1616,6 +1624,7 @@ async def _run_a2a_call( cwd=str(Path(cwd)), context_id=context_id, model=model, + iac_code_api_key=iac_code_api_key, ): line = _format_a2a_stream_event(event) if stream_callback is not None: @@ -1629,6 +1638,7 @@ async def _run_a2a_call( cwd=str(Path(cwd)), context_id=context_id, model=model, + iac_code_api_key=iac_code_api_key, ) return response.text or json.dumps(response.payload, ensure_ascii=False, indent=2, sort_keys=True) finally: diff --git a/src/iac_code/i18n/locales/de/LC_MESSAGES/messages.po b/src/iac_code/i18n/locales/de/LC_MESSAGES/messages.po index 3b0d4e7f..fc375829 100644 --- a/src/iac_code/i18n/locales/de/LC_MESSAGES/messages.po +++ b/src/iac_code/i18n/locales/de/LC_MESSAGES/messages.po @@ -740,6 +740,10 @@ msgstr "Fortzusetzende A2A-Kontext-ID" msgid "Model metadata to send with this A2A request" msgstr "Modell-Metadaten, die mit dieser A2A-Anfrage gesendet werden" +#: src/iac_code/cli/main.py +msgid "API key metadata to send with this A2A request" +msgstr "API-Schlüssel-Metadaten, die mit dieser A2A-Anfrage gesendet werden" + #: src/iac_code/cli/main.py msgid "Bearer token for A2A HTTP requests" msgstr "Bearer-Token für A2A-HTTP-Anfragen" diff --git a/src/iac_code/i18n/locales/es/LC_MESSAGES/messages.po b/src/iac_code/i18n/locales/es/LC_MESSAGES/messages.po index 328ccf8c..bd4cb7d0 100644 --- a/src/iac_code/i18n/locales/es/LC_MESSAGES/messages.po +++ b/src/iac_code/i18n/locales/es/LC_MESSAGES/messages.po @@ -739,6 +739,10 @@ msgstr "ID de contexto A2A a continuar" msgid "Model metadata to send with this A2A request" msgstr "Metadatos del modelo que se enviarán con esta solicitud A2A" +#: src/iac_code/cli/main.py +msgid "API key metadata to send with this A2A request" +msgstr "Metadatos de clave API que se enviarán con esta solicitud A2A" + #: src/iac_code/cli/main.py msgid "Bearer token for A2A HTTP requests" msgstr "Token Bearer para solicitudes HTTP A2A" diff --git a/src/iac_code/i18n/locales/fr/LC_MESSAGES/messages.po b/src/iac_code/i18n/locales/fr/LC_MESSAGES/messages.po index 339e9b9b..ea06083f 100644 --- a/src/iac_code/i18n/locales/fr/LC_MESSAGES/messages.po +++ b/src/iac_code/i18n/locales/fr/LC_MESSAGES/messages.po @@ -738,6 +738,10 @@ msgstr "ID de contexte A2A à poursuivre" msgid "Model metadata to send with this A2A request" msgstr "Métadonnées du modèle à envoyer avec cette requête A2A" +#: src/iac_code/cli/main.py +msgid "API key metadata to send with this A2A request" +msgstr "Métadonnées de clé API à envoyer avec cette requête A2A" + #: src/iac_code/cli/main.py msgid "Bearer token for A2A HTTP requests" msgstr "Jeton Bearer pour les requêtes HTTP A2A" diff --git a/src/iac_code/i18n/locales/ja/LC_MESSAGES/messages.po b/src/iac_code/i18n/locales/ja/LC_MESSAGES/messages.po index bf22dd73..d4f6da6a 100644 --- a/src/iac_code/i18n/locales/ja/LC_MESSAGES/messages.po +++ b/src/iac_code/i18n/locales/ja/LC_MESSAGES/messages.po @@ -694,6 +694,10 @@ msgstr "継続する A2A コンテキスト ID" msgid "Model metadata to send with this A2A request" msgstr "この A2A リクエストで送信するモデルメタデータ" +#: src/iac_code/cli/main.py +msgid "API key metadata to send with this A2A request" +msgstr "この A2A リクエストで送信する API キーメタデータ" + #: src/iac_code/cli/main.py msgid "Bearer token for A2A HTTP requests" msgstr "A2A HTTP リクエスト用の Bearer トークン" diff --git a/src/iac_code/i18n/locales/pt/LC_MESSAGES/messages.po b/src/iac_code/i18n/locales/pt/LC_MESSAGES/messages.po index 55b40178..654c9e36 100644 --- a/src/iac_code/i18n/locales/pt/LC_MESSAGES/messages.po +++ b/src/iac_code/i18n/locales/pt/LC_MESSAGES/messages.po @@ -732,6 +732,10 @@ msgstr "ID de contexto A2A para continuar" msgid "Model metadata to send with this A2A request" msgstr "Metadados do modelo a enviar com esta solicitação A2A" +#: src/iac_code/cli/main.py +msgid "API key metadata to send with this A2A request" +msgstr "Metadados da chave de API a enviar com esta solicitação A2A" + #: src/iac_code/cli/main.py msgid "Bearer token for A2A HTTP requests" msgstr "Token Bearer para requisições HTTP A2A" diff --git a/src/iac_code/i18n/locales/zh/LC_MESSAGES/messages.po b/src/iac_code/i18n/locales/zh/LC_MESSAGES/messages.po index a9b40763..cea9f9e7 100644 --- a/src/iac_code/i18n/locales/zh/LC_MESSAGES/messages.po +++ b/src/iac_code/i18n/locales/zh/LC_MESSAGES/messages.po @@ -684,6 +684,10 @@ msgstr "要继续的 A2A 上下文 ID" msgid "Model metadata to send with this A2A request" msgstr "随本次 A2A 请求发送的模型元数据" +#: src/iac_code/cli/main.py +msgid "API key metadata to send with this A2A request" +msgstr "随本次 A2A 请求发送的 API key 元数据" + #: src/iac_code/cli/main.py msgid "Bearer token for A2A HTTP requests" msgstr "用于 A2A HTTP 请求的 Bearer 令牌" diff --git a/tests/a2a/test_client.py b/tests/a2a/test_client.py index 0382342b..00e5f3f8 100644 --- a/tests/a2a/test_client.py +++ b/tests/a2a/test_client.py @@ -161,6 +161,35 @@ async def test_message_payload_includes_metadata_iac_code_model() -> None: } +@pytest.mark.asyncio +async def test_message_payload_includes_metadata_iac_code_api_key() -> None: + http = FakeHTTPClient() + client = A2AClient(http_client=http) + + await client.send_message("http://remote/", "hello", cwd="/tmp/work", iac_code_api_key="metadata-key") + events = [ + event + async for event in client.stream_message( + "http://remote/", + "hello", + cwd="/tmp/work", + iac_code_api_key="stream-key", + ) + ] + + assert events + send_payload = http.requests[-2][2] + stream_payload = http.requests[-1][2] + assert send_payload["params"]["message"]["metadata"]["iac_code"] == { + "cwd": "/tmp/work", + "iac_code_api_key": "metadata-key", + } + assert stream_payload["params"]["message"]["metadata"]["iac_code"] == { + "cwd": "/tmp/work", + "iac_code_api_key": "stream-key", + } + + def test_response_text_extracts_from_task_history_agent_message() -> None: response = A2AClientResponse( payload={ diff --git a/tests/a2a/test_executor.py b/tests/a2a/test_executor.py index 4ca31db0..4df909f7 100644 --- a/tests/a2a/test_executor.py +++ b/tests/a2a/test_executor.py @@ -63,6 +63,30 @@ async def test_executor_runs_prompt_and_finishes_input_required( assert "".join(record.output_text) == "hi" +@pytest.mark.asyncio +async def test_executor_exposes_iac_code_session_id_in_status_metadata( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + def factory(options): + return FakeRuntime(agent_loop=FakeAgentLoop([TextDeltaEvent(text="hi")]), session_id=options.session_id) + + monkeypatch.setattr("iac_code.a2a.executor.create_agent_runtime", factory) + + store = A2ATaskStore(metrics=NoOpA2AMetrics()) + executor = IacCodeA2AExecutor(task_store=store, model="qwen3.6-plus") + queue = FakeEventQueue() + + await executor.execute( + FakeRequestContext(task_id="task-1", context_id="ctx-1", metadata={"iac_code": {"cwd": str(tmp_path)}}), + queue, + ) + + session_id = store._contexts["ctx-1"].session_id + status_updates = [dump(event) for event in queue.events if isinstance(event, TaskStatusUpdateEvent)] + assert status_updates + assert all(event["metadata"]["iac_code"]["iacCodeSessionId"] == session_id for event in status_updates) + + @pytest.mark.asyncio async def test_executor_passes_artifact_store_to_stream_event_publisher( monkeypatch: pytest.MonkeyPatch, tmp_path: Path @@ -82,6 +106,7 @@ async def spy_publish_stream_event( permission_resolver=None, auto_approve_permissions=False, exposure_types=None, + iac_code_session_id=None, ): seen_artifact_stores.append(artifact_store) seen_auto_approve_permissions.append(auto_approve_permissions) @@ -417,8 +442,33 @@ async def execute(self, *, context, event_queue, task, task_id, context_id, cwd, store = A2ATaskStore(metrics=NoOpA2AMetrics()) executor = IacCodeA2AExecutor(task_store=store, model="qwen3.6-plus") - await executor.execute(FakeRequestContext(metadata={"iac_code": {"cwd": str(tmp_path)}}), FakeEventQueue()) + await executor.execute( + FakeRequestContext( + metadata={ + "iac_code": { + "cwd": str(tmp_path), + "user_id": "client-user", + "iac_code_model": "metadata-model", + "iac_code_api_key": "metadata-api-key", + "alibaba_cloud_access_key_id": "client-id", + "alibaba_cloud_access_key_secret": "client-secret", + "alibaba_cloud_region_id": "cn-beijing", + "alibaba_cloud_security_token": "client-sts", + } + } + ), + FakeEventQueue(), + ) + init_kwargs = calls[0][1] + assert init_kwargs["model"] == "metadata-model" + assert init_kwargs["user_id"] == "client-user" + assert init_kwargs["model_from_metadata"] is True + assert init_kwargs["metadata_api_key"] == "metadata-api-key" + assert init_kwargs["aliyun_credential"].access_key_id == "client-id" + assert init_kwargs["aliyun_credential"].access_key_secret == "client-secret" + assert init_kwargs["aliyun_credential"].region_id == "cn-beijing" + assert init_kwargs["aliyun_credential"].sts_token == "client-sts" assert calls[-1] == ( "execute", { @@ -1713,6 +1763,47 @@ def reconfigure(self, model, credentials, provider_key_override=None, base_url_o assert provider_manager.calls == ["metadata-model", "server-default-model"] +@pytest.mark.asyncio +async def test_executor_reconfigures_cached_runtime_iac_code_api_key_per_call( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + class FakeProviderManager: + def __init__(self) -> None: + self.calls: list[tuple[str, dict[str, str]]] = [] + + def reconfigure(self, model, credentials, provider_key_override=None, base_url_override=None): + self.calls.append((model, dict(credentials))) + + provider_manager = FakeProviderManager() + runtime = FakeRuntime( + agent_loop=FakeAgentLoop([TextDeltaEvent(text="ok")]), + session_id="session-1", + provider_manager=provider_manager, + ) + monkeypatch.setattr("iac_code.a2a.executor.create_agent_runtime", lambda options: runtime) + monkeypatch.setattr("iac_code.config.load_credentials", lambda model=None: {"dashscope": "fallback-key"}) + + store = A2ATaskStore(metrics=NoOpA2AMetrics()) + executor = IacCodeA2AExecutor(task_store=store, model="qwen3.6-plus") + + await executor.execute( + FakeRequestContext( + context_id="ctx-1", + metadata={"iac_code": {"cwd": str(tmp_path), "iac_code_api_key": "metadata-key"}}, + ), + FakeEventQueue(), + ) + await executor.execute( + FakeRequestContext(context_id="ctx-1", task_id="task-2", metadata={"iac_code": {"cwd": str(tmp_path)}}), + FakeEventQueue(), + ) + + assert provider_manager.calls == [ + ("qwen3.6-plus", {"dashscope": "metadata-key"}), + ("qwen3.6-plus", {"dashscope": "fallback-key"}), + ] + + @pytest.mark.asyncio async def test_executor_applies_aliyun_metadata_to_task_credentials( monkeypatch: pytest.MonkeyPatch, tmp_path: Path diff --git a/tests/a2a/test_pipeline_debugger_script.py b/tests/a2a/test_pipeline_debugger_script.py index 94e60518..c2e57d9d 100644 --- a/tests/a2a/test_pipeline_debugger_script.py +++ b/tests/a2a/test_pipeline_debugger_script.py @@ -237,6 +237,27 @@ def test_build_message_stream_payload_uses_a2a_v1_method_and_cwd_metadata() -> N assert payload["params"]["configuration"] == {"acceptedOutputModes": ["text/plain"]} +def test_build_message_stream_payload_includes_iac_code_model_metadata() -> None: + debugger = load_debugger_module() + + payload = debugger.build_message_stream_payload( + cwd="/workspace/demo", + prompt="帮我生成售卖 pipeline 方案", + context_id="ctx-demo", + task_id="task-demo", + request_id="req-1", + message_id="msg-1", + iac_code_model=" kimi-k2.7-code ", + ) + + assert payload["params"]["message"]["metadata"] == { + "iac_code": { + "cwd": "/workspace/demo", + "iac_code_model": "kimi-k2.7-code", + } + } + + def test_build_message_stream_payload_adds_image_data_parts() -> None: debugger = load_debugger_module() diff --git a/tests/a2a/test_pipeline_events.py b/tests/a2a/test_pipeline_events.py index 306b64ef..e18058b1 100644 --- a/tests/a2a/test_pipeline_events.py +++ b/tests/a2a/test_pipeline_events.py @@ -113,6 +113,28 @@ def test_manual_cleanup_event_normalizes_cleanup_data_keys() -> None: assert event["data"]["lastError"] == "DELETE_FAILED" +def test_pipeline_envelope_exposes_iac_code_session_id() -> None: + context = PipelineA2AContext( + pipeline_run_id="ctx-1", + task_id="task-1", + context_id="ctx-1", + pipeline_name="selling", + iac_code_session_id="session-1", + ) + translator = PipelineEventTranslator(context) + + [envelope] = translator.translate( + PipelineEvent( + type=PipelineEventType.PIPELINE_STARTED, + step_id=None, + timestamp=1717821600.0, + data={}, + ) + ) + + assert envelope["iacCodeSessionId"] == "session-1" + + def test_parent_step_attempt_increments_after_rollback() -> None: translator = PipelineEventTranslator(_ctx()) translator.translate( diff --git a/tests/a2a/test_pipeline_executor.py b/tests/a2a/test_pipeline_executor.py index a7f90725..7b8d6d04 100644 --- a/tests/a2a/test_pipeline_executor.py +++ b/tests/a2a/test_pipeline_executor.py @@ -166,6 +166,214 @@ def _status_events(queue: FakeEventQueue) -> list[dict]: return [dump(event) for event in queue.events if isinstance(event, TaskStatusUpdateEvent)] +@pytest.mark.asyncio +async def test_pipeline_executor_applies_aliyun_metadata_while_creating_runtime( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + from iac_code.a2a.pipeline_executor import IacCodeA2APipelineExecutor + from iac_code.services.providers.aliyun import AliyunCredential, AliyunCredentials + + captured_credentials: list[tuple[str, str | None, str | None]] = [] + + def runtime_factory(options): + credential = AliyunCredentials.load() + captured_credentials.append( + ( + options.model, + credential.access_key_id if credential else None, + credential.region_id if credential else None, + ) + ) + return _fake_runtime() + + fake_pipeline = FakePipeline( + [PipelineEvent(type=PipelineEventType.PIPELINE_COMPLETED, step_id=None, timestamp=1717821601.0, data={})], + session_dir=tmp_path / "sidecar", + ) + monkeypatch.setenv("ALIBABA_CLOUD_ACCESS_KEY_ID", "env-id") + monkeypatch.setenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "env-secret") + monkeypatch.setenv("ALIBABA_CLOUD_REGION_ID", "cn-shanghai") + monkeypatch.setattr("iac_code.a2a.pipeline_executor.create_agent_runtime", runtime_factory) + monkeypatch.setattr("iac_code.a2a.pipeline_executor.create_pipeline", lambda *args, **kwargs: fake_pipeline) + monkeypatch.setattr("iac_code.tools.cloud.registry.register_cloud_tools", lambda *args, **kwargs: None) + monkeypatch.setattr("iac_code.services.providers.aliyun.AliyunCredentials._load_from_iac_code_config", lambda: None) + + store = A2ATaskStore(metrics=NoOpA2AMetrics()) + executor = IacCodeA2APipelineExecutor( + task_store=store, + model="qwen3.6-plus", + metrics=NoOpA2AMetrics(), + artifact_store=None, + push_notifier=None, + permission_resolver=None, + auto_approve_permissions=False, + thinking_exposure_types=None, + aliyun_credential=AliyunCredential( + access_key_id="client-id", + access_key_secret="client-secret", + region_id="cn-beijing", + ), + ) + + await executor.execute( + context=FakeRequestContext(metadata={"iac_code": {"cwd": str(tmp_path)}}), + event_queue=FakeEventQueue(), + task=await store.get_or_create_task(task_id="task-1", context_id="ctx-1"), + task_id="task-1", + context_id="ctx-1", + cwd=str(tmp_path), + prompt="部署网站", + ) + + assert captured_credentials == [("qwen3.6-plus", "client-id", "cn-beijing")] + assert AliyunCredentials.load().access_key_id == "env-id" + + +@pytest.mark.asyncio +async def test_pipeline_executor_refreshes_cloud_tools_with_aliyun_metadata_for_reused_context( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + from iac_code.a2a.pipeline_executor import IacCodeA2APipelineExecutor + from iac_code.services.providers.aliyun import AliyunCredential, AliyunCredentials + + seen_access_key_ids: list[str | None] = [] + runtime = _fake_runtime() + + def fake_register_cloud_tools(registry, credentials): + assert registry is runtime.tool_registry + credential = credentials.get_provider("aliyun") + seen_access_key_ids.append(credential.access_key_id if credential else None) + + fake_pipeline = FakePipeline( + [PipelineEvent(type=PipelineEventType.PIPELINE_COMPLETED, step_id=None, timestamp=1717821601.0, data={})], + session_dir=tmp_path / "sidecar", + ) + monkeypatch.setenv("ALIBABA_CLOUD_ACCESS_KEY_ID", "env-id") + monkeypatch.setenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET", "env-secret") + monkeypatch.setenv("ALIBABA_CLOUD_REGION_ID", "cn-shanghai") + monkeypatch.setattr("iac_code.tools.cloud.registry.register_cloud_tools", fake_register_cloud_tools) + monkeypatch.setattr("iac_code.a2a.pipeline_executor.create_pipeline", lambda *args, **kwargs: fake_pipeline) + monkeypatch.setattr("iac_code.services.providers.aliyun.AliyunCredentials._load_from_iac_code_config", lambda: None) + + store = A2ATaskStore(metrics=NoOpA2AMetrics()) + await store.get_or_create_context( + context_id="ctx-1", + cwd=str(tmp_path), + runtime_factory=lambda _session_id: runtime, + ) + executor = IacCodeA2APipelineExecutor( + task_store=store, + model="qwen3.6-plus", + metrics=NoOpA2AMetrics(), + artifact_store=None, + push_notifier=None, + permission_resolver=None, + auto_approve_permissions=False, + thinking_exposure_types=None, + aliyun_credential=AliyunCredential( + access_key_id="client-id", + access_key_secret="client-secret", + region_id="cn-beijing", + ), + ) + + await executor.execute( + context=FakeRequestContext(context_id="ctx-1", metadata={"iac_code": {"cwd": str(tmp_path)}}), + event_queue=FakeEventQueue(), + task=await store.get_or_create_task(task_id="task-1", context_id="ctx-1"), + task_id="task-1", + context_id="ctx-1", + cwd=str(tmp_path), + prompt="部署网站", + ) + + assert seen_access_key_ids == ["client-id"] + assert AliyunCredentials.load().access_key_id == "env-id" + + +@pytest.mark.asyncio +async def test_pipeline_executor_reconfigures_cached_runtime_model_and_api_key_per_call( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + from iac_code.a2a.pipeline_executor import IacCodeA2APipelineExecutor + + class FakeProviderManager: + def __init__(self) -> None: + self.calls: list[tuple[str, dict[str, str]]] = [] + + def reconfigure(self, model, credentials, provider_key_override=None, base_url_override=None): + self.calls.append((model, dict(credentials))) + + provider_manager = FakeProviderManager() + runtime = SimpleNamespace(provider_manager=provider_manager, tool_registry=object()) + created_pipeline_count = 0 + + def fake_create_pipeline(*args, **kwargs): + nonlocal created_pipeline_count + created_pipeline_count += 1 + return FakePipeline( + [PipelineEvent(type=PipelineEventType.PIPELINE_COMPLETED, step_id=None, timestamp=1717821601.0, data={})], + session_dir=tmp_path / f"sidecar-{created_pipeline_count}", + ) + + monkeypatch.setattr("iac_code.a2a.pipeline_executor.create_agent_runtime", lambda options: runtime) + monkeypatch.setattr("iac_code.a2a.pipeline_executor.create_pipeline", fake_create_pipeline) + monkeypatch.setattr("iac_code.config.load_credentials", lambda model=None: {"dashscope": "fallback-key"}) + + store = A2ATaskStore(metrics=NoOpA2AMetrics()) + task_one = await store.get_or_create_task(task_id="task-1", context_id="ctx-1") + metadata_executor = IacCodeA2APipelineExecutor( + task_store=store, + model="qwen3.6-max", + metrics=NoOpA2AMetrics(), + artifact_store=None, + push_notifier=None, + permission_resolver=None, + auto_approve_permissions=False, + thinking_exposure_types=None, + model_from_metadata=True, + metadata_api_key="metadata-key", + ) + await metadata_executor.execute( + context=FakeRequestContext(context_id="ctx-1", metadata={"iac_code": {"cwd": str(tmp_path)}}), + event_queue=FakeEventQueue(), + task=task_one, + task_id="task-1", + context_id="ctx-1", + cwd=str(tmp_path), + prompt="部署网站", + ) + + task_two = await store.get_or_create_task(task_id="task-2", context_id="ctx-1") + default_executor = IacCodeA2APipelineExecutor( + task_store=store, + model="qwen3.6-plus", + metrics=NoOpA2AMetrics(), + artifact_store=None, + push_notifier=None, + permission_resolver=None, + auto_approve_permissions=False, + thinking_exposure_types=None, + ) + await default_executor.execute( + context=FakeRequestContext(context_id="ctx-1", task_id="task-2", metadata={"iac_code": {"cwd": str(tmp_path)}}), + event_queue=FakeEventQueue(), + task=task_two, + task_id="task-2", + context_id="ctx-1", + cwd=str(tmp_path), + prompt="继续", + ) + + assert provider_manager.calls == [ + ("qwen3.6-max", {"dashscope": "metadata-key"}), + ("qwen3.6-plus", {"dashscope": "fallback-key"}), + ] + + async def _wait_for_output_text(task, expected: str) -> None: for _ in range(100): if "".join(task.output_text) == expected: @@ -2319,6 +2527,9 @@ async def test_pipeline_executor_routes_second_prompt_as_interrupt(tmp_path: Pat from iac_code.a2a.pipeline_journal import A2APipelineJournal from iac_code.a2a.pipeline_snapshot import A2APipelineSnapshotStore from iac_code.a2a.pipeline_stream import PipelineA2AEventPublisher + from iac_code.services.providers.aliyun import AliyunCredential, AliyunCredentials + + captured_interrupt_credentials: list[str | None] = [] class InterruptiblePipeline(FakePipeline): def __init__(self, *, session_dir: Path) -> None: @@ -2326,6 +2537,8 @@ def __init__(self, *, session_dir: Path) -> None: self.interrupts: list[str] = [] async def handle_user_interrupt(self, message: str) -> SimpleNamespace: + credential = AliyunCredentials.load() + captured_interrupt_credentials.append(credential.access_key_id if credential else None) self.interrupts.append(message) return SimpleNamespace( action="supplement", @@ -2370,6 +2583,11 @@ async def handle_user_interrupt(self, message: str) -> SimpleNamespace: permission_resolver=None, auto_approve_permissions=False, thinking_exposure_types=None, + aliyun_credential=AliyunCredential( + access_key_id="client-id", + access_key_secret="client-secret", + region_id="cn-beijing", + ), ) await executor.execute( @@ -2386,6 +2604,7 @@ async def handle_user_interrupt(self, message: str) -> SimpleNamespace: ) assert pipeline.interrupts == ["please change cpu"] + assert captured_interrupt_credentials == ["client-id"] event_types = [event["eventType"] for event in publisher.journal.read_all()] assert event_types == ["interrupt_received", "interrupt_classified"] @@ -3421,7 +3640,10 @@ async def test_executor_routes_running_sidecar_pending_ask_to_ask_resume( monkeypatch: pytest.MonkeyPatch, tmp_path: Path, ) -> None: + from iac_code.services.providers.aliyun import AliyunCredentials + monkeypatch.setenv("IAC_CODE_MODE", "pipeline") + captured_resume_credentials: list[str | None] = [] class AskResumePipeline(FakePipeline): def __init__(self, *, session_dir: Path) -> None: @@ -3447,6 +3669,8 @@ async def resume_ask_user_question( tool_use_id: str, pending_input: dict[str, object] | None = None, ): + credential = AliyunCredentials.load() + captured_resume_credentials.append(credential.access_key_id if credential else None) self.ask_answers.append(answer) self.pending_inputs.append(pending_input) assert tool_use_id == "ask-1" @@ -3489,6 +3713,7 @@ async def resume_ask_user_question( fake_pipeline.sidecar_status = "running" monkeypatch.setattr("iac_code.a2a.pipeline_executor.create_pipeline", lambda *args, **kwargs: fake_pipeline) monkeypatch.setattr("iac_code.a2a.pipeline_executor.create_agent_runtime", lambda options: _fake_runtime()) + monkeypatch.setattr("iac_code.tools.cloud.registry.register_cloud_tools", lambda *args, **kwargs: None) store = A2ATaskStore(metrics=NoOpA2AMetrics()) executor = IacCodeA2AExecutor(task_store=store, model="qwen3.6-plus") @@ -3497,13 +3722,21 @@ async def resume_ask_user_question( await executor.execute( FakeRequestContext( text="Nginx 网站", - metadata={"iac_code": {"cwd": str(tmp_path)}}, + metadata={ + "iac_code": { + "cwd": str(tmp_path), + "alibaba_cloud_access_key_id": "client-id", + "alibaba_cloud_access_key_secret": "client-secret", + "alibaba_cloud_region_id": "cn-beijing", + } + }, ), queue, ) assert fake_pipeline.continue_calls == 0 assert fake_pipeline.ask_answers == [{"selected_id": "nginx", "selected_label": "Nginx 网站", "free_text": ""}] + assert captured_resume_credentials == ["client-id"] assert fake_pipeline.pending_inputs[0]["candidate"] == { "runId": "candidate-evaluate_candidate-0-1", "id": "evaluate_candidate", diff --git a/tests/a2a/test_selling_console_frontend.py b/tests/a2a/test_selling_console_frontend.py index b175b305..27ae7463 100644 --- a/tests/a2a/test_selling_console_frontend.py +++ b/tests/a2a/test_selling_console_frontend.py @@ -191,6 +191,7 @@ class FakeElement {{ "status-alert": new FakeElement("div", "status-alert"), "server-url": new FakeElement("input", "server-url"), cwd: new FakeElement("input", "cwd"), + "iac-code-model": new FakeElement("input", "iac-code-model"), "composer-input": new FakeElement("textarea", "composer-input"), "send-button": new FakeElement("button", "send-button"), "health-button": new FakeElement("button", "health-button"), @@ -873,7 +874,11 @@ def test_reducer_collects_snake_case_candidate_index_from_conclusion_options() - def test_reducer_does_not_mutate_original_state() -> None: output = reducer_harness( """ -const state = reducers.createInitialState({serverUrl: "http://server", cwd: "/workspace"}); +const state = reducers.createInitialState({ + serverUrl: "http://server", + cwd: "/workspace", + iacCodeModel: "kimi-k2.7-code" +}); const originalStep = state.steps.architecture_planning; const next = reducers.reducePipelinePayload(state, { metadata: {iac_code: {pipeline: { @@ -1129,7 +1134,11 @@ def test_reducer_clones_existing_defaults_when_cloning_state() -> None: def test_build_stream_payload_uses_active_task_before_handoff() -> None: output = reducer_harness( """ -const state = reducers.createInitialState({serverUrl: "http://server", cwd: "/workspace"}); +const state = reducers.createInitialState({ + serverUrl: "http://server", + cwd: "/workspace", + iacCodeModel: "kimi-k2.7-code" +}); state.contextId = "ctx-1"; state.pipelineTaskId = "pipeline-task"; state.activeTaskId = "active-task"; @@ -1147,6 +1156,7 @@ def test_build_stream_payload_uses_active_task_before_handoff() -> None: "beforeHandoff": { "serverUrl": "http://server", "cwd": "/workspace", + "iacCodeModel": "kimi-k2.7-code", "contextId": "ctx-1", "taskId": "active-task", "prompt": "部署 nginx", @@ -1154,6 +1164,7 @@ def test_build_stream_payload_uses_active_task_before_handoff() -> None: "afterHandoff": { "serverUrl": "http://server", "cwd": "/workspace", + "iacCodeModel": "kimi-k2.7-code", "contextId": "ctx-1", "taskId": "", "prompt": "继续部署", @@ -3979,6 +3990,7 @@ def test_controller_renders_debug_session_info_for_issue_reports() -> None: assert output["fields"] == [ {"key": "serverUrl", "text": "Server URLhttp://127.0.0.1:41299"}, {"key": "cwd", "text": "CWD/workspace"}, + {"key": "iacCodeModel", "text": "Model"}, {"key": "contextId", "text": "Context IDctx-1"}, {"key": "pipelineTaskId", "text": "Pipeline Tasktask-pipeline"}, {"key": "activeTaskId", "text": "Active Tasktask-active"}, diff --git a/tests/a2a/test_selling_console_script.py b/tests/a2a/test_selling_console_script.py index 9c160a4a..89f4449c 100644 --- a/tests/a2a/test_selling_console_script.py +++ b/tests/a2a/test_selling_console_script.py @@ -387,7 +387,12 @@ def test_message_stream_route_forwards_sse_and_cwd_metadata() -> None: try: status, text, content_type = post_raw( f"{running.url}/api/message/stream", - {"serverUrl": target, "cwd": "/workspace/demo", "prompt": "部署一个静态网站"}, + { + "serverUrl": target, + "cwd": "/workspace/demo", + "iacCodeModel": " kimi-k2.7-code ", + "prompt": "部署一个静态网站", + }, ) finally: running.close() @@ -397,7 +402,9 @@ def test_message_stream_route_forwards_sse_and_cwd_metadata() -> None: assert "TASK_STATE_WORKING" in text payload = json.loads(SseTargetHandler.requests[0]["body"]) assert payload["method"] == "SendStreamingMessage" - assert payload["params"]["message"]["metadata"] == {"iac_code": {"cwd": "/workspace/demo"}} + assert payload["params"]["message"]["metadata"] == { + "iac_code": {"cwd": "/workspace/demo", "iac_code_model": "kimi-k2.7-code"} + } def test_message_stream_route_surfaces_recoverable_task_id_from_jsonrpc_error() -> None: @@ -939,6 +946,7 @@ def test_index_html_contains_screenshot_layout_regions() -> None: 'id="debug-drawer"', 'id="server-url"', 'id="cwd"', + 'id="iac-code-model"', 'id="health-button"', 'id="fetch-state-button"', 'id="cancel-button"', diff --git a/tests/a2a/test_transport_dispatcher.py b/tests/a2a/test_transport_dispatcher.py index 7b4dd76a..36057dab 100644 --- a/tests/a2a/test_transport_dispatcher.py +++ b/tests/a2a/test_transport_dispatcher.py @@ -20,8 +20,11 @@ @pytest.mark.asyncio async def test_dispatcher_handles_unary_v03_message(monkeypatch, tmp_path) -> None: loop = FakeAgentLoop([TextDeltaEvent(text="hello from dispatcher")]) - runtime = FakeRuntime(agent_loop=loop, session_id="session-1") - monkeypatch.setattr("iac_code.a2a.executor.create_agent_runtime", lambda options: runtime) + + def factory(options): + return FakeRuntime(agent_loop=loop, session_id=options.session_id) + + monkeypatch.setattr("iac_code.a2a.executor.create_agent_runtime", factory) components = create_runtime_components(model="qwen3.6-plus", host="127.0.0.1", port=41242) dispatcher = A2AJsonRpcDispatcher(components) @@ -44,6 +47,8 @@ async def test_dispatcher_handles_unary_v03_message(monkeypatch, tmp_path) -> No assert response["id"] == "1" assert response["result"]["status"]["state"] == "input-required" + session_id = components.task_store._contexts[response["result"]["contextId"]].session_id + assert response["result"]["metadata"]["iac_code"]["iacCodeSessionId"] == session_id assert loop.prompts == ["hello"] await components.aclose() diff --git a/tests/cli/test_a2a_command.py b/tests/cli/test_a2a_command.py index 6c4f087c..d9a55f3a 100644 --- a/tests/cli/test_a2a_command.py +++ b/tests/cli/test_a2a_command.py @@ -422,8 +422,16 @@ async def send_message( cwd: str, context_id: str | None = None, model: str | None = None, + iac_code_api_key: str | None = None, ): - called["send"] = {"url": url, "prompt": prompt, "cwd": cwd, "context_id": context_id, "model": model} + called["send"] = { + "url": url, + "prompt": prompt, + "cwd": cwd, + "context_id": context_id, + "model": model, + "iac_code_api_key": iac_code_api_key, + } return SimpleNamespace(text="created stack", payload={"result": {"text": "created stack"}}) async def discover(self, url: str): @@ -465,6 +473,8 @@ async def aclose(self) -> None: "ctx-1", "--iac-code-model", "metadata-model", + "--iac-code-api-key", + "metadata-api-key", "--token", "bearer", "--api-key", @@ -493,6 +503,7 @@ async def aclose(self) -> None: "cwd": str(tmp_path), "context_id": "ctx-1", "model": "metadata-model", + "iac_code_api_key": "metadata-api-key", } assert called["discover"] == "http://agent.example/rpc" assert called["fallback_url"] == "http://agent.example/rpc" @@ -564,8 +575,16 @@ async def stream_message( cwd: str, context_id: str | None = None, model: str | None = None, + iac_code_api_key: str | None = None, ): - called["stream"] = {"url": url, "prompt": prompt, "cwd": cwd, "context_id": context_id, "model": model} + called["stream"] = { + "url": url, + "prompt": prompt, + "cwd": cwd, + "context_id": context_id, + "model": model, + "iac_code_api_key": iac_code_api_key, + } yield {"result": {"status": {"state": "working", "message": {"parts": [{"text": "planning"}]}}}} yield {"result": {"text": "created stack"}} @@ -603,6 +622,7 @@ async def aclose(self) -> None: "cwd": str(tmp_path), "context_id": "ctx-1", "model": None, + "iac_code_api_key": None, } assert called["closed"] is True @@ -654,6 +674,7 @@ async def fake_run_a2a_call(**kwargs) -> str: "cwd: /workspace/from-config", "context-id: ctx-from-config", "iac-code-model: config-model", + "iac-code-api-key: config-iac-code-api-key", "token: config-token", "basic-username: config-user", "basic-password: config-pass", @@ -691,6 +712,7 @@ async def fake_run_a2a_call(**kwargs) -> str: "cwd": "/workspace/from-config", "context_id": "ctx-from-config", "model": "config-model", + "iac_code_api_key": "config-iac-code-api-key", "token": "config-token", "basic_username": "config-user", "basic_password": "config-pass", diff --git a/website/docs/a2a/command-reference.md b/website/docs/a2a/command-reference.md index ce58f030..46f2d20b 100644 --- a/website/docs/a2a/command-reference.md +++ b/website/docs/a2a/command-reference.md @@ -60,6 +60,7 @@ timeout: 30 cwd: /path/to/workspace context-id: ctx-123 iac-code-model: qwen-plus +iac-code-api-key: provider-api-key task-id: task-123 config-id: webhook-1 callback-url: https://hooks.example.com/a2a @@ -271,7 +272,8 @@ Discover an Agent Card, choose the advertised endpoint, and send a prompt. iac-code a2a-client --config a2a-client.yml call \ --prompt "Create a ROS VPC template with two vSwitches." \ --cwd "$PWD" \ - --iac-code-model qwen-plus + --iac-code-model qwen-plus \ + --iac-code-api-key "$IAC_CODE_API_KEY" ``` | Option | Default | Description | @@ -283,12 +285,15 @@ iac-code a2a-client --config a2a-client.yml call \ | `--cwd` | `.` | Workspace path sent as `message.metadata.iac_code.cwd` | | `--context-id` | empty | Existing A2A context ID for a follow-up message | | `--iac-code-model` | empty | LLM model sent as `message.metadata.iac_code.iac_code_model`; overrides server model config for this message turn only | +| `--iac-code-api-key` | empty | LLM provider API key sent as `message.metadata.iac_code.iac_code_api_key`; overrides `IAC_CODE_API_KEY` and `.credentials.yml` for this message turn only | | `--verify-card-secret`, `--signing-secret` | empty | HMAC secret for Agent Card verification | | `--verify-card-jwks-url` | empty | Remote JWKS URL used for Agent Card verification | | `--require-card-signature`, `--require-signature` | `false` | Reject unsigned or invalid Agent Cards | | `--timeout` | `30.0` | Call timeout in seconds | | `--stream` | `false` | Use `SendStreamingMessage` and print stream events | +`--iac-code-api-key` is the key used by the remote iac-code runtime to call its LLM provider. It is separate from `--api-key`, which authenticates the A2A HTTP request itself. + Follow-up in the same context: ```bash diff --git a/website/docs/a2a/examples.md b/website/docs/a2a/examples.md index 5fdb63cd..fc312335 100644 --- a/website/docs/a2a/examples.md +++ b/website/docs/a2a/examples.md @@ -64,7 +64,13 @@ async def main() -> None: message_id=f"msg-{uuid.uuid4().hex}", role=Role.ROLE_USER, parts=[Part(text="Create a ROS VPC template with two vSwitches.")], - metadata={"iac_code": {"cwd": str(Path.cwd())}}, + metadata={ + "iac_code": { + "cwd": str(Path.cwd()), + "iac_code_model": "qwen-plus", + "iac_code_api_key": "provider-api-key", + } + }, ) ) @@ -138,6 +144,7 @@ iac-code a2a-client --config a2a-client.yml call \ --prompt "Create a ROS VPC template with two vSwitches." \ --cwd "$PWD" \ --iac-code-model qwen-plus \ + --iac-code-api-key "$IAC_CODE_API_KEY" \ --stream ``` @@ -191,7 +198,7 @@ def build_follow_up(task_id: str, context_id: str, text: str) -> SendMessageRequ context_id=context_id, role=Role.ROLE_USER, parts=[Part(text=text)], - metadata={"iac_code": {"cwd": str(Path.cwd())}}, + metadata={"iac_code": {"cwd": str(Path.cwd()), "iac_code_api_key": "provider-api-key"}}, ) ) @@ -245,7 +252,13 @@ async def main() -> None: "messageId": "msg-1", "role": "ROLE_USER", "parts": [{"text": "Review this ROS template for missing parameters."}], - "metadata": {"iac_code": {"cwd": str(Path.cwd())}}, + "metadata": { + "iac_code": { + "cwd": str(Path.cwd()), + "iac_code_model": "qwen-plus", + "iac_code_api_key": "provider-api-key", + } + }, }, "configuration": {"acceptedOutputModes": ["text/plain"]}, }, @@ -287,7 +300,12 @@ async def main() -> None: "messageId": "msg-1", "role": "ROLE_USER", "parts": [{"text": "Generate a Terraform VPC example."}], - "metadata": {"iac_code": {"cwd": str(Path.cwd())}}, + "metadata": { + "iac_code": { + "cwd": str(Path.cwd()), + "iac_code_api_key": "provider-api-key", + } + }, }, "configuration": {"acceptedOutputModes": ["text/plain"]}, }, diff --git a/website/docs/a2a/getting-started.md b/website/docs/a2a/getting-started.md index ef25e268..80ceb486 100644 --- a/website/docs/a2a/getting-started.md +++ b/website/docs/a2a/getting-started.md @@ -140,6 +140,7 @@ verify-card-secret: your-card-signing-secret require-card-signature: true cwd: /path/to/workspace iac-code-model: qwen-plus +iac-code-api-key: provider-api-key ``` Use `a2a-client call` for a direct Phase 1 client call: @@ -184,6 +185,8 @@ For per-task telemetry attribution, include a non-empty string `user_id` under ` For a per-call LLM override, include a non-empty string `iac_code_model` under `message.metadata.iac_code`. This field is the lowercase form of `IAC_CODE_MODEL`; it overrides `IAC_CODE_MODEL`, `settings.yml`, and the server startup default model only for the current A2A message turn. +For a per-call LLM provider key override, include a non-empty string `iac_code_api_key` under `message.metadata.iac_code`. This field is the lowercase form of `IAC_CODE_API_KEY`; it overrides `IAC_CODE_API_KEY` and `.credentials.yml` only for the current A2A message turn. It is separate from A2A HTTP authentication (`api-key` / `IACCODE_A2A_API_KEY`), which protects access to the A2A endpoint. + For per-request Alibaba Cloud credentials, include `alibaba_cloud_access_key_id`, `alibaba_cloud_access_key_secret`, `alibaba_cloud_region_id`, and optional `alibaba_cloud_security_token` under `message.metadata.iac_code`. These task credentials override the server environment and `.cloud-credentials.yml` for that A2A execution only. The server accepts text-like parts, JSON data parts, raw UTF-8 text, local workspace `file://` text files, and bounded multimodal attachments. Remote URL ingestion is not supported; `url` parts must be local `file://` URLs inside the allowed workspace. @@ -205,7 +208,9 @@ curl -s -X POST http://127.0.0.1:41242/ \ ], "metadata": { "iac_code": { - "cwd": "/path/to/project" + "cwd": "/path/to/project", + "iac_code_model": "qwen-plus", + "iac_code_api_key": "provider-api-key" } } }, diff --git a/website/docs/a2a/protocol-reference.md b/website/docs/a2a/protocol-reference.md index 81a3b4e3..bb1fbb84 100644 --- a/website/docs/a2a/protocol-reference.md +++ b/website/docs/a2a/protocol-reference.md @@ -134,7 +134,8 @@ Runs a non-streaming A2A message turn. The response contains a task or message a "iac_code": { "cwd": "/absolute/path/to/project", "user_id": "client-user-123", - "iac_code_model": "qwen-plus" + "iac_code_model": "qwen-plus", + "iac_code_api_key": "provider-api-key" } } }, @@ -155,6 +156,7 @@ Runs a non-streaming A2A message turn. The response contains a task or message a | `metadata.iac_code.cwd` | string | Recommended | Absolute workspace path; defaults to the server process directory if omitted | | `metadata.iac_code.user_id` | string | Optional | Per-task telemetry user ID override; ignored when blank or non-string | | `metadata.iac_code.iac_code_model` | string | Optional | Per-call LLM model override; this is the lowercase form of `IAC_CODE_MODEL` and is ignored when blank or non-string | +| `metadata.iac_code.iac_code_api_key` | string | Optional | Per-call LLM provider API key override; this is the lowercase form of `IAC_CODE_API_KEY` and is ignored when blank or non-string | | `metadata.iac_code.alibaba_cloud_access_key_id` | string | Optional | Alibaba Cloud AccessKey ID for this task | | `metadata.iac_code.alibaba_cloud_access_key_secret` | string | Optional | Alibaba Cloud AccessKey Secret for this task | | `metadata.iac_code.alibaba_cloud_region_id` | string | Optional | Alibaba Cloud region for this task; defaults to `cn-hangzhou` when omitted with task credentials | @@ -168,6 +170,8 @@ When `metadata.iac_code` includes both `alibaba_cloud_access_key_id` and `alibab `metadata.iac_code.iac_code_model` only affects the current A2A message turn. It takes priority over `IAC_CODE_MODEL`, `settings.yml`, and the server startup default model. Follow-up turns without this metadata field fall back to the server default model even when they reuse the same `contextId`. +`metadata.iac_code.iac_code_api_key` only affects the current A2A message turn. It takes priority over `IAC_CODE_API_KEY` and `.credentials.yml` for the provider selected by the effective model. Follow-up turns without this metadata field reload normal credentials, so a per-call key does not leak across reused `contextId`s. This field is for the LLM provider key and is separate from A2A transport authentication such as `api-key` / `IACCODE_A2A_API_KEY`. + Supported input categories: | Category | Accepted Shape | Limits and Behavior | @@ -381,6 +385,7 @@ Tool and usage details are delivered through `metadata.iac_code`: | `iac_code.tool.name` | Tool name when available | | `iac_code.tool.input` | Completed tool input, truncated to 4000 characters per field | | `iac_code.tool.result` | Tool result, truncated to 4000 characters per field | +| `iac_code.iacCodeSessionId` | Internal iac-code execution session ID for correlating logs and local session artifacts | | `iac_code.permission.autoApproved` | `false` when a tool permission request was rejected by A2A server mode | | `iac_code.thinking.type` | `raw_thinking` when `raw-thinking` is enabled in `thinking-exposure` | | `iac_code.thinking.text` | Raw provider reasoning chunk, truncated to 4000 characters, emitted only for trusted configurations that enable `raw-thinking` | diff --git a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/command-reference.md b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/command-reference.md index b45aa6a1..e424a001 100644 --- a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/command-reference.md +++ b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/command-reference.md @@ -60,6 +60,7 @@ timeout: 30 cwd: /path/to/workspace context-id: ctx-123 iac-code-model: qwen-plus +iac-code-api-key: provider-api-key task-id: task-123 config-id: webhook-1 callback-url: https://hooks.example.com/a2a @@ -269,7 +270,8 @@ thinking-exposure: iac-code a2a-client --config a2a-client.yml call \ --prompt "Create a ROS VPC template with two vSwitches." \ --cwd "$PWD" \ - --iac-code-model qwen-plus + --iac-code-model qwen-plus \ + --iac-code-api-key "$IAC_CODE_API_KEY" ``` | 选项 | 默认值 | 描述 | @@ -281,12 +283,15 @@ iac-code a2a-client --config a2a-client.yml call \ | `--cwd` | `.` | 作为 `message.metadata.iac_code.cwd` 发送的 workspace path | | `--context-id` | 空 | 用于后续消息的现有 A2A context ID | | `--iac-code-model` | 空 | 作为 `message.metadata.iac_code.iac_code_model` 发送的 LLM model;只在本次 message turn 覆盖 server model 配置 | +| `--iac-code-api-key` | 空 | 作为 `message.metadata.iac_code.iac_code_api_key` 发送的 LLM provider API key;只在本次 message turn 覆盖 `IAC_CODE_API_KEY` 和 `.credentials.yml` | | `--verify-card-secret`, `--signing-secret` | 空 | Agent Card verification 的 HMAC secret | | `--verify-card-jwks-url` | 空 | 用于 Agent Card verification 的远程 JWKS URL | | `--require-card-signature`, `--require-signature` | `false` | 拒绝未签名或无效的 Agent Cards | | `--timeout` | `30.0` | 调用 timeout(秒) | | `--stream` | `false` | 使用 `SendStreamingMessage` 并打印 stream events | +`--iac-code-api-key` 是远端 iac-code runtime 调用 LLM provider 使用的 key;它和用于认证 A2A HTTP 请求本身的 `--api-key` 不是同一个配置。 + 在同一 context 中发送后续消息: ```bash diff --git a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/examples.md b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/examples.md index b7f678e9..fa065c9a 100644 --- a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/examples.md +++ b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/examples.md @@ -64,7 +64,13 @@ async def main() -> None: message_id=f"msg-{uuid.uuid4().hex}", role=Role.ROLE_USER, parts=[Part(text="Create a ROS VPC template with two vSwitches.")], - metadata={"iac_code": {"cwd": str(Path.cwd())}}, + metadata={ + "iac_code": { + "cwd": str(Path.cwd()), + "iac_code_model": "qwen-plus", + "iac_code_api_key": "provider-api-key", + } + }, ) ) @@ -138,6 +144,7 @@ iac-code a2a-client --config a2a-client.yml call \ --prompt "Create a ROS VPC template with two vSwitches." \ --cwd "$PWD" \ --iac-code-model qwen-plus \ + --iac-code-api-key "$IAC_CODE_API_KEY" \ --stream ``` @@ -191,7 +198,7 @@ def build_follow_up(task_id: str, context_id: str, text: str) -> SendMessageRequ context_id=context_id, role=Role.ROLE_USER, parts=[Part(text=text)], - metadata={"iac_code": {"cwd": str(Path.cwd())}}, + metadata={"iac_code": {"cwd": str(Path.cwd()), "iac_code_api_key": "provider-api-key"}}, ) ) @@ -245,7 +252,13 @@ async def main() -> None: "messageId": "msg-1", "role": "ROLE_USER", "parts": [{"text": "Review this ROS template for missing parameters."}], - "metadata": {"iac_code": {"cwd": str(Path.cwd())}}, + "metadata": { + "iac_code": { + "cwd": str(Path.cwd()), + "iac_code_model": "qwen-plus", + "iac_code_api_key": "provider-api-key", + } + }, }, "configuration": {"acceptedOutputModes": ["text/plain"]}, }, @@ -287,7 +300,12 @@ async def main() -> None: "messageId": "msg-1", "role": "ROLE_USER", "parts": [{"text": "Generate a Terraform VPC example."}], - "metadata": {"iac_code": {"cwd": str(Path.cwd())}}, + "metadata": { + "iac_code": { + "cwd": str(Path.cwd()), + "iac_code_api_key": "provider-api-key", + } + }, }, "configuration": {"acceptedOutputModes": ["text/plain"]}, }, diff --git a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/getting-started.md b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/getting-started.md index 1db90b48..634db0b7 100644 --- a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/getting-started.md +++ b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/getting-started.md @@ -140,6 +140,7 @@ verify-card-secret: your-card-signing-secret require-card-signature: true cwd: /path/to/workspace iac-code-model: qwen-plus +iac-code-api-key: provider-api-key ``` 使用 `a2a-client call` 进行直接的 Phase 1 client 调用: @@ -184,6 +185,8 @@ iac-code a2a-client route-preview \ 如需按调用覆盖 LLM,请在 `message.metadata.iac_code` 下传入非空字符串 `iac_code_model`。该字段是 `IAC_CODE_MODEL` 的小写形式;它只在当前 A2A message turn 中覆盖 `IAC_CODE_MODEL`、`settings.yml` 和 server 启动默认 model。 +如需按调用覆盖 LLM provider key,请在 `message.metadata.iac_code` 下传入非空字符串 `iac_code_api_key`。该字段是 `IAC_CODE_API_KEY` 的小写形式;它只在当前 A2A message turn 中覆盖 `IAC_CODE_API_KEY` 和 `.credentials.yml`。它和用于保护 A2A endpoint 的 HTTP 认证(`api-key` / `IACCODE_A2A_API_KEY`)是两件事。 + 如需按请求传入 Alibaba Cloud 凭据,请在 `message.metadata.iac_code` 下包含 `alibaba_cloud_access_key_id`、`alibaba_cloud_access_key_secret`、`alibaba_cloud_region_id` 和可选的 `alibaba_cloud_security_token`。这些任务凭据只在本次 A2A 执行中覆盖 server 环境和 `.cloud-credentials.yml`。 服务器接受类文本 parts、JSON 数据 parts、原始 UTF-8 文本、本地工作区 `file://` 文本文件和有界多模态附件。不支持远程 URL 摄取;`url` parts 必须是位于允许工作区内的本地 `file://` URLs。 @@ -205,7 +208,9 @@ curl -s -X POST http://127.0.0.1:41242/ \ ], "metadata": { "iac_code": { - "cwd": "/path/to/project" + "cwd": "/path/to/project", + "iac_code_model": "qwen-plus", + "iac_code_api_key": "provider-api-key" } } }, diff --git a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/protocol-reference.md b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/protocol-reference.md index 2fe387b6..a312dee3 100644 --- a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/protocol-reference.md +++ b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/a2a/protocol-reference.md @@ -134,7 +134,8 @@ Callback URLs 会在存储前以及分发前再次校验。默认 validator 会 "iac_code": { "cwd": "/absolute/path/to/project", "user_id": "client-user-123", - "iac_code_model": "qwen-plus" + "iac_code_model": "qwen-plus", + "iac_code_api_key": "provider-api-key" } } }, @@ -155,6 +156,7 @@ Callback URLs 会在存储前以及分发前再次校验。默认 validator 会 | `metadata.iac_code.cwd` | string | 建议 | 绝对工作区路径;省略时默认为服务器进程目录 | | `metadata.iac_code.user_id` | string | 可选 | 当前任务的 telemetry user ID 覆盖;空字符串或非字符串会被忽略 | | `metadata.iac_code.iac_code_model` | string | 可选 | 当前调用的 LLM model 覆盖;这是 `IAC_CODE_MODEL` 的小写形式,空字符串或非字符串会被忽略 | +| `metadata.iac_code.iac_code_api_key` | string | 可选 | 当前调用的 LLM provider API key 覆盖;这是 `IAC_CODE_API_KEY` 的小写形式,空字符串或非字符串会被忽略 | | `metadata.iac_code.alibaba_cloud_access_key_id` | string | 可选 | 当前任务使用的 Alibaba Cloud AccessKey ID | | `metadata.iac_code.alibaba_cloud_access_key_secret` | string | 可选 | 当前任务使用的 Alibaba Cloud AccessKey Secret | | `metadata.iac_code.alibaba_cloud_region_id` | string | 可选 | 当前任务使用的 Alibaba Cloud region;和任务凭据一起省略时默认为 `cn-hangzhou` | @@ -168,6 +170,8 @@ Callback URLs 会在存储前以及分发前再次校验。默认 validator 会 `metadata.iac_code.iac_code_model` 只影响当前 A2A message turn。它优先于 `IAC_CODE_MODEL`、`settings.yml` 和 server 启动默认 model;复用同一个 `contextId` 的后续轮次如果不再传该字段,会回落到 server 默认 model。 +`metadata.iac_code.iac_code_api_key` 只影响当前 A2A message turn。它优先于 `IAC_CODE_API_KEY` 和 `.credentials.yml` 中当前有效 model 对应 provider 的 key;复用同一个 `contextId` 的后续轮次如果不再传该字段,会重新加载正常凭据,因此单次调用 key 不会串到后续请求。这个字段用于 LLM provider key,和 A2A transport 认证里的 `api-key` / `IACCODE_A2A_API_KEY` 是两件事。 + 支持的输入类别: | 类别 | 接受的形状 | 限制和行为 |