diff --git a/src/row_bot/agent.py b/src/row_bot/agent.py index 1327919e..a2b8e29f 100644 --- a/src/row_bot/agent.py +++ b/src/row_bot/agent.py @@ -3259,6 +3259,94 @@ def _resolve_tool_display_name(func_name: str) -> str: return _TOOL_DISPLAY_NAMES.get(func_name, func_name) +_SAFE_TOOL_CALL_ARG_KEYS = { + "category", + "display_name", + "include_events", + "limit", + "model", + "parent_message_id", + "parent_run_id", + "parent_thread_id", + "profile", + "run_id", + "setting", + "statuses", + "timeout_seconds", + "wait", +} + + +class ToolCallPayload(str): + """String-compatible tool-call event with optional UI metadata.""" + + def __new__( + cls, + name: str, + *, + raw_name: str = "", + args: dict[str, Any] | None = None, + call_id: str = "", + ): + obj = str.__new__(cls, str(name or "tool")) + obj.raw_name = str(raw_name or "") + obj.args = dict(args or {}) + obj.call_id = str(call_id or "") + return obj + + def get(self, key: str, default: Any = None) -> Any: + if key == "name": + return str(self) + if key == "raw_name": + return self.raw_name + if key == "args": + return self.args + if key == "id": + return self.call_id + return default + + def as_dict(self) -> dict[str, Any]: + return { + "name": str(self), + "raw_name": self.raw_name, + "args": dict(self.args), + "id": self.call_id, + } + + +def _safe_tool_call_args(args: Any) -> dict[str, Any]: + if not isinstance(args, dict): + return {} + safe: dict[str, Any] = {} + for key, value in args.items(): + clean_key = str(key or "").strip() + if clean_key not in _SAFE_TOOL_CALL_ARG_KEYS: + continue + if isinstance(value, bool) or value is None: + safe[clean_key] = value + elif isinstance(value, (int, float)): + safe[clean_key] = value + elif isinstance(value, str): + safe[clean_key] = value[:180] + elif isinstance(value, list): + safe[clean_key] = [ + str(item)[:120] + for item in value[:8] + if isinstance(item, (str, int, float, bool)) + ] + return safe + + +def _tool_call_payload(tc: dict[str, Any]) -> ToolCallPayload: + raw_name = str(tc.get("name") or "") + return ToolCallPayload( + _resolve_tool_display_name(raw_name), + raw_name=raw_name, + args=_safe_tool_call_args(tc.get("args")), + call_id=str(tc.get("id") or raw_name), + ) + + def _selected_model_label_from_config(config: dict) -> tuple[str, bool]: model_override = (config.get("configurable") or {}).get("model_override") if model_override and model_override != get_current_model(): @@ -4316,7 +4404,7 @@ def _finish_provider_call(moment: float, reason: str) -> None: tc_id = tc.get("id", tc["name"]) if tc_id not in _seen_tool_calls: _seen_tool_calls.add(tc_id) - yield ("tool_call", _resolve_tool_display_name(tc["name"])) + yield ("tool_call", _tool_call_payload(tc)) # Loop detection: hash (name, args) as signature _args = tc.get("args", {}) @@ -4454,7 +4542,7 @@ def _finish_provider_call(moment: float, reason: str) -> None: tc_list = getattr(m, "tool_calls", []) if tc_list: for tc in tc_list: - yield ("tool_call", _resolve_tool_display_name(tc["name"])) + yield ("tool_call", _tool_call_payload(tc)) if m.type == "tool": yield ("tool_done", { "name": _resolve_tool_display_name(m.name), diff --git a/src/row_bot/agent_commands.py b/src/row_bot/agent_commands.py index 49ae5ae4..decfcf24 100644 --- a/src/row_bot/agent_commands.py +++ b/src/row_bot/agent_commands.py @@ -37,6 +37,7 @@ class AgentSpawnRequest: profile: str = DEFAULT_DIRECT_AGENT_PROFILE explicit_profile: bool = False source: str = "natural" + model: str = "" _DIRECT_AGENT_VERBS = {"use", "create", "spawn", "start", "launch", "make"} @@ -181,12 +182,18 @@ def is_agent_spawn_command(text: str) -> bool: def parse_agent_spawn_text(text: str) -> AgentSpawnRequest | None: - """Parse explicit direct child-Agent requests without task-based routing.""" + """Parse explicit direct child-Agent slash commands without task-based routing.""" raw = str(text or "").strip() if not raw: return None if is_agent_spawn_command(raw): arg = raw.split(maxsplit=1)[1].strip() if len(raw.split(maxsplit=1)) > 1 else "" + if not arg: + return None + arg, model = _extract_model_option(arg) + if arg is None: + return None + arg = arg.strip() if not arg: return None profile_match = _consume_leading_profile(arg) @@ -199,24 +206,55 @@ def parse_agent_spawn_text(text: str) -> AgentSpawnRequest | None: profile=profile_slug, explicit_profile=True, source="slash", + model=model, ) return AgentSpawnRequest( objective=arg, profile=DEFAULT_DIRECT_AGENT_PROFILE, explicit_profile=False, source="slash", + model=model, ) - return _parse_natural_agent_request(raw) + return None + + +def _extract_model_option(arg: str) -> tuple[str | None, str]: + """Remove a strict ``--model`` option from a slash-command argument.""" + parts = str(arg or "").strip().split() + if not parts: + return "", "" + kept: list[str] = [] + model = "" + index = 0 + while index < len(parts): + part = parts[index] + if part == "--model": + if index + 1 >= len(parts): + return None, "" + model = parts[index + 1].strip() + index += 2 + continue + if part.startswith("--model="): + model = part.split("=", 1)[1].strip() + if not model: + return None, "" + index += 1 + continue + kept.append(part) + index += 1 + return " ".join(kept).strip(), model def format_agent_spawn_usage() -> str: return ( - "Usage: `/agent [profile] `.\n\n" + "Usage: `/agent [--model=model:provider:model-id] [profile] `.\n\n" "Examples:\n" "- `/agent review check this plan for risk`\n" - "- `/agent develop implement the focused fix`\n\n" + "- `/agent develop implement the focused fix`\n" + "- `/agent --model=model:claude_subscription:claude-opus-4-8 worker only reply: ok`\n\n" "Generic Agent requests use `worker`. A specialized profile is used only " - "when you explicitly name an enabled Agent Profile." + "when you explicitly name an enabled Agent Profile. The optional model " + "must be an active pinned Brain canonical ref or exact pinned label." ) @@ -244,6 +282,18 @@ def spawn_agent_from_request( raise ValueError("Direct Agent requests require a parent thread.") from row_bot.agent_runner import spawn_agent_run + model_override = "" + if str(request.model or "").strip(): + from row_bot.providers.selection import resolve_catalog_model_selection + + resolved = resolve_catalog_model_selection( + request.model, + surface="chat", + require_agent_ready=True, + require_pinned=True, + ) + model_override = resolved.ref + return spawn_agent_run( request.objective, parent_thread_id=str(thread_id), @@ -251,6 +301,7 @@ def spawn_agent_from_request( display_name=agent_spawn_display_name(request), context_mode="auto", enabled_tool_names=list(enabled_tool_names or []), + model_override=model_override, wait=False, ) @@ -261,7 +312,9 @@ def format_agent_spawn_started(run: dict, request: AgentSpawnRequest) -> str: name = str((run or {}).get("display_name") or agent_spawn_display_name(request)).strip() profile = str((run or {}).get("profile_slug") or request.profile or DEFAULT_DIRECT_AGENT_PROFILE).strip() suffix = f" (`{run_id}`)" if run_id else "" - return f"Started Agent **{name}** with profile `{profile}`. Status: `{status}`{suffix}." + model = str((run or {}).get("model_override") or request.model or "").strip() + model_text = f" Model: `{model}`." if model else "" + return f"Started Agent **{name}** with profile `{profile}`. Status: `{status}`{suffix}.{model_text}" def _profile_lines(query: str = "", *, limit: int = 18) -> list[str]: diff --git a/src/row_bot/agent_context.py b/src/row_bot/agent_context.py index 027e7a31..7ab24c6f 100644 --- a/src/row_bot/agent_context.py +++ b/src/row_bot/agent_context.py @@ -146,6 +146,7 @@ def build_child_agent_prompt( context_mode: str = "", parent_thread_id: str = "", parent_run_id: str = "", + model_override: str = "", ) -> dict[str, str]: """Build the focused prompt packet for a child Agent run. @@ -204,6 +205,17 @@ def build_child_agent_prompt( "MISSION:", objective, ] + model_ref = str(model_override or "").strip() + runtime_lines = [ + "The parent selected this child Agent's runtime model before start.", + ( + f"Runtime model override: {model_ref}" + if model_ref + else "Runtime model override: inherited from the parent/default runtime." + ), + "Do not call row_bot_update_setting(setting='model') to satisfy your own runtime model request.", + ] + parts.extend(["", "RUNTIME MODEL:", "\n".join(runtime_lines)]) for title, body in context_sections: if body: parts.extend(["", f"{title}:", body]) diff --git a/src/row_bot/agent_runner.py b/src/row_bot/agent_runner.py index d92e52bd..f16f4f83 100644 --- a/src/row_bot/agent_runner.py +++ b/src/row_bot/agent_runner.py @@ -299,6 +299,7 @@ def spawn_agent_run( context_mode=context_mode, parent_thread_id=parent_thread_id, parent_run_id=parent_run_id, + model_override=model, ) run_id = uuid.uuid4().hex[:12] child_display = display_name or f"{profile_snapshot.get('display_name', 'Agent')}: {_short_title(objective, limit=42)}" diff --git a/src/row_bot/agent_runs.py b/src/row_bot/agent_runs.py index 8f363841..d606eea3 100644 --- a/src/row_bot/agent_runs.py +++ b/src/row_bot/agent_runs.py @@ -1385,6 +1385,7 @@ def mirror_workflow_run_start( display_name: str, steps_total: int = 0, profile_id: str = "", + profile_snapshot_json: Mapping[str, Any] | None = None, approval_mode: str = "", model_override: str = "", tools_override: Sequence[str] | str | None = None, @@ -1399,6 +1400,7 @@ def mirror_workflow_run_start( thread_id=thread_id, display_name=display_name, profile_id=profile_id, + profile_snapshot_json=profile_snapshot_json, approval_mode=approval_mode, model_override=model_override, tools_override=tools_override, diff --git a/src/row_bot/app.py b/src/row_bot/app.py index aca97645..3afd4c2b 100644 --- a/src/row_bot/app.py +++ b/src/row_bot/app.py @@ -213,14 +213,23 @@ def _safe_dumps(obj, *args, **kwargs): ) from row_bot.ui.head_html import inject_head_html from row_bot.ui.setup_wizard import show_setup_wizard -from row_bot.ui.render import render_text_with_embeds, add_chat_message +from row_bot.ui.render import ( + add_chat_message, + agent_result_use_prompt, + render_text_with_embeds, +) from row_bot.ui.export import open_export from row_bot.ui.graph_panel import build_graph_panel from row_bot.ui.task_dialog import show_task_dialog from row_bot.ui.sidebar import build_sidebar from row_bot.ui.command_center import build_command_center from row_bot.ui.settings import open_settings -from row_bot.ui.streaming import Callbacks, send_message, build_interrupt_dialog +from row_bot.ui.streaming import ( + Callbacks, + _append_async_delegated_agent_completion_messages, + build_interrupt_dialog, + send_message, +) from row_bot.ui.home import build_home from row_bot.ui.chat import build_chat from row_bot.ui.transcript import ( @@ -1079,6 +1088,17 @@ def _send_message( internal_goal_continuation=internal_goal_continuation, ) + def _ask_parent_to_use_agent_result(run_id: str) -> None: + prompt = agent_result_use_prompt(run_id) + if not prompt: + return + try: + ui.notify("Asking the parent to use that Agent result.", type="info", close_button=True) + asyncio.create_task(_send_message(prompt)) + except Exception as exc: + logger.debug("Could not ask parent to use Agent result", exc_info=True) + ui.notify(f"Could not ask parent to use Agent result: {exc}", type="negative", close_button=True) + async def _send_active_voice_message(text: str, *, voice_mode: bool = False): binding = getattr(p, "active_voice_binding", None) if binding is not None and binding.is_current(state.thread_id): @@ -1292,7 +1312,14 @@ def _exit_developer(): open_settings=_open_settings, open_export=_open_export, show_interrupt=cb.show_interrupt, - add_chat_message=lambda msg: add_chat_message(msg, p, state.thread_id), + add_chat_message=( + lambda msg, **kwargs: add_chat_message( + msg, + p, + state.thread_id, + **kwargs, + ) + ), browse_file=browse_file, ) log_ui_perf( @@ -1413,7 +1440,12 @@ def _mark_chat_message_rendered(msg: dict) -> None: logger.debug("Transcript render-state mark failed", exc_info=True) def _add_chat_message_and_track(msg: dict) -> None: - add_chat_message(msg, p, state.thread_id) + add_chat_message( + msg, + p, + state.thread_id, + on_use_agent_result=_ask_parent_to_use_agent_result, + ) _mark_chat_message_rendered(msg) cb.add_chat_message = _add_chat_message_and_track @@ -1429,6 +1461,11 @@ def _add_chat_message_and_track(msg: dict) -> None: if callable(getattr(p, "refresh_goal_strip", None)) else None ) + cb.refresh_model_controls = lambda: ( + p.refresh_model_controls() + if callable(getattr(p, "refresh_model_controls", None)) + else None + ) def _refresh_chat_messages() -> None: """Synchronize the active transcript without a full visible rebuild.""" @@ -1487,7 +1524,12 @@ def _append_chunk() -> None: with p.chat_container: while end_idx < len(missing): msg_index, msg = missing[end_idx] - add_chat_message(msg, p, state.thread_id) + add_chat_message( + msg, + p, + state.thread_id, + on_use_agent_result=_ask_parent_to_use_agent_result, + ) p.transcript_rendered_keys.append(message_key(msg_index, msg)) end_idx += 1 if end_idx - start_idx >= TRANSCRIPT_MAX_CHUNK_MESSAGES: @@ -1544,7 +1586,12 @@ def _reconcile_chunk() -> None: try: with p.chat_container: while end_idx < len(display_msgs): - add_chat_message(display_msgs[end_idx], p, state.thread_id) + add_chat_message( + display_msgs[end_idx], + p, + state.thread_id, + on_use_agent_result=_ask_parent_to_use_agent_result, + ) p.transcript_rendered_keys.append(display_keys[end_idx]) end_idx += 1 if end_idx - start_idx >= TRANSCRIPT_MAX_CHUNK_MESSAGES: @@ -1568,6 +1615,116 @@ def _reconcile_chunk() -> None: # ── Timers ─────────────────────────────────────────────────────────── + _last_agent_run_refresh = {"thread_id": "", "key": ""} + + def _current_agent_run_refresh_key(tid: str) -> str: + if not tid: + return "" + try: + from row_bot.agent_runs import list_agent_runs + + rows = list_agent_runs(parent_thread_id=tid, kind="subagent", limit=12) + except Exception: + logger.debug("Agent run page refresh poll failed", exc_info=True) + return "" + parts: list[str] = [] + for row in rows: + parts.append("|".join( + str(row.get(field) or "") + for field in ( + "id", + "status", + "status_message", + "summary", + "error", + "steps_done", + "steps_total", + "updated_at", + "finished_at", + "stop_requested", + ) + )) + return "\n".join(parts) + + def _current_child_agent_run_ids(tid: str) -> list[str]: + if not tid: + return [] + try: + from row_bot.agent_runs import list_agent_runs + + return [ + str(row.get("id") or "").strip() + for row in list_agent_runs(parent_thread_id=tid, kind="subagent", limit=12) + if str(row.get("id") or "").strip() + ] + except Exception: + logger.debug("Agent run id poll failed", exc_info=True) + return [] + + def _thread_has_live_generation(tid: str) -> bool: + active_gen = _active_generations.get(tid) + return bool( + active_gen + and str(getattr(active_gen, "status", "") or "").lower() == "streaming" + and not bool(getattr(active_gen, "detached", False)) + and getattr(active_gen, "live_row", None) is not None + ) + + def _poll_agent_card_refresh() -> None: + tid = str(state.thread_id or "") + if not tid: + _last_agent_run_refresh["thread_id"] = "" + _last_agent_run_refresh["key"] = "" + return + key = _current_agent_run_refresh_key(tid) + if _last_agent_run_refresh.get("thread_id") != tid: + _last_agent_run_refresh["thread_id"] = tid + _last_agent_run_refresh["key"] = key + if _thread_has_live_generation(tid): + return + try: + completion_changed = _append_async_delegated_agent_completion_messages( + state.messages, + candidate_run_ids=_current_child_agent_run_ids(tid), + checkpoint_thread_id=tid, + ) + if completion_changed: + state.cache_active_messages() + if p.chat_container is not None and p.transcript_thread_id == tid: + p.transcript_rendered_keys = [] + _refresh_chat_messages() + except Exception: + logger.debug("Initial async delegated Agent completion poll failed", exc_info=True) + return + if key == _last_agent_run_refresh.get("key"): + return + _last_agent_run_refresh["key"] = key + try: + if callable(getattr(p, "refresh_parent_agent_strip", None)): + p.refresh_parent_agent_strip() + except Exception: + logger.debug("Parent Agent strip poll refresh failed", exc_info=True) + if _thread_has_live_generation(tid): + return + try: + if _append_async_delegated_agent_completion_messages( + state.messages, + candidate_run_ids=_current_child_agent_run_ids(tid), + checkpoint_thread_id=tid, + ): + state.cache_active_messages() + except Exception: + logger.debug("Async delegated Agent completion poll failed", exc_info=True) + if p.chat_container is None or p.transcript_thread_id != tid: + return + try: + # Agent Run cards are backed by DB rows, so message keys may stay + # stable while card content changes from queued/running/completed. + p.transcript_rendered_keys = [] + _refresh_chat_messages() + except Exception: + logger.debug("Agent card transcript poll refresh failed", exc_info=True) + def _poll_notifications() -> None: for t in drain_toasts(): _tkw = {"type": t.get("type", "info"), "close_button": True} @@ -1783,8 +1940,9 @@ def _update_token_counter() -> None: _notification_timer = safe_timer(1.0, _poll_notifications) _voice_timer = safe_timer(0.3, _poll_voice) + _agent_card_timer = safe_timer(1.0, _poll_agent_card_refresh) _token_timer = safe_timer(5.0, _update_token_counter) - deactivate_on_disconnect(_notification_timer, _voice_timer, _token_timer) + deactivate_on_disconnect(_notification_timer, _voice_timer, _agent_card_timer, _token_timer) # ── Build initial view ─────────────────────────────────────────────── _rebuild_main() diff --git a/src/row_bot/providers/selection.py b/src/row_bot/providers/selection.py index e88a134d..17eca229 100644 --- a/src/row_bot/providers/selection.py +++ b/src/row_bot/providers/selection.py @@ -53,10 +53,424 @@ class CanonicalModelSelection: source: str = "" +@dataclass(frozen=True) +class CatalogModelResolution: + ref: str + provider_id: str = "" + model_id: str = "" + display_label: str = "" + source: str = "" + alternatives: tuple[str, ...] = () + + class ModelSelectionError(ValueError): """Raised when a model selection cannot be canonicalized safely.""" +def resolve_catalog_model_selection( + value: str | None, + *, + surface: str = "chat", + allow_default: bool = False, + require_agent_ready: bool = False, + require_pinned: bool = False, +) -> CatalogModelResolution: + """Strictly resolve a model selection without inventing a provider. + + This helper is for user-facing persistent writes and child-agent model + selection. It reads configured quick choices, custom endpoint metadata, and + the cached model catalog only; it does not probe providers or refresh live + catalogs. When require_pinned is true, only configured Quick Choices are + accepted. + """ + raw = str(value or "").strip() + if not raw or raw.lower() == "default": + if allow_default: + return CatalogModelResolution(ref="", display_label="Default", source="default") + raise ModelSelectionError("Model selection is empty.") + + parsed = parse_model_ref(raw) + candidates = _strict_catalog_model_candidates(surface=surface) + if parsed: + provider_id, model_id = parsed + ref = model_ref(provider_id, model_id) + exact = _unique_catalog_candidates( + item for item in candidates if item["ref"].lower() == ref.lower() + ) + if exact: + if require_pinned and not _catalog_candidate_is_pinned(exact[0]): + raise ModelSelectionError(_pinned_model_selection_message(raw, candidates, provider_id=provider_id)) + return _catalog_resolution_from_candidate( + raw, + exact[0], + require_agent_ready=require_agent_ready, + ) + if require_pinned: + raise ModelSelectionError(_pinned_model_selection_message(raw, candidates, provider_id=provider_id)) + if _known_catalog_provider(provider_id): + return CatalogModelResolution( + ref=ref, + provider_id=provider_id, + model_id=model_id, + display_label=format_model_choice_label( + provider_id, + model_id, + include_icon=False, + ), + source="provider_ref", + ) + raise ModelSelectionError( + f"Unknown model provider '{provider_id}' in '{raw}'. Choose a configured model." + ) + + lower = raw.lower() + matches = _unique_catalog_candidates( + item for item in candidates if lower in item["aliases"] + ) + if require_pinned: + pinned_matches = [item for item in matches if _catalog_candidate_is_pinned(item)] + if matches and not pinned_matches: + raise ModelSelectionError(_pinned_model_selection_message(raw, candidates)) + matches = pinned_matches + if not matches: + if require_pinned: + raise ModelSelectionError(_pinned_model_selection_message(raw, candidates)) + raise ModelSelectionError( + f"Model '{raw}' was not found in configured model choices. " + "Use a provider-qualified model such as model:: " + "or pin the model in Settings -> Models first." + ) + if len(matches) > 1: + alternatives = tuple(_catalog_candidate_choice(item) for item in matches[:8]) + raise ModelSelectionError( + f"Ambiguous model selection '{raw}'. Use one of: " + f"{', '.join(alternatives)}." + ) + return _catalog_resolution_from_candidate( + raw, + matches[0], + require_agent_ready=require_agent_ready, + ) + + +def _known_catalog_provider(provider_id: str) -> bool: + provider = _canonical_provider_id(provider_id) + if not provider: + return False + try: + from row_bot.providers.catalog import get_provider_definition + + if get_provider_definition(provider) is not None: + return True + except Exception: + pass + try: + from row_bot.providers.custom import get_custom_endpoint + + return bool(get_custom_endpoint(provider)) + except Exception: + return False + + +def _catalog_resolution_from_candidate( + raw: str, + candidate: dict[str, Any], + *, + require_agent_ready: bool, +) -> CatalogModelResolution: + if candidate.get("active") is False: + reason = str(candidate.get("reason") or "This model choice is inactive.") + raise ModelSelectionError(f"Model '{raw}' is inactive. {reason}".strip()) + if require_agent_ready and str(candidate.get("runtime_mode") or "") == "chat_only": + reason = str(candidate.get("reason") or "This model is Chat Only; tools and actions are off.") + raise ModelSelectionError(f"Model '{raw}' is not available for agent runs. {reason}".strip()) + return CatalogModelResolution( + ref=str(candidate["ref"]), + provider_id=str(candidate["provider_id"]), + model_id=str(candidate["model_id"]), + display_label=str(candidate.get("display_label") or candidate.get("model_id") or ""), + source=str(candidate.get("source") or "catalog"), + ) + + +def _catalog_candidate_choice(candidate: dict[str, Any]) -> str: + label = str(candidate.get("display_label") or "") + ref = str(candidate.get("ref") or "") + return f"{label} ({ref})" if label and label != ref else ref + + +def _catalog_candidate_is_pinned(candidate: dict[str, Any]) -> bool: + return str(candidate.get("source") or "") == "quick_choice" + + +def _pinned_model_suggestions( + candidates: Iterable[dict[str, Any]], + *, + provider_id: str = "", + limit: int = 5, +) -> tuple[str, ...]: + provider = _canonical_provider_id(provider_id) + pinned = [ + item + for item in _unique_catalog_candidates(candidates) + if _catalog_candidate_is_pinned(item) and item.get("active") is not False + ] + scoped = [ + item + for item in pinned + if provider and str(item.get("provider_id") or "") == provider + ] + choices = scoped or pinned + return tuple(_catalog_candidate_choice(item) for item in choices[:limit]) + + +def _pinned_model_selection_message( + raw: str, + candidates: Iterable[dict[str, Any]], + *, + provider_id: str = "", +) -> str: + message = ( + f"Model '{raw}' is not pinned for Brain. " + "Pin it in Settings -> Models first, or choose an existing pinned Brain choice." + ) + suggestions = _pinned_model_suggestions(candidates, provider_id=provider_id) + if suggestions: + message += f" Active pinned Brain choices: {', '.join(suggestions)}." + return message + + +def _unique_catalog_candidates(candidates: Iterable[dict[str, Any]]) -> list[dict[str, Any]]: + by_ref: dict[str, dict[str, Any]] = {} + for item in candidates: + ref = str(item.get("ref") or "") + if ref and ref not in by_ref: + by_ref[ref] = item + return list(by_ref.values()) + + +def pinned_model_choice_summaries( + surface: str = "chat", + *, + include_inactive: bool = True, + limit: int = 0, +) -> list[dict[str, Any]]: + """Return compact, agent-readable pinned model choices for a surface.""" + summaries: list[dict[str, Any]] = [] + try: + options = list_model_choice_options(surface, include_inactive=include_inactive) + except Exception: + return summaries + for option in options: + provider_id = _canonical_provider_id(str(option.get("provider_id") or "")) + model_id = str(option.get("model_id") or "").strip() + ref = str(option.get("value") or "").strip() + if not provider_id or not model_id or not ref: + continue + active = option.get("active") is not False + summaries.append({ + "ref": ref, + "canonical_ref": ref, + "config_value": f"{provider_id}/{model_id}", + "label": str(option.get("label") or model_id), + "display_name": str(option.get("display_name") or model_id), + "provider_id": provider_id, + "provider": provider_display_label(provider_id), + "model_id": model_id, + "active": active, + "reason": str(option.get("reason") or ""), + "source": str(option.get("source") or "quick_choice"), + }) + if limit and len(summaries) >= limit: + break + return summaries + + +def _strict_catalog_model_candidates(*, surface: str) -> list[dict[str, Any]]: + candidates: list[dict[str, Any]] = [] + + def add_candidate( + *, + provider_id: str, + model_id: str, + display_name: str = "", + source: str, + aliases: Iterable[str] = (), + active: bool = True, + reason: str = "", + visibility: Iterable[str] | None = None, + capabilities_snapshot: dict[str, Any] | None = None, + runtime_mode: str = "", + ) -> None: + provider = _canonical_provider_id(provider_id) + model = str(model_id or "").strip() + if not provider or not model: + return + snapshot = dict(capabilities_snapshot or {}) + if not _catalog_candidate_supports_surface( + surface, + visibility=visibility, + capabilities_snapshot=snapshot, + ): + return + ref = model_ref(provider, model) + label = format_model_choice_label( + provider, + model, + display_name or model, + include_icon=False, + ) + alias_values = { + ref, + model, + display_name or model, + f"{provider}/{model}", + } + alias_values.update(str(item or "") for item in aliases) + candidates.append({ + "ref": ref, + "provider_id": provider, + "model_id": model, + "display_label": label, + "source": source, + "aliases": {item.strip().lower() for item in alias_values if item and item.strip()}, + "active": bool(active), + "reason": str(reason or ""), + "runtime_mode": str(runtime_mode or ""), + }) + + for choice in _configured_quick_model_choices(): + if choice.get("kind") != "model": + continue + provider_id = str(choice.get("provider_id") or "") + model_id = str(choice.get("model_id") or "") + display_name = str(choice.get("display_name") or model_id) + add_candidate( + provider_id=provider_id, + model_id=model_id, + display_name=display_name, + source="quick_choice", + aliases=(str(choice.get("id") or ""),), + active=choice.get("active") is not False, + reason=str(choice.get("inactive_reason") or choice.get("last_error") or ""), + visibility=choice.get("visibility") if isinstance(choice.get("visibility"), list) else None, + capabilities_snapshot=choice.get("capabilities_snapshot") + if isinstance(choice.get("capabilities_snapshot"), dict) + else None, + ) + + for provider_id, model_id, display_name in _custom_endpoint_model_matches_for_catalog(): + add_candidate( + provider_id=provider_id, + model_id=model_id, + display_name=display_name, + source="custom_endpoint_model", + ) + + for provider_id, model_id, display_name, snapshot in _cached_catalog_model_matches(): + add_candidate( + provider_id=provider_id, + model_id=model_id, + display_name=display_name, + source="cached_catalog", + capabilities_snapshot=snapshot, + ) + + return _unique_catalog_candidates(candidates) + + +def _configured_quick_model_choices() -> list[dict[str, Any]]: + try: + return [ + dict(choice) + for choice in load_provider_config().get("quick_choices", []) + if isinstance(choice, dict) + ] + except Exception: + return [] + + +def _catalog_candidate_supports_surface( + surface: str, + *, + visibility: Iterable[str] | None, + capabilities_snapshot: dict[str, Any], +) -> bool: + selected_surface = str(surface or "").strip() + if not selected_surface: + return True + visible = {str(item) for item in (visibility or []) if str(item)} + if visible and selected_surface not in visible: + return False + has_structured_snapshot = bool( + capabilities_snapshot.get("tasks") + or capabilities_snapshot.get("input_modalities") + or capabilities_snapshot.get("output_modalities") + or capabilities_snapshot.get("capabilities") + ) + if has_structured_snapshot and not snapshot_supports_surface(capabilities_snapshot, selected_surface): + return False + return True + + +def _custom_endpoint_model_matches_for_catalog() -> list[tuple[str, str, str]]: + matches: list[tuple[str, str, str]] = [] + try: + from row_bot.providers.custom import custom_endpoint_models, list_custom_endpoints + + for endpoint in list_custom_endpoints(): + if endpoint.get("enabled") is False: + continue + provider_id = str(endpoint.get("provider_id") or "") + if not provider_id: + continue + for item in custom_endpoint_models(provider_id): + model_id = str(item.get("model_id") or item.get("id") or "") + display_name = str(item.get("display_name") or item.get("label") or model_id) + if model_id: + matches.append((provider_id, model_id, display_name)) + except Exception: + return [] + return matches + + +def _cached_catalog_model_matches() -> list[tuple[str, str, str, dict[str, Any]]]: + matches: list[tuple[str, str, str, dict[str, Any]]] = [] + try: + from row_bot.providers.model_catalog_cache import read_model_catalog_cache + + snapshot = read_model_catalog_cache() + except Exception: + return [] + for key, info in snapshot.cloud_cache.items(): + if not isinstance(info, dict): + continue + parsed = parse_model_ref(str(key)) + provider_id = str(info.get("provider") or (parsed[0] if parsed else "") or "") + model_id = str((parsed[1] if parsed else "") or info.get("model_id") or key) + if not provider_id or not model_id: + continue + display = str(info.get("display_name") or info.get("label") or info.get("name") or model_id) + snapshot_data = ( + info.get("capabilities_snapshot") + if isinstance(info.get("capabilities_snapshot"), dict) + else {} + ) + matches.append((provider_id, model_id, display, dict(snapshot_data))) + for row in snapshot.ollama_rows: + model_id = str(row.get("model") or row.get("model_id") or row.get("name") or row.get("id") or "") + if not model_id: + continue + display = str(row.get("display_name") or row.get("label") or model_id) + snapshot_data = ( + row.get("capabilities_snapshot") + if isinstance(row.get("capabilities_snapshot"), dict) + else {} + ) + matches.append(("ollama", model_id, display, dict(snapshot_data))) + return matches + + def model_selection_diagnostics( value: str | None, *, @@ -659,14 +1073,79 @@ def _is_auto_capability_reason(reason: Any) -> bool: return str(reason or "").startswith("Capability metadata says this model is not compatible with ") +def _ollama_uncached_capability_snapshot() -> dict[str, Any]: + return { + "capabilities": ["chat", "streaming", "text"], + "input_modalities": ["text"], + "output_modalities": ["text"], + "tasks": ["chat"], + "tool_calling": None, + "streaming": True, + "endpoint_compatibility": ["ollama_chat"], + "transport": "ollama_chat", + "source_confidence": "inferred", + "last_verified_at": "", + } + + +def _strip_weak_ollama_vision(snapshot: dict[str, Any]) -> dict[str, Any]: + cleaned = dict(snapshot) + capabilities = cleaned.get("capabilities") + if isinstance(capabilities, list): + cleaned["capabilities"] = [ + item for item in capabilities + if str(item).strip().lower() != "vision" + ] + input_modalities = cleaned.get("input_modalities") + if isinstance(input_modalities, list): + kept = [ + item for item in input_modalities + if str(item).strip().lower() != "image" + ] + cleaned["input_modalities"] = kept or ["text"] + return cleaned + + +def _ollama_snapshot_has_weak_inferred_vision(model_id: str, snapshot: dict[str, Any]) -> bool: + input_modalities = snapshot.get("input_modalities") + has_image = isinstance(input_modalities, list) and any( + str(item).strip().lower() == "image" for item in input_modalities + ) + if not has_image: + return False + if str(snapshot.get("source_confidence") or "").strip().lower() != "inferred": + return False + if str(snapshot.get("last_verified_at") or "").strip(): + return False + try: + from row_bot.providers.ollama import is_ollama_vision_capable + + return not is_ollama_vision_capable(model_id) + except Exception: + return True + + def _inferred_capability_snapshot(choice: dict[str, Any]) -> dict[str, Any]: provider_id = str(choice.get("provider_id") or "") model_id = str(choice.get("model_id") or "") if not provider_id or not model_id: return {} try: - from row_bot.providers.capability_resolution import resolve_capability_snapshot + from row_bot.providers.capability_resolution import ( + resolve_capability_metadata, + resolve_capability_snapshot, + ) + if provider_id in {"local", "ollama", "ollama_cloud"}: + resolved = resolve_capability_metadata( + provider_id, + model_id, + include_static_fallback=False, + ) + cached = resolved.snapshot + if cached and _ollama_snapshot_has_weak_inferred_vision(model_id, cached): + return _strip_weak_ollama_vision(cached) + return cached or _ollama_uncached_capability_snapshot() return resolve_capability_snapshot(provider_id, model_id) except Exception: return {} diff --git a/src/row_bot/slash_commands.py b/src/row_bot/slash_commands.py index 6e9c647b..84d91bad 100644 --- a/src/row_bot/slash_commands.py +++ b/src/row_bot/slash_commands.py @@ -385,11 +385,14 @@ def dispatch_text_command( request = parse_agent_spawn_text(text) if request is None: return format_agent_spawn_usage() - run = spawn_agent_from_request( - thread_id, - request, - enabled_tool_names=enabled_tool_names, - ) + try: + run = spawn_agent_from_request( + thread_id, + request, + enabled_tool_names=enabled_tool_names, + ) + except Exception as exc: + return f"Could not start Agent: {exc}" return format_agent_spawn_started(run, request) if spec.id == "goal": from row_bot.goals import handle_goal_command diff --git a/src/row_bot/tasks.py b/src/row_bot/tasks.py index b738ce01..c1f7a3b8 100644 --- a/src/row_bot/tasks.py +++ b/src/row_bot/tasks.py @@ -35,7 +35,7 @@ from contextvars import ContextVar from datetime import datetime, timedelta from functools import wraps -from typing import TYPE_CHECKING, Callable, TypeVar +from typing import TYPE_CHECKING, Any, Callable, Iterable, Mapping, Sequence, TypeVar from row_bot.approval_policy import ( DEFAULT_APPROVAL_MODE, @@ -60,6 +60,21 @@ _OLD_WF_DB = str(_DATA_DIR / "workflows.db") _TASK_CONFIG_PATH = str(_DATA_DIR / "task_config.json") _SCHEMA_VERSION = 1 +DEFAULT_WORKFLOW_AGENT_PROFILE_ID = "builtin:row_bot_default" +WORKFLOW_PROFILE_MIGRATION_VERSION = "workflow_profile_v1" +_WORKFLOW_READ_ONLY_DEFAULT_DENY_TOOLS = { + "calendar", + "custom_tool_builder", + "designer", + "gmail", + "goal", + "image_gen", + "row_bot_updater", + "task", + "tracker", + "video_gen", + "x", +} _SCHEMA_LOCK = threading.RLock() _SCHEMA_READY_PATH: str | None = None _LAST_SCHEMA_REPAIR: dict[str, object] = {} @@ -96,7 +111,10 @@ tools_override TEXT, channels TEXT, advanced_mode INTEGER DEFAULT 0, - agent_profile_id TEXT DEFAULT '' + agent_profile_id TEXT DEFAULT '', + profile_migration_status TEXT DEFAULT '', + profile_migration_note TEXT DEFAULT '', + profile_migration_snapshot_json TEXT DEFAULT '{}' ) """, "task_runs": """ @@ -201,6 +219,9 @@ ("channels", "TEXT"), ("advanced_mode", "INTEGER DEFAULT 0"), ("agent_profile_id", "TEXT DEFAULT ''"), + ("profile_migration_status", "TEXT DEFAULT ''"), + ("profile_migration_note", "TEXT DEFAULT ''"), + ("profile_migration_snapshot_json", "TEXT DEFAULT '{}'"), ], "task_runs": [ ("finished_at", "TEXT"), @@ -242,7 +263,8 @@ "persistent_thread_id", "delete_after_run", "allowed_commands", "allowed_recipients", "skills_override", "steps", "safety_mode", "concurrency_group", "trigger", "tools_override", "channels", - "advanced_mode", "agent_profile_id", + "advanced_mode", "agent_profile_id", "profile_migration_status", + "profile_migration_note", "profile_migration_snapshot_json", }, "task_runs": { "id", "task_id", "thread_id", "started_at", "finished_at", "status", @@ -568,7 +590,10 @@ def _init_db() -> None: tools_override TEXT, -- JSON list of tool names (null = all enabled) channels TEXT, -- JSON list of channel names (null = workflow default) advanced_mode INTEGER DEFAULT 0, -- 1 = reopen in Advanced editor mode - agent_profile_id TEXT DEFAULT '' -- optional Agent Profile id/slug + agent_profile_id TEXT DEFAULT '', -- optional Agent Profile id/slug + profile_migration_status TEXT DEFAULT '', + profile_migration_note TEXT DEFAULT '', + profile_migration_snapshot_json TEXT DEFAULT '{}' ) """) conn.execute(""" @@ -655,6 +680,9 @@ def _init_db() -> None: ("channels", "TEXT"), ("advanced_mode", "INTEGER DEFAULT 0"), ("agent_profile_id", "TEXT DEFAULT ''"), + ("profile_migration_status", "TEXT DEFAULT ''"), + ("profile_migration_note", "TEXT DEFAULT ''"), + ("profile_migration_snapshot_json", "TEXT DEFAULT '{}'"), ]: try: conn.execute(f"ALTER TABLE tasks ADD COLUMN {col} {defn}") @@ -829,8 +857,485 @@ def _migrate_from_workflows() -> None: conn.close() +def _ordered_text_list(value: Any) -> list[str]: + if value is None: + return [] + if isinstance(value, str): + items = [value] + elif isinstance(value, (list, tuple, set, frozenset)): + items = list(value) + else: + return [] + result: list[str] = [] + seen: set[str] = set() + for item in items: + text = str(item or "").strip() + if text and text not in seen: + seen.add(text) + result.append(text) + return result + + +def _json_list_value(value: Any) -> list[str] | None: + if value is None: + return None + if isinstance(value, str): + text = value.strip() + if not text: + return None + try: + loaded = json.loads(text) + except Exception: + return _ordered_text_list(text) + return _ordered_text_list(loaded) + return _ordered_text_list(value) + + +def _default_task_skill_names_for_migration() -> set[str]: + defaults = {"proactive_agent"} + try: + from row_bot.skills import get_default_active_skill_names + + defaults.update(get_default_active_skill_names("task")) + except Exception: + logger.debug("Could not load default task skills for workflow migration", exc_info=True) + return {name for name in defaults if name} + + +def _custom_legacy_skill_names(skills_override: Sequence[str] | None) -> list[str]: + names = _ordered_text_list(skills_override) + if not names: + return [] + default_names = _default_task_skill_names_for_migration() + custom = [name for name in names if name not in default_names] + if custom: + return custom + return [] + + +def _workflow_policy_key( + *, + tools_override: Sequence[str] | None, + skills_override: Sequence[str] | None, +) -> dict[str, list[str]]: + return { + "tools": sorted(_ordered_text_list(tools_override)), + "skills": sorted(_custom_legacy_skill_names(skills_override)), + } + + +def _infer_migrated_profile_capability(tools: Sequence[str], skills: Sequence[str]) -> str: + if not tools: + return "orchestrator" if skills else "read_only" + write_or_delivery_tools = { + "calendar", + "custom_tool_builder", + "designer", + "gmail", + "goal", + "image_gen", + "row_bot_updater", + "task", + "tracker", + "video_gen", + "x", + } + return "write_capable" if any(tool in write_or_delivery_tools for tool in tools) else "read_only" + + +def _workflow_policy_signature( + *, + tools_override: Sequence[str] | None, + skills_override: Sequence[str] | None, +) -> dict[str, Any]: + key = _workflow_policy_key( + tools_override=tools_override, + skills_override=skills_override, + ) + return { + **key, + "capability": _infer_migrated_profile_capability(key["tools"], key["skills"]), + } + + +def _profile_policy_key(profile: Mapping[str, Any]) -> dict[str, list[str]]: + tool_policy = profile.get("tool_policy_json") or {} + skill_policy = profile.get("skill_policy_json") or {} + if not isinstance(tool_policy, dict): + tool_policy = {} + if not isinstance(skill_policy, dict): + skill_policy = {} + return { + "tools": sorted(_ordered_text_list(tool_policy.get("allow_tools"))), + "skills": sorted(_ordered_text_list(skill_policy.get("skills_override"))), + } + + +def _known_workflow_tool_ids() -> set[str]: + try: + from row_bot.agent_tool_catalog import list_agent_tool_catalog + + return { + str(item.get("id") or "") + for item in list_agent_tool_catalog(include_unavailable=True) + if str(item.get("id") or "") + } + except Exception: + logger.debug("Could not load Agent Profile tool catalog for migration", exc_info=True) + return set() + + +def _known_workflow_skill_names() -> set[str]: + try: + from row_bot.skills import get_all_skills, load_skills, skills_loaded + + if not skills_loaded(): + load_skills() + return {skill.name for skill in get_all_skills()} + except Exception: + logger.debug("Could not load skills for workflow migration", exc_info=True) + return set() + + +def _missing_legacy_policy_items( + *, + tools_override: Sequence[str] | None, + skills_override: Sequence[str] | None, +) -> dict[str, list[str]]: + missing: dict[str, list[str]] = {} + tools = _ordered_text_list(tools_override) + if tools: + known_tools = _known_workflow_tool_ids() + if known_tools: + missing_tools = sorted(tool for tool in tools if tool not in known_tools) + if missing_tools: + missing["tools"] = missing_tools + skills = _custom_legacy_skill_names(skills_override) + if skills: + known_skills = _known_workflow_skill_names() + if known_skills: + missing_skills = sorted(skill for skill in skills if skill not in known_skills) + if missing_skills: + missing["skills"] = missing_skills + return missing + + +def _migration_profile_name(signature: Mapping[str, Any]) -> str: + tools = [str(item) for item in signature.get("tools") or []] + skills = [str(item) for item in signature.get("skills") or []] + if tools: + labels = [tool.replace("_", " ").replace("-", " ").title() for tool in tools[:3]] + joined = " + ".join(labels) + return f"Migrated {joined} Tools" if joined else "Migrated Workflow Profile" + if skills: + labels = [skill.replace("_", " ").replace("-", " ").title() for skill in skills[:3]] + joined = " + ".join(labels) + return f"Migrated {joined} Skills" if joined else "Migrated Workflow Profile" + return "Migrated Workflow Profile" + + +def _find_matching_builtin_profile(policy_key: Mapping[str, list[str]]) -> dict[str, Any] | None: + try: + from row_bot.agent_profiles import list_agent_profiles + + for profile in list_agent_profiles(enabled_only=True, include_builtins=True): + if profile.get("source") == "builtin" and _profile_policy_key(profile) == policy_key: + return profile + except Exception: + logger.debug("Could not match built-in Agent Profile for workflow migration", exc_info=True) + return None + + +def _find_migration_profile(signature: Mapping[str, Any]) -> dict[str, Any] | None: + try: + from row_bot.agent_profiles import list_agent_profiles + + for profile in list_agent_profiles(enabled_only=False, include_builtins=False): + provenance = profile.get("provenance_json") or {} + if not isinstance(provenance, dict): + continue + if provenance.get("migration") != WORKFLOW_PROFILE_MIGRATION_VERSION: + continue + if provenance.get("policy_signature") == dict(signature): + return profile + except Exception: + logger.debug("Could not find reusable migration profile", exc_info=True) + return None + + +def _append_migration_profile_task_id(profile: Mapping[str, Any], task_id: str) -> None: + if not profile or str(profile.get("id") or "").startswith("builtin:"): + return + try: + from row_bot.agent_profiles import save_agent_profile + + provenance = dict(profile.get("provenance_json") or {}) + source_task_ids = _ordered_text_list(provenance.get("source_task_ids")) + if task_id not in source_task_ids: + provenance["source_task_ids"] = [*source_task_ids, task_id] + save_agent_profile({**dict(profile), "provenance_json": provenance}) + except Exception: + logger.debug("Could not update migration profile provenance", exc_info=True) + + +def _create_migration_profile(signature: Mapping[str, Any], task_id: str) -> dict[str, Any]: + from row_bot.agent_profiles import _unique_slug, normalize_profile_slug, save_agent_profile + + name = _migration_profile_name(signature) + slug = _unique_slug(normalize_profile_slug(name) or "migrated_workflow_profile") + capability = str(signature.get("capability") or "read_only") + allow_tools = _ordered_text_list(signature.get("tools")) + skills = _ordered_text_list(signature.get("skills")) + return save_agent_profile( + slug=slug, + display_name=name, + description="Created by the workflow Agent Profile migration from legacy workflow policy.", + when_to_use="Use for workflows migrated from legacy tool or skill overrides with this policy.", + instructions=( + "Run the workflow using the migrated legacy workflow capability policy. " + "Review this profile before broad reuse." + ), + handoff_contract="Summarize workflow results, approvals, delivery status, and any follow-up needed.", + source="workflow_created", + tool_policy_json={ + "capability": capability, + "allow_tools": allow_tools, + "allow_delegation": False, + }, + skill_policy_json={"skills_override": skills}, + context_policy_json={ + "default_context_mode": "auto", + "include_parent_summary": True, + "include_selected_messages": False, + "include_workspace_context": True, + "max_context_tokens": 0, + }, + workspace_policy_json={ + "workspace_mode_default": "read_only" if capability == "read_only" else "auto", + "write_lock_required": capability in {"write_capable", "orchestrator"}, + "worktree_allowed": False, + "developer_workspace_required": False, + }, + approval_policy_json={"mode": "inherit"}, + model_policy_json={"mode": "inherit"}, + ui_json={"icon": "rule", "color": "blue-grey", "group": "Migrated"}, + provenance_json={ + "migration": WORKFLOW_PROFILE_MIGRATION_VERSION, + "source_task_ids": [task_id], + "policy_signature": dict(signature), + }, + ) + + +def _resolve_workflow_profile_for_legacy_policy( + *, + task_id: str, + task_name: str = "", + agent_profile_id: str | None, + tools_override: Sequence[str] | None, + skills_override: Sequence[str] | None, + preserve_existing_profile: bool = False, +) -> dict[str, Any]: + tools = _ordered_text_list(tools_override) + skills = _ordered_text_list(skills_override) + raw_policy_present = tools_override is not None or skills_override is not None + snapshot = { + "migration": WORKFLOW_PROFILE_MIGRATION_VERSION, + "task_id": task_id, + "task_name": task_name, + "old_tools_override": tools if tools_override is not None else None, + "old_skills_override": skills if skills_override is not None else None, + } + + profile_ref = str(agent_profile_id or "").strip() + if preserve_existing_profile and profile_ref: + try: + from row_bot.agent_profiles import require_agent_profile + + profile = require_agent_profile(profile_ref, enabled_only=True) + snapshot["selected_profile_id"] = profile["id"] + snapshot["selected_profile_slug"] = profile["slug"] + return { + "profile_id": profile["id"], + "status": "not_needed", + "note": "Workflow already had an Agent Profile; legacy overrides were retired.", + "snapshot": snapshot, + "clear_old_overrides": True, + "disable": False, + } + except Exception as exc: + snapshot["invalid_profile_reference"] = profile_ref + snapshot["invalid_profile_error"] = str(exc) + + policy_key = _workflow_policy_key( + tools_override=tools, + skills_override=skills, + ) + custom_policy = bool(policy_key["tools"] or policy_key["skills"]) + if not custom_policy: + note = "Assigned Default Agent Profile." + status = "not_needed" + if raw_policy_present and skills: + note = "Ignored legacy task default skill snapshot and assigned Default Agent Profile." + status = "needs_review" + snapshot["policy_signature"] = _workflow_policy_signature( + tools_override=[], + skills_override=[], + ) + return { + "profile_id": DEFAULT_WORKFLOW_AGENT_PROFILE_ID, + "status": status, + "note": note, + "snapshot": snapshot, + "clear_old_overrides": True, + "disable": False, + } + + missing = _missing_legacy_policy_items( + tools_override=policy_key["tools"], + skills_override=policy_key["skills"], + ) + if missing: + snapshot["missing_policy_items"] = missing + return { + "profile_id": profile_ref or "", + "status": "blocked", + "note": "Legacy workflow policy references missing tools or skills.", + "snapshot": snapshot, + "clear_old_overrides": False, + "disable": True, + } + + builtin = _find_matching_builtin_profile(policy_key) + if builtin: + snapshot["policy_signature"] = _workflow_policy_signature( + tools_override=policy_key["tools"], + skills_override=policy_key["skills"], + ) + snapshot["selected_profile_id"] = builtin["id"] + snapshot["selected_profile_slug"] = builtin["slug"] + return { + "profile_id": builtin["id"], + "status": "exact_profile", + "note": f"Legacy workflow policy matched built-in Agent Profile: {builtin['display_name']}.", + "snapshot": snapshot, + "clear_old_overrides": True, + "disable": False, + } + + signature = _workflow_policy_signature( + tools_override=policy_key["tools"], + skills_override=policy_key["skills"], + ) + profile = _find_migration_profile(signature) + if profile is None: + profile = _create_migration_profile(signature, task_id) + else: + _append_migration_profile_task_id(profile, task_id) + snapshot["policy_signature"] = signature + snapshot["selected_profile_id"] = profile["id"] + snapshot["selected_profile_slug"] = profile["slug"] + return { + "profile_id": profile["id"], + "status": "created_profile", + "note": f"Legacy workflow policy migrated to Agent Profile: {profile['display_name']}.", + "snapshot": snapshot, + "clear_old_overrides": True, + "disable": False, + } + + +def _task_row_legacy_policy(row: sqlite3.Row | Mapping[str, Any]) -> tuple[list[str] | None, list[str] | None]: + raw = dict(row) + return ( + _json_list_value(raw.get("tools_override")), + _json_list_value(raw.get("skills_override")), + ) + + +def migrate_workflow_profile_policies() -> dict[str, Any]: + """Migrate legacy workflow tool/skill overrides into Agent Profiles.""" + ensure_task_schema() + conn = _raw_conn() + migrated = 0 + blocked = 0 + needs_review = 0 + created_or_reused = 0 + try: + rows = conn.execute( + "SELECT id, name, agent_profile_id, tools_override, skills_override, " + "enabled, profile_migration_status FROM tasks" + ).fetchall() + for row in rows: + raw_tools, raw_skills = _task_row_legacy_policy(row) + profile_ref = str(row["agent_profile_id"] or "") + status = str(row["profile_migration_status"] or "") + if ( + status + and profile_ref + and raw_tools is None + and raw_skills is None + ): + continue + conversion = _resolve_workflow_profile_for_legacy_policy( + task_id=str(row["id"]), + task_name=str(row["name"] or ""), + agent_profile_id=profile_ref, + tools_override=raw_tools, + skills_override=raw_skills, + preserve_existing_profile=True, + ) + new_enabled = 0 if conversion["disable"] else int(bool(row["enabled"])) + if conversion["status"] == "blocked": + blocked += 1 + if conversion["status"] == "needs_review": + needs_review += 1 + if conversion["status"] == "created_profile": + created_or_reused += 1 + clear_old = bool(conversion["clear_old_overrides"]) + conn.execute( + "UPDATE tasks SET agent_profile_id = ?, " + "tools_override = CASE WHEN ? THEN NULL ELSE tools_override END, " + "skills_override = CASE WHEN ? THEN NULL ELSE skills_override END, " + "enabled = ?, profile_migration_status = ?, " + "profile_migration_note = ?, profile_migration_snapshot_json = ? " + "WHERE id = ?", + ( + str(conversion["profile_id"] or profile_ref or DEFAULT_WORKFLOW_AGENT_PROFILE_ID), + 1 if clear_old else 0, + 1 if clear_old else 0, + new_enabled, + conversion["status"], + conversion["note"], + json.dumps(conversion["snapshot"], sort_keys=True), + row["id"], + ), + ) + conn.commit() + migrated += 1 + conn.commit() + finally: + conn.close() + return { + "migrated": migrated, + "blocked": blocked, + "needs_review": needs_review, + "created_or_reused_profiles": created_or_reused, + } + + +def _migrate_workflow_profile_policies_best_effort() -> None: + try: + result = migrate_workflow_profile_policies() + if result.get("migrated"): + logger.info("Workflow Agent Profile migration result: %s", result) + except Exception as exc: + logger.warning("Workflow Agent Profile migration failed (non-fatal): %s", exc) + + _init_db() _migrate_from_workflows() +_migrate_workflow_profile_policies_best_effort() # ── Template Variables ─────────────────────────────────────────────────────── @@ -876,9 +1381,14 @@ def _canonicalize_workflow_model_override(value: str | None) -> str | None: raw = str(value or "").strip() if not raw: return None - from row_bot.providers.selection import canonicalize_model_selection + from row_bot.providers.selection import resolve_catalog_model_selection - canonical = canonicalize_model_selection(raw, "workflow", allow_default=True) + canonical = resolve_catalog_model_selection( + raw, + surface="workflow", + allow_default=True, + require_agent_ready=True, + ) return canonical.ref or None @@ -904,14 +1414,9 @@ def _canonicalize_workflow_steps(steps: list[dict] | None) -> list[dict] | None: step["model_override"] = canonical else: step.pop("model_override", None) - if "agent_profile_id" in step: - canonical_profile = _canonicalize_agent_profile_reference( - step.get("agent_profile_id") - ) - if canonical_profile: - step["agent_profile_id"] = canonical_profile - else: - step.pop("agent_profile_id", None) + # Workflow runtime policy is selected at the workflow level. Step-level + # profile pointers are intentionally ignored in this migration phase. + step.pop("agent_profile_id", None) return steps @@ -1075,18 +1580,38 @@ def create_task( if advanced_mode is None: advanced_mode = bool(steps) model_override = _canonicalize_workflow_model_override(model_override) - agent_profile_id = _canonicalize_agent_profile_reference(agent_profile_id) + legacy_policy_input = tools_override is not None or skills_override is not None + agent_profile_id = _canonicalize_agent_profile_reference( + agent_profile_id or DEFAULT_WORKFLOW_AGENT_PROFILE_ID + ) safety_mode = legacy_safety_mode_to_approval_mode(safety_mode) + profile_migration_status = "not_needed" + profile_migration_note = "Profile-first workflow." + profile_migration_snapshot: dict[str, Any] = {} if notify_only: skills_override = None - elif skills_override is None and apply_default_skills: - try: - from row_bot.skills import get_default_active_skill_names - - skills_override = get_default_active_skill_names("task") - except Exception: - logger.debug("Failed to resolve default workflow skills", exc_info=True) - skills_override = [] + tools_override = None + elif legacy_policy_input: + conversion = _resolve_workflow_profile_for_legacy_policy( + task_id=task_id, + task_name=name, + agent_profile_id=agent_profile_id, + tools_override=tools_override, + skills_override=skills_override, + preserve_existing_profile=False, + ) + agent_profile_id = str(conversion["profile_id"] or agent_profile_id) + profile_migration_status = str(conversion["status"] or "not_needed") + profile_migration_note = str(conversion["note"] or "") + profile_migration_snapshot = dict(conversion["snapshot"] or {}) + if conversion["clear_old_overrides"]: + tools_override = None + skills_override = None + if conversion["disable"]: + enabled = False + else: + skills_override = None + tools_override = None # If steps provided, also sync prompts for backward compat if steps: assign_step_ids(steps) @@ -1099,8 +1624,9 @@ def create_task( "notify_label, delivery_channel, delivery_target, model_override, " "persistent_thread_id, delete_after_run, created_at, enabled, skills_override, " "steps, safety_mode, concurrency_group, trigger, tools_override, channels, " - "advanced_mode, agent_profile_id) " - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "advanced_mode, agent_profile_id, profile_migration_status, " + "profile_migration_note, profile_migration_snapshot_json) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ( task_id, name, description, icon, json.dumps(prompts), schedule, at, int(notify_only), notify_label, @@ -1115,6 +1641,9 @@ def create_task( json.dumps(channels) if channels is not None else None, int(bool(advanced_mode)), agent_profile_id, + profile_migration_status, + profile_migration_note, + json.dumps(profile_migration_snapshot, sort_keys=True), ), ) conn.commit() @@ -1170,9 +1699,35 @@ def update_task(task_id: str, **kwargs) -> None: "skills_override", "steps", "safety_mode", "concurrency_group", "trigger", "tools_override", "channels", "advanced_mode", "agent_profile_id", + "profile_migration_status", "profile_migration_note", + "profile_migration_snapshot_json", } # ── Validate delivery if either field is being changed ─────────── + if {"tools_override", "skills_override"} & set(kwargs): + current = get_task(task_id) or {} + conversion = _resolve_workflow_profile_for_legacy_policy( + task_id=task_id, + task_name=str(kwargs.get("name") or current.get("name") or ""), + agent_profile_id=str(kwargs.get("agent_profile_id") or current.get("agent_profile_id") or ""), + tools_override=kwargs.get("tools_override", current.get("tools_override")), + skills_override=kwargs.get("skills_override", current.get("skills_override")), + preserve_existing_profile=False, + ) + kwargs["agent_profile_id"] = str( + conversion["profile_id"] + or current.get("agent_profile_id") + or DEFAULT_WORKFLOW_AGENT_PROFILE_ID + ) + kwargs["profile_migration_status"] = str(conversion["status"] or "not_needed") + kwargs["profile_migration_note"] = str(conversion["note"] or "") + kwargs["profile_migration_snapshot_json"] = dict(conversion["snapshot"] or {}) + if conversion["clear_old_overrides"]: + kwargs["tools_override"] = None + kwargs["skills_override"] = None + if conversion["disable"]: + kwargs["enabled"] = False + if "delivery_channel" in kwargs or "delivery_target" in kwargs: # Merge with existing values to get full picture task = get_task(task_id) @@ -1190,14 +1745,14 @@ def update_task(task_id: str, **kwargs) -> None: if key == "safety_mode": value = legacy_safety_mode_to_approval_mode(value) if key == "agent_profile_id": - value = _canonicalize_agent_profile_reference(value) + value = _canonicalize_agent_profile_reference(value or DEFAULT_WORKFLOW_AGENT_PROFILE_ID) if key == "steps" and isinstance(value, list): assign_step_ids(value) _canonicalize_workflow_steps(value) if key in ("prompts", "allowed_commands", "allowed_recipients", "skills_override", "steps", "trigger", - "tools_override", "channels"): - value = json.dumps(value) + "tools_override", "channels", "profile_migration_snapshot_json"): + value = json.dumps(value, sort_keys=True) if value is not None else None if key in ("notify_only", "delete_after_run", "advanced_mode"): value = int(value) conn.execute( @@ -1342,6 +1897,13 @@ def _row_to_dict(row: sqlite3.Row) -> dict: raw_channels = d.get("channels") d["channels"] = json.loads(raw_channels) if raw_channels else None d["agent_profile_id"] = str(d.get("agent_profile_id") or "") + d["profile_migration_status"] = str(d.get("profile_migration_status") or "") + d["profile_migration_note"] = str(d.get("profile_migration_note") or "") + raw_migration = d.get("profile_migration_snapshot_json") + try: + d["profile_migration_snapshot_json"] = json.loads(raw_migration) if raw_migration else {} + except Exception: + d["profile_migration_snapshot_json"] = {} if d.get("safety_mode"): d["safety_mode"] = legacy_safety_mode_to_approval_mode(d.get("safety_mode")) # Auto-convert: if steps is empty but prompts exist, synthesize steps @@ -1381,14 +1943,105 @@ def _steps_to_prompts(steps: list[dict]) -> list[str]: def _workflow_agent_profile_ref(task: dict | None) -> str: if not task: - return "" + return DEFAULT_WORKFLOW_AGENT_PROFILE_ID profile_ref = str(task.get("agent_profile_id") or "") if profile_ref: return profile_ref - for step in task.get("steps") or []: - if isinstance(step, dict) and step.get("agent_profile_id"): - return str(step.get("agent_profile_id") or "") - return "" + return DEFAULT_WORKFLOW_AGENT_PROFILE_ID + + +def _workflow_agent_profile_snapshot(task: dict | None) -> dict[str, Any]: + profile_ref = _workflow_agent_profile_ref(task) + try: + from row_bot.agent_profiles import snapshot_agent_profile + + return snapshot_agent_profile(profile_ref) + except Exception: + logger.debug("Could not snapshot workflow Agent Profile %s", profile_ref, exc_info=True) + return {} + + +def _workflow_profile_tool_allowlist(profile_snapshot: Mapping[str, Any]) -> list[str]: + tool_policy = profile_snapshot.get("tool_policy_json") or {} + if not isinstance(tool_policy, dict): + return [] + return _ordered_text_list(tool_policy.get("allow_tools")) + + +def _workflow_default_task_skills() -> list[str]: + try: + from row_bot.skills import get_default_active_skill_names + + return _ordered_text_list(get_default_active_skill_names("task")) + except Exception: + logger.debug("Could not load default task skills for workflow runtime", exc_info=True) + return [] + + +def _workflow_profile_skills(profile_snapshot: Mapping[str, Any]) -> list[str]: + skill_policy = profile_snapshot.get("skill_policy_json") or {} + if not isinstance(skill_policy, dict): + return [] + base = _ordered_text_list(skill_policy.get("skills_override")) + if not base: + base = _workflow_default_task_skills() + deny = set(_ordered_text_list(skill_policy.get("deny_skills"))) + return [name for name in base if name not in deny] + + +def _filter_workflow_tools_for_profile( + enabled_tool_names: Sequence[str], + profile_snapshot: Mapping[str, Any], +) -> list[str]: + requested = _ordered_text_list(enabled_tool_names) + tool_policy = profile_snapshot.get("tool_policy_json") or {} + if not isinstance(tool_policy, dict): + tool_policy = {} + allow = set(_ordered_text_list(tool_policy.get("allow_tools"))) + capability = str(tool_policy.get("capability") or "read_only") + filtered = list(requested) + if capability == "read_only" and not allow: + filtered = [name for name in filtered if name not in _WORKFLOW_READ_ONLY_DEFAULT_DENY_TOOLS] + if allow: + mcp_allowed = "mcp" in allow or any(name.startswith("mcp_") for name in allow) + filtered = [ + name + for name in filtered + if name in allow or (name == "mcp" and mcp_allowed) + ] + return filtered + + +def _workflow_profile_runtime_policy( + task: dict, + enabled_tool_names: Sequence[str], +) -> dict[str, Any]: + from row_bot.agent_profiles import resolve_profile_for_run + + parent_approval = get_task_approval_mode(task) + resolved = resolve_profile_for_run( + _workflow_agent_profile_ref(task), + parent_approval_mode=parent_approval, + require_enabled=True, + ) + profile_snapshot = dict(resolved["profile_snapshot"]) + tool_allowlist = _workflow_profile_tool_allowlist(profile_snapshot) + return { + "agent_profile_id": str(resolved["profile_id"] or DEFAULT_WORKFLOW_AGENT_PROFILE_ID), + "agent_profile_slug": str(resolved["profile_slug"] or ""), + "agent_profile_snapshot": profile_snapshot, + "approval_mode": normalize_approval_mode( + str(resolved["effective_approval_mode"] or parent_approval), + parent_approval, + ), + "skills_override": _workflow_profile_skills(profile_snapshot), + "tool_allowlist": tool_allowlist, + "effective_tool_names": _filter_workflow_tools_for_profile( + enabled_tool_names, + profile_snapshot, + ), + "warnings": list(resolved.get("warnings") or []), + } def _mirror_workflow_agent_run_start( @@ -1402,6 +2055,7 @@ def _mirror_workflow_agent_run_start( try: task = get_task(task_id) from row_bot.agent_runs import mirror_workflow_run_start + profile_snapshot = _workflow_agent_profile_snapshot(task) mirror_workflow_run_start( run_id, @@ -1410,10 +2064,11 @@ def _mirror_workflow_agent_run_start( display_name=task_name or (task or {}).get("name", ""), steps_total=steps_total, profile_id=_workflow_agent_profile_ref(task), + profile_snapshot_json=profile_snapshot, approval_mode=get_task_approval_mode(task) if task else DEFAULT_APPROVAL_MODE, model_override=str((task or {}).get("model_override") or ""), - tools_override=(task or {}).get("tools_override"), - skills_override=(task or {}).get("skills_override"), + tools_override=_workflow_profile_tool_allowlist(profile_snapshot) or None, + skills_override=_workflow_profile_skills(profile_snapshot), ) except Exception: logger.debug("Workflow agent-run mirror start failed for %s", run_id, exc_info=True) @@ -2217,22 +2872,31 @@ def _task_log(msg: str) -> None: last_response = list(step_outputs.values())[-1] # Determine effective approval mode (block/approve/allow_all) - approval_mode = get_task_approval_mode(task) - effective_tool_names = enabled_tool_names - - # Skills override — set on thread so the pre-model hook picks it up - if task.get("skills_override") is not None: - from row_bot.threads import set_thread_skills_override - set_thread_skills_override(thread_id, task["skills_override"]) - - # Apply tools_override (explicit selection from UI) - if task.get("tools_override"): - override_set = set(task["tools_override"]) - effective_tool_names = [ - t for t in effective_tool_names if t in override_set - ] + try: + runtime_policy = _workflow_profile_runtime_policy(task, enabled_tool_names) + except Exception: + logger.exception( + "Task '%s' could not resolve Agent Profile policy; falling back to Default profile", + task.get("name", ""), + ) + fallback_task = {**task, "agent_profile_id": DEFAULT_WORKFLOW_AGENT_PROFILE_ID} + runtime_policy = _workflow_profile_runtime_policy(fallback_task, enabled_tool_names) + + approval_mode = str(runtime_policy["approval_mode"]) + effective_tool_names = list(runtime_policy["effective_tool_names"]) + profile_snapshot = dict(runtime_policy["agent_profile_snapshot"]) + profile_skills = list(runtime_policy["skills_override"]) + tool_allowlist = list(runtime_policy["tool_allowlist"]) + + from row_bot.threads import _set_thread_agent_profile, _set_thread_approval_mode, set_thread_skills_override + + _set_thread_agent_profile(thread_id, str(runtime_policy["agent_profile_id"])) + _set_thread_approval_mode(thread_id, approval_mode) + set_thread_skills_override(thread_id, profile_skills) + + if tool_allowlist: logger.info( - "Task '%s' using tools_override — %d tool(s): %s", + "Task '%s' using Agent Profile tool allow-list - %d tool(s): %s", task["name"], len(effective_tool_names), effective_tool_names, ) @@ -2249,9 +2913,13 @@ def _task_log(msg: str) -> None: "runtime_surface": "workflow", "runtime_mode": "agent", "approval_mode": approval_mode, + "agent_profile_id": str(runtime_policy["agent_profile_id"]), + "agent_profile_snapshot": profile_snapshot, }, "recursion_limit": RECURSION_LIMIT_TASK, } + if tool_allowlist: + config["configurable"]["tool_allowlist"] = tool_allowlist def _format_interrupt_details(interrupts: list[dict]) -> list[str]: details = [] @@ -2999,7 +3667,12 @@ def _prepare_task_thread(task: dict) -> str: - thread_meta creation - model_override propagation """ - from row_bot.threads import _save_thread_meta, _set_thread_approval_mode, _set_thread_model_override + from row_bot.threads import ( + _save_thread_meta, + _set_thread_agent_profile, + _set_thread_approval_mode, + _set_thread_model_override, + ) thread_id = task.get("persistent_thread_id") or uuid.uuid4().hex[:12] if not task.get("notify_only"): @@ -3010,6 +3683,10 @@ def _prepare_task_thread(task: dict) -> str: _save_thread_meta(thread_id, thread_name) if task.get("model_override"): _set_thread_model_override(thread_id, task["model_override"]) + try: + _set_thread_agent_profile(thread_id, _workflow_agent_profile_ref(task)) + except Exception: + logger.debug("Could not set workflow thread Agent Profile", exc_info=True) _set_thread_approval_mode(thread_id, get_task_approval_mode(task)) return thread_id @@ -4923,6 +5600,8 @@ def add_default_workflow_templates() -> int: steps=t.get("steps"), enabled=t.get("enabled", False), channels=t.get("channels"), + agent_profile_id=DEFAULT_WORKFLOW_AGENT_PROFILE_ID, + apply_default_skills=False, ) created += 1 return created @@ -4955,6 +5634,8 @@ def seed_default_tasks() -> None: steps=t.get("steps"), enabled=t.get("enabled", False), channels=t.get("channels"), + agent_profile_id=DEFAULT_WORKFLOW_AGENT_PROFILE_ID, + apply_default_skills=False, ) open(_MARKER, "w").close() logger.info("Seeded %d default tasks", len(_DEFAULT_TASKS)) @@ -4965,6 +5646,7 @@ def seed_default_tasks() -> None: list_workflows = list_tasks create_workflow = lambda name, prompts, description="", icon="⚡", schedule=None: create_task( name=name, prompts=prompts, description=description, icon=icon, schedule=schedule, + agent_profile_id=DEFAULT_WORKFLOW_AGENT_PROFILE_ID, apply_default_skills=False, ) update_workflow = update_task delete_workflow = delete_task diff --git a/src/row_bot/tools/agent_tool.py b/src/row_bot/tools/agent_tool.py index c0d9d1ba..0863e614 100644 --- a/src/row_bot/tools/agent_tool.py +++ b/src/row_bot/tools/agent_tool.py @@ -71,6 +71,7 @@ def _public_run(run: dict[str, Any] | None) -> dict[str, Any]: "max_turns": run.get("max_turns", 0), "summary": run.get("summary", ""), "error": run.get("error", ""), + "model_override": run.get("model_override", ""), "stop_requested": bool(run.get("stop_requested", False)), "parent_message_count": len(parent_messages), "latest_parent_message": parent_messages[-1] if parent_messages else "", @@ -125,8 +126,6 @@ def _public_workflow(task: dict[str, Any] | None) -> dict[str, Any]: "advanced_mode": bool(task.get("advanced_mode", False)), "agent_profile_id": task.get("agent_profile_id", ""), "model_override": task.get("model_override", ""), - "tools_override": task.get("tools_override"), - "skills_override": task.get("skills_override"), "steps": task.get("steps", []), } @@ -162,13 +161,27 @@ class _DelegateWorkInput(BaseModel): description="Context mode: auto, focused, recent, full, empty, or resume.", ) display_name: str = Field(default="", description="Optional short display name for the child Agent run.") + model: str = Field( + default="", + description=( + "Optional active pinned Brain model canonical ref or exact pinned label for this child Agent. " + "For natural model requests, inspect row_bot_status category='model' first and pass the selected canonical ref. " + "Leave empty to inherit the parent model." + ), + ) parent_thread_id: str = Field( default="", description="Optional parent thread id. Omit to use the current thread.", ) parent_run_id: str = Field(default="", description="Optional parent Agent Run id for nested tracking.") parent_message_id: str = Field(default="", description="Optional parent message id that triggered delegation.") - wait: bool = Field(default=False, description="If true, wait briefly for the child result before returning.") + wait: bool = Field( + default=False, + description=( + "Prefer false so the child runs asynchronously and the parent thread remains responsive. " + "Use true only when the user explicitly asks you to wait or same-turn synthesis is required." + ), + ) timeout_seconds: float = Field(default=60.0, description="Maximum seconds to wait when wait=true.") @@ -226,6 +239,7 @@ def _delegate_work( context: str = "", context_mode: str = "auto", display_name: str = "", + model: str = "", parent_thread_id: str = "", parent_run_id: str = "", parent_message_id: str = "", @@ -235,6 +249,24 @@ def _delegate_work( runtime = _runtime_context() parent_thread_id = parent_thread_id or str(runtime.get("thread_id") or "") enabled_tool_names = list(runtime.get("enabled_tool_names") or ()) + model_override = "" + if str(model or "").strip(): + try: + from row_bot.providers.selection import resolve_catalog_model_selection + + resolved_model = resolve_catalog_model_selection( + model, + surface="chat", + require_agent_ready=True, + require_pinned=True, + ) + model_override = resolved_model.ref + except Exception as exc: + return _json_response({ + "ok": False, + "message": str(exc), + "model": str(model or "").strip(), + }) run = agent_runner.spawn_agent_run( objective, parent_thread_id=parent_thread_id, @@ -245,6 +277,7 @@ def _delegate_work( context=context, context_mode=context_mode, enabled_tool_names=enabled_tool_names, + model_override=model_override, wait=wait, timeout=timeout_seconds if wait else None, ) @@ -428,10 +461,6 @@ def _agent_promote(run_id: str, target: str = "profile") -> str: "prompt": "\n".join(prompt_lines), } ] - if profile_ref: - steps[0]["agent_profile_id"] = profile_ref - - tools_override = _as_list(run.get("tools_override")) task_id = tasks_db.create_task( name=f"Promoted {run.get('display_name') or run_id}", description=( @@ -441,15 +470,11 @@ def _agent_promote(run_id: str, target: str = "profile") -> str: icon="hub", steps=steps, model_override=str(run.get("model_override") or "") or None, - skills_override=_as_list(run.get("skills_override")), - tools_override=tools_override if tools_override else None, safety_mode=str(run.get("approval_mode") or "block"), agent_profile_id=profile_ref or None, enabled=False, apply_default_skills=False, ) - if not tools_override: - tasks_db.update_task(task_id, tools_override=[]) task = tasks_db.get_task(task_id) return _json_response({ "ok": True, @@ -488,7 +513,7 @@ def as_langchain_tools(self) -> list: StructuredTool.from_function( func=_delegate_work, name="delegate_work", - description="Start a child Agent for focused background work.", + description="Start a child Agent for focused async background work.", args_schema=_DelegateWorkInput, ), StructuredTool.from_function( diff --git a/src/row_bot/tools/row_bot_status_tool.py b/src/row_bot/tools/row_bot_status_tool.py index 9711ca4a..8f71ca76 100644 --- a/src/row_bot/tools/row_bot_status_tool.py +++ b/src/row_bot/tools/row_bot_status_tool.py @@ -43,7 +43,7 @@ class _StatusQueryInput(BaseModel): description=( "What to query. One of: 'overview' (full status summary), " "'version' (Row-Bot version number), " - "'model' (current model and provider info), " + "'model' (current Brain model, provider info, and pinned Brain model choices), " "'channels' (messaging channel status), " "'memory' (knowledge graph stats), " "'skills' (Skill Library availability and pinned defaults), " @@ -58,9 +58,9 @@ class _StatusQueryInput(BaseModel): "'agents' (durable Agent Runs, subagents, workflow mirrors, writer locks, and V1 defaults), " "'agent_profiles' (Agent Profile Library counts, sources, active profile, and selected tools), " "'goals' (current Goal Mode status, turn budgets, progress, and blockers), " - "'vision' (vision/camera model and settings), " - "'image_gen' (image generation model), " - "'video_gen' (video generation model), " + "'vision' (vision/camera model, settings, and pinned Vision choices), " + "'image_gen' (image generation model and pinned Image choices), " + "'video_gen' (video generation model and pinned Video choices), " "'voice' (TTS and STT settings), " "'config' (context window, dream cycle, wiki vault, memory extraction), " "'designer' (Designer project count and recent projects), " @@ -75,7 +75,7 @@ class _SettingUpdateInput(BaseModel): setting: str = Field( description=( "The setting to change. One of: " - "'model' (switch active model; value may be a local model, provider model, or Quick Choice), " + "'model' (switch active Brain model; value must be an active pinned Brain Quick Choice canonical ref or exact label), " "'vision_model' (switch Vision model; value may be an installed local vision model, provider vision model, or Vision Quick Choice), " "'name' (change assistant name), " "'personality' (change personality text), " @@ -175,8 +175,70 @@ def _surface_label(surface: str) -> str: return {"chat": "Brain", "vision": "Vision"}.get(surface, surface.replace("_", " ").title()) +def _pinned_choice_status_lines( + surface: str, + heading: str, + *, + value_label: str = "Canonical ref", + value_key: str = "canonical_ref", + limit: int | None = 8, +) -> list[str]: + try: + from row_bot.providers.selection import pinned_model_choice_summaries + + choices = pinned_model_choice_summaries(surface, include_inactive=True) + except Exception as exc: + return ["", f"**{heading}**", f"- Unavailable: {exc}"] + + lines = ["", f"**{heading}**"] + if not choices: + lines.append("- None pinned. Pin choices in Settings -> Models.") + return lines + + active = [choice for choice in choices if choice.get("active") is not False] + inactive = [choice for choice in choices if choice.get("active") is False] + if surface == "chat": + lines.append( + "- For natural Brain model requests, choose from this list and call tools with the canonical ref." + ) + shown_active = active if not limit or limit < 0 else active[:limit] + for choice in shown_active: + value = str(choice.get(value_key) or choice.get("canonical_ref") or "") + lines.append( + f"- {choice.get('display_name')} - {choice.get('provider')}: " + f"{value_label}: {value} (provider_id: {choice.get('provider_id')}, model_id: {choice.get('model_id')})" + ) + hidden_count = len(active) - len(shown_active) + if hidden_count > 0: + lines.append(f"- Plus {hidden_count} more active pinned choice(s).") + if inactive: + lines.append(f"- Not usable now: {len(inactive)} pinned choice(s)") + for choice in inactive[:3]: + value = str(choice.get(value_key) or choice.get("canonical_ref") or "") + reason = str(choice.get("reason") or "inactive") + lines.append( + f" - {choice.get('display_name')} - {choice.get('provider')}: " + f"{value_label}: {value} ({reason})" + ) + return lines + + def _resolve_model_update_value(value: str, *, surface: str) -> tuple[str | None, str | None]: """Resolve a status-tool model update to a runnable model id or an error.""" + if surface == "chat": + try: + from row_bot.providers.selection import resolve_catalog_model_selection + + resolved = resolve_catalog_model_selection( + value, + surface="chat", + require_agent_ready=True, + require_pinned=True, + ) + return resolved.ref, None + except Exception as exc: + return None, str(exc) + from row_bot.models import is_model_local, list_cloud_models, list_cloud_vision_models from row_bot.providers.capabilities import snapshot_supports_surface from row_bot.providers.selection import list_quick_choices, model_ref, resolve_selection @@ -331,13 +393,16 @@ def _query_overview() -> str: "channels", "skills", "tools", "mcp", "identity", "tasks", "agents", "agent_profiles", "goals", "insights", "evolution", "config", "designer", "updates"): try: - parts.append(_QUERY_HANDLERS[cat]()) + if cat == "model": + parts.append(_query_model(compact_pinned=True)) + else: + parts.append(_QUERY_HANDLERS[cat]()) except Exception as exc: parts.append(f"[{cat}] Error: {exc}") return "\n\n".join(parts) -def _query_model() -> str: +def _query_model(*, compact_pinned: bool = False) -> str: try: from row_bot.models import (get_current_model, get_context_size, get_provider_emoji, _active_model_override, @@ -396,6 +461,13 @@ def _query_model() -> str: lines.append(f"- Provider context cap: {get_cloud_context_size():,} tokens") if override and override != default_model: lines.append(f"- ⚠️ Override active (global default: {default_model})") + lines.extend(_pinned_choice_status_lines( + "chat", + "Pinned Brain Model Choices", + value_label="Canonical ref", + value_key="canonical_ref", + limit=8 if compact_pinned else 0, + )) return "\n".join(lines) except Exception as exc: return f"**Current Model**\nError retrieving model info: {exc}" @@ -1259,6 +1331,12 @@ def _query_vision() -> str: lines.append(f"- {label}: {vision_error}") if incompat_reason: lines.append(f"- Vision compatibility: {incompat_reason}") + lines.extend(_pinned_choice_status_lines( + "vision", + "Pinned Vision Model Choices", + value_label="Selection value", + value_key="canonical_ref", + )) return "\n".join(lines) except Exception as exc: return f"**Vision**\nError: {exc}" @@ -1285,6 +1363,12 @@ def _query_image_gen() -> str: ] if selection == DEFAULT_MODEL: lines.append(f"- (default — change in Settings → Models)") + lines.extend(_pinned_choice_status_lines( + "image", + "Pinned Image Model Choices", + value_label="Config value", + value_key="config_value", + )) return "\n".join(lines) except Exception as exc: return f"**Image Generation**\nError: {exc}" @@ -1303,6 +1387,12 @@ def _query_video_gen() -> str: ] if selection == DEFAULT_MODEL: lines.append("- (default — change in Settings → Models)") + lines.extend(_pinned_choice_status_lines( + "video", + "Pinned Video Model Choices", + value_label="Config value", + value_key="config_value", + )) return "\n".join(lines) except Exception as exc: return f"**Video Generation**\nError: {exc}" @@ -1577,6 +1667,13 @@ def interrupt(payload: dict) -> bool: value = value.strip() if setting == "model": + runtime_surface = str(_active_runtime_context().get("runtime_surface") or "").strip() + if runtime_surface in {"agent_child", "agent_child_resume"}: + return ( + "Child Agents cannot switch their own runtime model. " + "Ask the parent to spawn the child with delegate_work(model=...) " + "or use /agent --model=model:provider:model-id for explicit command spawns." + ) approval = interrupt({ "tool": "row_bot_update_setting", "label": "Change active model", @@ -2193,7 +2290,7 @@ def display_name(self) -> str: @property def description(self) -> str: return ( - "Query or change Row-Bot's own configuration: current model, " + "Query or change Row-Bot's own configuration: current model and pinned model choices, " "active channels, memory stats, skills, tools, API keys, " "identity settings, and scheduled tasks." ) @@ -2228,7 +2325,7 @@ def as_langchain_tools(self) -> list: name="row_bot_update_setting", description=( "Change a Row-Bot setting. Requires user confirmation. " - "Settings: model, vision_model, name, personality, context_size, " + "Settings: model (pinned Brain canonical ref or exact pinned label), vision_model, name, personality, context_size, " "cloud_context_size, dream_cycle (on/off), " "dream_window (e.g. '1-5'), " "skill_toggle for Skill Library Available/Off (e.g. 'deep_research:off'), " diff --git a/src/row_bot/tools/task_tool.py b/src/row_bot/tools/task_tool.py index 44262308..ff02a169 100644 --- a/src/row_bot/tools/task_tool.py +++ b/src/row_bot/tools/task_tool.py @@ -96,10 +96,14 @@ class _TaskCreateInput(BaseModel): model: str | None = Field( default=None, description=( - "Optional Ollama model name to use for this task instead of the global default " - "(e.g. 'qwen3:32b'). Leave empty to use the current brain model." + "Optional workflow-level model override. Use a provider-qualified or pinned model. " + "Leave empty to use the current brain model." ), ) + agent_profile: str | None = Field( + default=None, + description="Optional Agent Profile slug or id. Defaults to the Default profile.", + ) steps: list[dict] | None = Field( default=None, description=( @@ -203,10 +207,14 @@ class _TaskUpdateInput(BaseModel): model: str | None = Field( default=None, description=( - "Optional Ollama model name to use for this task instead of the global default. " + "Optional workflow-level model override. " "Set to empty string '' to clear and use the default model." ), ) + agent_profile: str | None = Field( + default=None, + description="Agent Profile slug or id. Set to empty string '' to use the Default profile.", + ) persistent_thread: bool | None = Field( default=None, description=( @@ -230,6 +238,7 @@ def _task_create( delivery_channel: str | None = None, delivery_target: str | None = None, model: str | None = None, + agent_profile: str | None = None, steps: list[dict] | None = None, approval_mode: str | None = None, safety_mode: str | None = None, @@ -262,6 +271,8 @@ def _task_create( persistent_thread_id=p_thread_id, steps=steps, safety_mode=effective_approval_mode, + agent_profile_id=agent_profile or tasks_db.DEFAULT_WORKFLOW_AGENT_PROFILE_ID, + apply_default_skills=False, ) task = tasks_db.get_task(task_id) @@ -371,6 +382,7 @@ def _task_update( safety_mode: str | None = None, enabled: bool | None = None, model: str | None = None, + agent_profile: str | None = None, persistent_thread: bool | None = None, ) -> str: """Update fields on an existing task.""" @@ -393,6 +405,8 @@ def _task_update( updates["enabled"] = enabled if model is not None: updates["model_override"] = model if model else None + if agent_profile is not None: + updates["agent_profile_id"] = agent_profile or tasks_db.DEFAULT_WORKFLOW_AGENT_PROFILE_ID if persistent_thread is not None: if persistent_thread: # Only generate a new ID if one doesn't already exist diff --git a/src/row_bot/ui/buddy.py b/src/row_bot/ui/buddy.py index 7256dd52..f5fd4ec5 100644 --- a/src/row_bot/ui/buddy.py +++ b/src/row_bot/ui/buddy.py @@ -35,7 +35,7 @@ } })(); - +