diff --git a/components/ambient-api-server/plugins/agents/start_handler.go b/components/ambient-api-server/plugins/agents/start_handler.go index ea8ea98e9..b2f92851a 100644 --- a/components/ambient-api-server/plugins/agents/start_handler.go +++ b/components/ambient-api-server/plugins/agents/start_handler.go @@ -56,7 +56,12 @@ func (h *startHandler) Start(w http.ResponseWriter, r *http.Request) { return } - if existing, _ := h.session.ActiveByAgentID(ctx, agentID); existing != nil { + existing, activeErr := h.session.ActiveByAgentID(ctx, agentID) + if activeErr != nil { + handlers.HandleError(ctx, w, activeErr) + return + } + if existing != nil { resp := &StartResponse{ Session: sessions.PresentSession(existing), } diff --git a/components/ambient-api-server/plugins/sessions/mock_dao.go b/components/ambient-api-server/plugins/sessions/mock_dao.go index c66593af1..a46f7d676 100644 --- a/components/ambient-api-server/plugins/sessions/mock_dao.go +++ b/components/ambient-api-server/plugins/sessions/mock_dao.go @@ -80,10 +80,16 @@ func (d *sessionDaoMock) AllByProjectId(ctx context.Context, projectId string) ( func (d *sessionDaoMock) ActiveByAgentID(ctx context.Context, agentID string) (*Session, error) { activePhases := map[string]bool{"Pending": true, "Creating": true, "Running": true} + var newest *Session for _, s := range d.sessions { if s.AgentId != nil && *s.AgentId == agentID && s.Phase != nil && activePhases[*s.Phase] { - return s, nil + if newest == nil || s.CreatedAt.After(newest.CreatedAt) { + newest = s + } } } + if newest != nil { + return newest, nil + } return nil, gorm.ErrRecordNotFound } diff --git a/components/ambient-api-server/plugins/sessions/service.go b/components/ambient-api-server/plugins/sessions/service.go index 1f04d5d47..73e573896 100644 --- a/components/ambient-api-server/plugins/sessions/service.go +++ b/components/ambient-api-server/plugins/sessions/service.go @@ -2,12 +2,14 @@ package sessions import ( "context" + stderrors "errors" "github.com/openshift-online/rh-trex-ai/pkg/api" "github.com/openshift-online/rh-trex-ai/pkg/db" "github.com/openshift-online/rh-trex-ai/pkg/errors" "github.com/openshift-online/rh-trex-ai/pkg/logger" "github.com/openshift-online/rh-trex-ai/pkg/services" + "gorm.io/gorm" ) const sessionsLockType db.LockType = "sessions" @@ -269,7 +271,10 @@ func (s *sqlSessionService) Start(ctx context.Context, id string) (*Session, *er func (s *sqlSessionService) ActiveByAgentID(ctx context.Context, agentID string) (*Session, *errors.ServiceError) { session, err := s.sessionDao.ActiveByAgentID(ctx, agentID) if err != nil { - return nil, nil + if stderrors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return nil, errors.GeneralError("unable to look up active session for agent %s: %s", agentID, err) } return session, nil } diff --git a/components/ambient-cli/cmd/acpctl/login/authcode.go b/components/ambient-cli/cmd/acpctl/login/authcode.go index 4456d9d56..f7b9a594a 100644 --- a/components/ambient-cli/cmd/acpctl/login/authcode.go +++ b/components/ambient-cli/cmd/acpctl/login/authcode.go @@ -40,14 +40,19 @@ type authCodeResult struct { err error } -func runAuthCodeFlow(issuerURL, clientID, clientSecret string) (string, error) { +type tokenResult struct { + AccessToken string + RefreshToken string +} + +func runAuthCodeFlow(issuerURL, clientID, clientSecret string) (*tokenResult, error) { issuerURL = strings.TrimRight(issuerURL, "/") authorizeURL := issuerURL + "/protocol/openid-connect/auth" tokenURL := issuerURL + "/protocol/openid-connect/token" listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { - return "", fmt.Errorf("start local callback listener: %w", err) + return nil, fmt.Errorf("start local callback listener: %w", err) } defer listener.Close() @@ -56,12 +61,12 @@ func runAuthCodeFlow(issuerURL, clientID, clientSecret string) (string, error) { state, err := generateRandomState() if err != nil { - return "", fmt.Errorf("generate state: %w", err) + return nil, fmt.Errorf("generate state: %w", err) } codeVerifier, codeChallenge, err := generatePKCE() if err != nil { - return "", fmt.Errorf("generate PKCE: %w", err) + return nil, fmt.Errorf("generate PKCE: %w", err) } authURL := buildAuthURL(authorizeURL, clientID, redirectURI, state, codeChallenge) @@ -91,19 +96,19 @@ func runAuthCodeFlow(issuerURL, clientID, clientSecret string) (string, error) { select { case result = <-resultCh: case <-ctx.Done(): - return "", fmt.Errorf("timed out waiting for authorization callback (%.0fs)", callbackTimeout.Seconds()) + return nil, fmt.Errorf("timed out waiting for authorization callback (%.0fs)", callbackTimeout.Seconds()) } if result.err != nil { - return "", fmt.Errorf("authorization failed: %w", result.err) + return nil, fmt.Errorf("authorization failed: %w", result.err) } - token, err := exchangeCodeForToken(tokenURL, clientID, clientSecret, result.code, redirectURI, codeVerifier) + tokens, err := exchangeCodeForTokens(tokenURL, clientID, clientSecret, result.code, redirectURI, codeVerifier) if err != nil { - return "", fmt.Errorf("exchange authorization code: %w", err) + return nil, fmt.Errorf("exchange authorization code: %w", err) } - return token, nil + return tokens, nil } func generateRandomState() (string, error) { @@ -182,6 +187,14 @@ func callbackHandler(expectedState string, resultCh chan<- authCodeResult) http. } func exchangeCodeForToken(tokenURL, clientID, clientSecret, code, redirectURI, codeVerifier string) (string, error) { + tokens, err := exchangeCodeForTokens(tokenURL, clientID, clientSecret, code, redirectURI, codeVerifier) + if err != nil { + return "", err + } + return tokens.AccessToken, nil +} + +func exchangeCodeForTokens(tokenURL, clientID, clientSecret, code, redirectURI, codeVerifier string) (*tokenResult, error) { params := url.Values{ "grant_type": {"authorization_code"}, "code": {code}, @@ -193,25 +206,23 @@ func exchangeCodeForToken(tokenURL, clientID, clientSecret, code, redirectURI, c params.Set("client_secret", clientSecret) } - // Raw net/http is intentional here: this call goes to an external OIDC provider - // (RH SSO), not the Ambient API server. The SDK client is for Ambient API calls only. httpClient := &http.Client{Timeout: 30 * time.Second} resp, err := httpClient.PostForm(tokenURL, params) if err != nil { - return "", fmt.Errorf("POST to token endpoint: %w", err) + return nil, fmt.Errorf("POST to token endpoint: %w", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - return "", fmt.Errorf("read token response: %w", err) + return nil, fmt.Errorf("read token response: %w", err) } if resp.StatusCode != http.StatusOK { - return "", tokenEndpointError(resp.StatusCode, body) + return nil, tokenEndpointError(resp.StatusCode, body) } - return parseTokenResponse(body) + return parseTokensResponse(body) } func tokenEndpointError(statusCode int, body []byte) error { @@ -231,16 +242,28 @@ func tokenEndpointError(statusCode int, body []byte) error { } func parseTokenResponse(body []byte) (string, error) { + tokens, err := parseTokensResponse(body) + if err != nil { + return "", err + } + return tokens.AccessToken, nil +} + +func parseTokensResponse(body []byte) (*tokenResult, error) { var resp struct { - AccessToken string `json:"access_token"` + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` } if err := json.Unmarshal(body, &resp); err != nil { - return "", fmt.Errorf("parse token response: %w", err) + return nil, fmt.Errorf("parse token response: %w", err) } if resp.AccessToken == "" { - return "", errors.New("no access_token in token response") + return nil, errors.New("no access_token in token response") } - return resp.AccessToken, nil + return &tokenResult{ + AccessToken: resp.AccessToken, + RefreshToken: resp.RefreshToken, + }, nil } func openBrowser(target string) error { diff --git a/components/ambient-cli/cmd/acpctl/login/authcode_test.go b/components/ambient-cli/cmd/acpctl/login/authcode_test.go index 3f03b7f56..22580c565 100644 --- a/components/ambient-cli/cmd/acpctl/login/authcode_test.go +++ b/components/ambient-cli/cmd/acpctl/login/authcode_test.go @@ -308,6 +308,71 @@ func TestExchangeCodeForToken_ErrorResponse(t *testing.T) { } } +func TestParseTokensResponse_WithRefreshToken(t *testing.T) { + body, _ := json.Marshal(map[string]string{ + "access_token": "my-access-token", + "refresh_token": "my-refresh-token", + "token_type": "Bearer", + }) + + result, err := parseTokensResponse(body) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.AccessToken != "my-access-token" { + t.Errorf("expected %q, got %q", "my-access-token", result.AccessToken) + } + if result.RefreshToken != "my-refresh-token" { + t.Errorf("expected %q, got %q", "my-refresh-token", result.RefreshToken) + } +} + +func TestParseTokensResponse_NoRefreshToken(t *testing.T) { + body, _ := json.Marshal(map[string]string{ + "access_token": "my-access-token", + "token_type": "Bearer", + }) + + result, err := parseTokensResponse(body) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.AccessToken != "my-access-token" { + t.Errorf("expected %q, got %q", "my-access-token", result.AccessToken) + } + if result.RefreshToken != "" { + t.Errorf("expected empty refresh token, got %q", result.RefreshToken) + } +} + +func TestParseTokensResponse_MissingAccessToken(t *testing.T) { + body, _ := json.Marshal(map[string]string{"refresh_token": "refresh-only"}) + + _, err := parseTokensResponse(body) + if err == nil { + t.Fatal("expected error for missing access_token") + } +} + +func TestExchangeCodeForTokens_ReturnsRefresh(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"access_token":"access-123","refresh_token":"refresh-456","token_type":"Bearer"}`) + })) + defer srv.Close() + + result, err := exchangeCodeForTokens(srv.URL, "client-id", "", "mycode", "http://127.0.0.1:9/callback", "myverifier") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.AccessToken != "access-123" { + t.Errorf("expected %q, got %q", "access-123", result.AccessToken) + } + if result.RefreshToken != "refresh-456" { + t.Errorf("expected %q, got %q", "refresh-456", result.RefreshToken) + } +} + func TestCallbackHandler_OAuthErrorFallsBackToErrorCode(t *testing.T) { resultCh := make(chan authCodeResult, 1) handler := callbackHandler("state", resultCh) diff --git a/components/ambient-cli/cmd/acpctl/login/cmd.go b/components/ambient-cli/cmd/acpctl/login/cmd.go index 7328d2977..0c5e92725 100644 --- a/components/ambient-cli/cmd/acpctl/login/cmd.go +++ b/components/ambient-cli/cmd/acpctl/login/cmd.go @@ -84,13 +84,19 @@ func run(cmd *cobra.Command, positional []string) error { var accessToken string if args.useAuthCode { - token, err := runAuthCodeFlow(args.issuerURL, args.clientID, args.clientSecret) + tokens, err := runAuthCodeFlow(args.issuerURL, args.clientID, args.clientSecret) if err != nil { return fmt.Errorf("auth-code login: %w", err) } - accessToken = token + accessToken = tokens.AccessToken + cfg.RefreshToken = tokens.RefreshToken + cfg.IssuerURL = args.issuerURL + cfg.ClientID = args.clientID } else { accessToken = args.token + cfg.RefreshToken = "" + cfg.IssuerURL = "" + cfg.ClientID = "" } cfg.AccessToken = accessToken diff --git a/components/ambient-cli/cmd/acpctl/session/cmd.go b/components/ambient-cli/cmd/acpctl/session/cmd.go index 267c9fa75..0dbe9f802 100644 --- a/components/ambient-cli/cmd/acpctl/session/cmd.go +++ b/components/ambient-cli/cmd/acpctl/session/cmd.go @@ -12,7 +12,8 @@ var Cmd = &cobra.Command{ Examples: acpctl session messages # list messages (snapshot) - acpctl session messages -f # live SSE stream (reconnectable) + acpctl session messages -f # live SSE stream (ends at turn end) + acpctl session messages -F # continuous follow (Ctrl+C to stop) acpctl session send "Hello!" # send a message acpctl session send "Hello!" -f # send and stream until done acpctl session events # raw AG-UI event stream`, diff --git a/components/ambient-cli/cmd/acpctl/session/messages.go b/components/ambient-cli/cmd/acpctl/session/messages.go index 388c4289e..c9d959e84 100644 --- a/components/ambient-cli/cmd/acpctl/session/messages.go +++ b/components/ambient-cli/cmd/acpctl/session/messages.go @@ -20,10 +20,11 @@ import ( ) var msgArgs struct { - follow bool - followJSON bool - outputFormat string - afterSeq int + follow bool + followContinuous bool + followJSON bool + outputFormat string + afterSeq int } var messagesCmd = &cobra.Command{ @@ -33,13 +34,16 @@ var messagesCmd = &cobra.Command{ Without -f, fetches a snapshot of messages from the message store. With -f, connects to the live SSE event stream and renders events -as readable text. The stream stays open until Ctrl+C. -With -f --json, emits raw AG-UI JSON events instead of text. +as readable text. The stream ends when the current turn finishes. +With -F, continuously follows the stream, reconnecting after each +turn ends. The stream stays open until Ctrl+C. +With -f --json or -F --json, emits raw AG-UI JSON events instead of text. Examples: acpctl session messages # snapshot - acpctl session messages -f # live SSE stream (Ctrl+C to stop) - acpctl session messages -f --json # raw AG-UI JSON events + acpctl session messages -f # live stream (ends at turn end) + acpctl session messages -F # continuous follow (Ctrl+C to stop) + acpctl session messages -F --json # continuous raw AG-UI JSON events acpctl session messages -o json # JSON snapshot acpctl session messages --after 5 # messages after seq 5`, Args: cobra.ExactArgs(1), @@ -47,8 +51,9 @@ Examples: } func init() { - messagesCmd.Flags().BoolVarP(&msgArgs.follow, "follow", "f", false, "Stream live SSE events (same stream as send -f)") - messagesCmd.Flags().BoolVar(&msgArgs.followJSON, "json", false, "with -f: emit raw AG-UI JSON events instead of text") + messagesCmd.Flags().BoolVarP(&msgArgs.follow, "follow", "f", false, "Stream live SSE events until the current turn ends") + messagesCmd.Flags().BoolVarP(&msgArgs.followContinuous, "follow-continuous", "F", false, "Continuously follow SSE events, reconnecting between turns (Ctrl+C to stop)") + messagesCmd.Flags().BoolVar(&msgArgs.followJSON, "json", false, "with -f/-F: emit raw AG-UI JSON events instead of text") messagesCmd.Flags().StringVarP(&msgArgs.outputFormat, "output", "o", "", "Output format: json") messagesCmd.Flags().IntVar(&msgArgs.afterSeq, "after", 0, "Only show messages after this sequence number") } @@ -61,6 +66,9 @@ func runMessages(cmd *cobra.Command, args []string) error { return err } + if msgArgs.followContinuous { + return streamMessagesContinuous(cmd, client, sessionID) + } if msgArgs.follow { return streamMessages(cmd, client, sessionID) } @@ -416,6 +424,50 @@ func streamMessages(cmd *cobra.Command, client *sdkclient.Client, sessionID stri return renderSSEStream(stream, cmd.OutOrStdout(), msgArgs.followJSON, false) } +func streamMessagesContinuous(cmd *cobra.Command, client *sdkclient.Client, sessionID string) error { + ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt) + defer cancel() + + out := cmd.OutOrStdout() + fmt.Fprintf(out, "Continuously following session %s (Ctrl+C to stop)...\n\n", sessionID) + + const reconnectDelay = 3 * time.Second + + for { + stream, err := client.Sessions().StreamEvents(ctx, sessionID) + if err != nil { + if ctx.Err() != nil { + return nil + } + fmt.Fprintf(out, "[reconnect] stream not available: %v — retrying in %s\n", err, reconnectDelay) + select { + case <-ctx.Done(): + return nil + case <-time.After(reconnectDelay): + continue + } + } + + streamErr := renderSSEStream(stream, out, msgArgs.followJSON, false) + stream.Close() + + if ctx.Err() != nil { + return nil + } + + if streamErr != nil { + fmt.Fprintf(out, "\n[reconnect] stream ended: %v — reconnecting in %s\n", streamErr, reconnectDelay) + select { + case <-ctx.Done(): + return nil + case <-time.After(reconnectDelay): + } + } else { + fmt.Fprintf(out, "\n[reconnect] stream ended cleanly — reconnecting immediately\n") + } + } +} + func renderSSEStream(stream io.Reader, out io.Writer, jsonMode, exitOnRunFinished bool) error { scanner := bufio.NewScanner(stream) var reasoningBuf strings.Builder diff --git a/components/ambient-cli/cmd/acpctl/session/send.go b/components/ambient-cli/cmd/acpctl/session/send.go index 76c8008e0..7c52b2a27 100644 --- a/components/ambient-cli/cmd/acpctl/session/send.go +++ b/components/ambient-cli/cmd/acpctl/session/send.go @@ -3,7 +3,6 @@ package session import ( "context" "fmt" - "io" "os" "os/signal" @@ -54,19 +53,6 @@ func runSend(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithTimeout(cmd.Context(), cfg.GetRequestTimeout()) defer cancel() - streamCtx, streamCancel := signal.NotifyContext(cmd.Context(), os.Interrupt) - defer streamCancel() - - var stream io.ReadCloser - if sendFollow { - s, err := client.Sessions().StreamEvents(streamCtx, sessionID) - if err != nil { - return fmt.Errorf("stream events: %w", err) - } - stream = s - defer stream.Close() - } - msg, err := client.Sessions().PushMessage(ctx, sessionID, payload) if err != nil { return fmt.Errorf("send message: %w", err) @@ -78,5 +64,19 @@ func runSend(cmd *cobra.Command, args []string) error { return nil } + // NOTE: We intentionally open the SSE stream AFTER sending the message. + // The api-server SSE proxy does not return HTTP 200 until the runner has + // an active run, so subscribing first would deadlock. The event gap is + // negligible in practice because the runner takes time to start producing + // events after receiving the message. + streamCtx, streamCancel := signal.NotifyContext(cmd.Context(), os.Interrupt) + defer streamCancel() + + stream, err := client.Sessions().StreamEvents(streamCtx, sessionID) + if err != nil { + return fmt.Errorf("stream events: %w", err) + } + defer stream.Close() + return renderSSEStream(stream, cmd.OutOrStdout(), sendFollowJSON, true) } diff --git a/components/ambient-cli/demo-remote.sh b/components/ambient-cli/demo-remote.sh new file mode 100755 index 000000000..770fa0e92 --- /dev/null +++ b/components/ambient-cli/demo-remote.sh @@ -0,0 +1,438 @@ +#!/usr/bin/env bash +# demo-remote.sh — acpctl two-agent conversation demo against a remote deployment +# +# Layout: 2-pane tmux — left = main demo, right = live message watch. +# Creates two sessions ("historian" and "poet"). They have an extended +# conversation about Charleston, SC by exchanging messages via push_message. +# The script relays each agent's response to the other agent. +# +# Usage: +# ./demo-remote.sh https://ambient-api-server-ambient-code--ambient-s5.apps.int.spoke.prod.us-west-2.aws.paas.redhat.com/ +# AMBIENT_API_URL=https://... ./demo-remote.sh +# +# Optional env: +# AMBIENT_API_URL — API server URL (or pass as positional arg) +# AMBIENT_TOKEN — pre-existing token (skips browser login) +# ACPCTL — path to acpctl binary (default: acpctl from PATH) +# PAUSE — seconds between demo steps (default: 2) +# SESSION_READY_TIMEOUT — seconds to wait for Running (default: 180) +# MESSAGE_WAIT_TIMEOUT — seconds to wait for messages (default: 120) +# INSECURE_TLS — set to 1 to skip TLS verification (default: unset) + +set -euo pipefail + +# ── tmux layout bootstrap ────────────────────────────────────────────────────── + +TMUX_SESSION="ambient-demo" +DEMO_SCRIPT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")" +DEMO_ARGS=("${@}") + +if [[ -z "${TMUX:-}" ]]; then + tmux kill-session -t "$TMUX_SESSION" 2>/dev/null || true + tmux new-session -d -s "$TMUX_SESSION" -x 220 -y 50 + + tmux split-window -h -t "${TMUX_SESSION}:0" -p 40 + + tmux send-keys -t "${TMUX_SESSION}:0.1" "printf '\\033[2m[watch panel — waiting for session]\\033[0m\\n'" Enter + + ESCAPED_ARGS="" + for arg in "${DEMO_ARGS[@]}"; do + ESCAPED_ARGS+="$(printf ' %q' "$arg")" + done + tmux send-keys -t "${TMUX_SESSION}:0.0" \ + "TMUX_SESSION=$(printf '%q' "$TMUX_SESSION") INSIDE_DEMO_TMUX=1 bash $(printf '%q' "$DEMO_SCRIPT")${ESCAPED_ARGS}" Enter + + tmux select-pane -t "${TMUX_SESSION}:0.0" + tmux attach-session -t "$TMUX_SESSION" + exit 0 +fi + +WATCH_PANE="1" + +attach_watch() { + local session_id="$1" + tmux send-keys -t "${TMUX_SESSION}:0.${WATCH_PANE}" \ + "$(printf '%q' "${ACPCTL:-acpctl}") session messages $(printf '%q' "$session_id") -F" Enter +} + +# ── config ──────────────────────────────────────────────────────────────────── + +ACPCTL="${ACPCTL:-acpctl}" +PAUSE="${PAUSE:-2}" +SESSION_READY_TIMEOUT="${SESSION_READY_TIMEOUT:-180}" +MESSAGE_WAIT_TIMEOUT="${MESSAGE_WAIT_TIMEOUT:-120}" +INSECURE_TLS="${INSECURE_TLS:-}" + +if [[ $# -ge 1 ]]; then + AMBIENT_API_URL="${1}" +fi +AMBIENT_API_URL="${AMBIENT_API_URL:-}" + +if [[ -z "${AMBIENT_API_URL}" ]]; then + AMBIENT_API_URL=$("$ACPCTL" config get api_url 2>/dev/null || true) +fi + +if [[ -z "${AMBIENT_API_URL}" ]]; then + printf '\033[31merror: no API URL. Pass as argument or set AMBIENT_API_URL.\033[0m\n' >&2 + exit 1 +fi + +AMBIENT_API_URL="${AMBIENT_API_URL%/}" + +# ── helpers ──────────────────────────────────────────────────────────────────── + +bold() { printf '\033[1m%s\033[0m\n' "$*"; } +dim() { printf '\033[2m%s\033[0m\n' "$*"; } +cyan() { printf '\033[36m%s\033[0m\n' "$*"; } +green() { printf '\033[32m%s\033[0m\n' "$*"; } +yellow(){ printf '\033[33m%s\033[0m\n' "$*"; } +red() { printf '\033[31m%s\033[0m\n' "$*"; } +sep() { printf '\033[2m%s\033[0m\n' "──────────────────────────────────────────────────"; } + +step() { + local description="$1" + shift + echo + sep + bold "▶ $description" + printf '\033[38;5;214m $ %s\033[0m\n' "$*" + sleep "$PAUSE" + "$@" + echo +} + +announce() { + echo + sep + cyan "━━ $*" + sep + sleep "$PAUSE" +} + +# ── preflight ────────────────────────────────────────────────────────────────── + +if ! command -v "$ACPCTL" &>/dev/null; then + red "error: ${ACPCTL} not found. Set ACPCTL=/path/to/acpctl or add to PATH." >&2; exit 1 +fi + +cleanup() { + : +} +trap cleanup EXIT + +# ── intro ───────────────────────────────────────────────────────────────────── + +echo +bold "Ambient CLI Demo — Two-Agent Conversation (remote)" +dim " API: ${AMBIENT_API_URL}" + +echo +sep +bold "What this demo will do:" +echo +printf ' %s\n' "1. Log in and create a project" +printf ' %s\n' "2. Create two sessions: historian and poet" +printf ' %s\n' "3. Wait for both to reach Running" +printf ' %s\n' "4. Attach live message watch for the poet (right panel)" +printf ' %s\n' "5. Seed the historian with Charleston context" +printf ' %s\n' "6. Relay the historian's response to the poet via push_message" +printf ' %s\n' "7. Relay the poet's response back to the historian" +printf ' %s\n' "8. Continue the relay for an extended conversation" +printf ' %s\n' "9. Show final state and clean up" +echo +printf ' \033[38;5;214m%-38s\033[0m %s\n' "Orange text like this" "= a terminal command being run" +echo +sep +if [[ "${PAUSE}" -gt 0 ]] 2>/dev/null; then + bold " Press Enter to begin..." + read -r +fi + +# ── session helpers ─────────────────────────────────────────────────────────── + +wait_for_phase() { + local session_id="$1" target_phase="$2" + local deadline=$(( $(date +%s) + SESSION_READY_TIMEOUT )) + local last_phase="" + printf ' waiting for %s (timeout %ds)...\n' "${target_phase}" "${SESSION_READY_TIMEOUT}" + while true; do + local phase + phase=$( + "$ACPCTL" get session "$session_id" -o json 2>/dev/null \ + | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('phase',''))" 2>/dev/null || true + ) + if [[ "$phase" != "$last_phase" ]]; then + printf ' phase: %s\n' "$phase" + last_phase="$phase" + fi + [[ "$phase" == "$target_phase" ]] && { green " ✓ session is ${target_phase}"; return 0; } + [[ "$phase" == "Failed" || "$phase" == "Stopped" ]] && { red " ✗ session is ${phase}"; return 1; } + [[ $(date +%s) -ge $deadline ]] && { yellow " ✗ timed out (phase=${phase:-unknown})"; return 1; } + sleep 3 + done +} + +create_session() { + local name="$1" + local json + json=$( + "$ACPCTL" create session \ + --name "$name" \ + -o json 2>/dev/null + ) + local sid + sid=$(echo "$json" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])" 2>/dev/null) + if [[ -z "$sid" ]]; then + red " ✗ failed to parse session ID for ${name}" + return 1 + fi + echo "$sid" +} + +extract_last_assistant_text() { + local session_id="$1" + "$ACPCTL" session messages "${session_id}" -o json 2>/dev/null \ + | python3 -c " +import sys, json +try: + msgs = json.load(sys.stdin) + for m in reversed(msgs): + if m.get('event_type') == 'MESSAGES_SNAPSHOT': + snapshot = json.loads(m.get('payload', '[]')) + for msg in reversed(snapshot): + if msg.get('role') == 'assistant': + content = msg.get('content', '') + if isinstance(content, list): + parts = [] + for p in content: + if isinstance(p, dict) and p.get('type') == 'text': + parts.append(p.get('text', '')) + content = ' '.join(parts) + if content.strip(): + print(content.strip()) + sys.exit(0) +except Exception: + pass +" 2>/dev/null || true +} + +send_and_capture() { + local session_id="$1" label="$2" msg="$3" + + echo; sep + bold "▶ → ${label}: sending message" + if [[ ${#msg} -gt 120 ]]; then + dim " ${msg:0:117}..." + else + dim " ${msg}" + fi + sleep "$PAUSE" + + bold "▶ → ${label}: waiting for response..." + if timeout "${MESSAGE_WAIT_TIMEOUT}" "$ACPCTL" session send "${session_id}" "$msg" -f 2>&1; then + green " ✓ ${label} responded" + else + red " ✗ ${label}: send -f failed or timed out — aborting demo" + exit 1 + fi + echo + + LAST_RESPONSE=$(extract_last_assistant_text "${session_id}") + + bold "▶ ${label} says:" + if [[ -n "${LAST_RESPONSE}" ]]; then + echo " ${LAST_RESPONSE}" | fold -s -w 76 | sed 's/^/ /' + else + dim " (no response captured)" + fi + echo + sleep "$PAUSE" +} + +# ── section 1: login ────────────────────────────────────────────────────────── + +announce "1 · Log in" + +if [[ -n "${AMBIENT_TOKEN:-}" ]]; then + TLS_FLAG="" + if [[ -n "${INSECURE_TLS}" ]]; then + TLS_FLAG="--insecure-skip-tls-verify" + fi + step "Log in with existing token" \ + "$ACPCTL" login "${AMBIENT_API_URL}" \ + --token "${AMBIENT_TOKEN}" \ + ${TLS_FLAG} +else + TLS_FLAG="" + if [[ -n "${INSECURE_TLS}" ]]; then + TLS_FLAG="--insecure-skip-tls-verify" + fi + step "Log in via browser (Red Hat SSO)" \ + "$ACPCTL" login --use-auth-code \ + --url "${AMBIENT_API_URL}" \ + ${TLS_FLAG} +fi + +step "Show authenticated user" \ + "$ACPCTL" whoami + +# ── section 2: project ─────────────────────────────────────────────────────── + +announce "2 · Create project" + +RUN_ID=$(date +%s | tail -c5) +PROJECT_NAME="demo-${RUN_ID}" + +step "Create project: ${PROJECT_NAME}" \ + "$ACPCTL" create project \ + --name "${PROJECT_NAME}" \ + --display-name "Demo Project ${RUN_ID}" \ + --description "two-agent conversation demo" + +step "Set project context" \ + "$ACPCTL" project "${PROJECT_NAME}" + +step "Confirm project context" \ + "$ACPCTL" project current + +# ── section 3: create both sessions ────────────────────────────────────────── + +announce "3 · Create two sessions" + +sep; bold "▶ Create session: historian"; sleep "$PAUSE" +HISTORIAN_ID=$(create_session "historian") +dim " historian ID: ${HISTORIAN_ID}"; echo + +sep; bold "▶ Create session: poet"; sleep "$PAUSE" +POET_ID=$(create_session "poet") +dim " poet ID: ${POET_ID}"; echo + +step "List sessions" \ + "$ACPCTL" get sessions + +# ── section 4: wait for both to reach Running ──────────────────────────────── + +announce "4 · Wait for both sessions to reach Running" + +bold "▶ Waiting for historian..." +wait_for_phase "${HISTORIAN_ID}" "Running" || { red " historian did not reach Running"; exit 1; } +echo + +bold "▶ Waiting for poet..." +wait_for_phase "${POET_ID}" "Running" || { red " poet did not reach Running"; exit 1; } +echo + +# ── attach watch for poet in the right panel ───────────────────────────────── + +announce "5 · Attach live watch for poet session" + +dim " attaching poet message watch to right panel..." +sleep 2 +attach_watch "${POET_ID}" +sleep 3 +green " ✓ right panel watching poet messages (Ctrl+C to stop)" + +# ── section 6: seed the historian ───────────────────────────────────────────── + +announce "6 · Seed the historian with Charleston context" + +LAST_RESPONSE="" +send_and_capture "${HISTORIAN_ID}" "historian" \ + "You are a Charleston, SC historian. Tell me about what makes Charleston's geography and ecology unique — the salt marshes, barrier islands, tidal creeks, and how they shape life in the Lowcountry. Keep your response to 2-3 sentences." + +HISTORIAN_OPENING="${LAST_RESPONSE}" + +# ── section 7: relay to poet ───────────────────────────────────────────────── + +announce "7 · Relay historian's response → poet (watch the right panel!)" + +dim " Forwarding the historian's words to the poet via push_message..." +sleep "$PAUSE" + +send_and_capture "${POET_ID}" "poet" \ + "You are a Charleston poet. A historian just told you this about Charleston: \"${HISTORIAN_OPENING}\" — Respond with your own poetic perspective on what the historian described. How does this landscape feel, smell, sound? 2-3 sentences, evocative language." + +POET_RESPONSE="${LAST_RESPONSE}" + +# ── section 8: relay back to historian ──────────────────────────────────────── + +announce "8 · Relay poet's response → historian" + +dim " Forwarding the poet's words back to the historian..." +sleep "$PAUSE" + +send_and_capture "${HISTORIAN_ID}" "historian" \ + "A poet responded to your description with this: \"${POET_RESPONSE}\" — Now tell me about one specific historical event or tradition in Charleston that connects to the marshes or the sea. 2-3 sentences." + +HISTORIAN_HISTORY="${LAST_RESPONSE}" + +# ── section 9: relay history back to poet ───────────────────────────────────── + +announce "9 · Relay historian's history → poet for a final poem" + +dim " Forwarding the historian's story back to the poet..." +sleep "$PAUSE" + +send_and_capture "${POET_ID}" "poet" \ + "The historian shared this story: \"${HISTORIAN_HISTORY}\" — Write a 4-line poem inspired by what the historian told you. Mention the marshes and the sea." + +# ── section 10: final summary ───────────────────────────────────────────────── + +announce "10 · Conversation summary" + +echo +bold "The two agents had the following exchange about Charleston:" +echo +cyan " HISTORIAN (opening):" +echo " ${HISTORIAN_OPENING}" | fold -s -w 76 | sed 's/^/ /' +echo +cyan " POET (response):" +echo " ${POET_RESPONSE}" | fold -s -w 76 | sed 's/^/ /' +echo +cyan " HISTORIAN (history):" +echo " ${HISTORIAN_HISTORY}" | fold -s -w 76 | sed 's/^/ /' +echo +cyan " POET (final poem):" +echo " ${LAST_RESPONSE}" | fold -s -w 76 | sed 's/^/ /' +echo + +sep +sleep "$PAUSE" + +step "Historian — all messages" \ + "$ACPCTL" session messages "${HISTORIAN_ID}" + +step "Poet — all messages" \ + "$ACPCTL" session messages "${POET_ID}" + +# ── section 11: cleanup ────────────────────────────────────────────────────── + +announce "11 · Stop and clean up" + +sep; bold "▶ Stop historian"; sleep "$PAUSE" +"$ACPCTL" stop "${HISTORIAN_ID}" || true; echo + +sep; bold "▶ Stop poet"; sleep "$PAUSE" +"$ACPCTL" stop "${POET_ID}" || true; echo + +sleep 3 + +step "Delete historian session" \ + "$ACPCTL" delete session "${HISTORIAN_ID}" -y + +step "Delete poet session" \ + "$ACPCTL" delete session "${POET_ID}" -y + +step "Delete project ${PROJECT_NAME}" \ + "$ACPCTL" delete project "${PROJECT_NAME}" -y + +step "Confirm cleanup" \ + "$ACPCTL" get projects + +# ── done ────────────────────────────────────────────────────────────────────── + +echo +sep +green " Demo complete ✓" +sep +echo diff --git a/components/ambient-cli/pkg/config/config.go b/components/ambient-cli/pkg/config/config.go index 4b09ca0ed..5c7003346 100644 --- a/components/ambient-cli/pkg/config/config.go +++ b/components/ambient-cli/pkg/config/config.go @@ -13,6 +13,9 @@ import ( type Config struct { APIUrl string `json:"api_url,omitempty"` AccessToken string `json:"access_token,omitempty"` + RefreshToken string `json:"refresh_token,omitempty"` + IssuerURL string `json:"issuer_url,omitempty"` + ClientID string `json:"client_id,omitempty"` Project string `json:"project,omitempty"` Pager string `json:"pager,omitempty"` // TODO: Wire pager support into output commands (e.g. pipe through less) RequestTimeout int `json:"request_timeout,omitempty"` // Request timeout in seconds @@ -109,6 +112,41 @@ func (c *Config) GetToken() string { return c.AccessToken } +func (c *Config) GetTokenWithRefresh() (string, error) { + if env := os.Getenv("AMBIENT_TOKEN"); env != "" { + return env, nil + } + + if c.AccessToken == "" { + return "", nil + } + + expired, err := IsTokenExpired(c.AccessToken) + if err != nil || !expired { + return c.AccessToken, nil + } + + if c.RefreshToken == "" || c.IssuerURL == "" || c.ClientID == "" { + return c.AccessToken, nil + } + + newAccess, newRefresh, refreshErr := RefreshAccessToken(c.IssuerURL, c.ClientID, c.RefreshToken) + if refreshErr != nil { + return c.AccessToken, nil + } + + c.AccessToken = newAccess + if newRefresh != "" { + c.RefreshToken = newRefresh + } + + if saveErr := Save(c); saveErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to persist refreshed token: %v\n", saveErr) + } + + return c.AccessToken, nil +} + // GetRequestTimeout returns the request timeout duration with fallback to default func (c *Config) GetRequestTimeout() time.Duration { if env := os.Getenv("AMBIENT_REQUEST_TIMEOUT"); env != "" { diff --git a/components/ambient-cli/pkg/config/token.go b/components/ambient-cli/pkg/config/token.go index 69dea93c3..98ba95950 100644 --- a/components/ambient-cli/pkg/config/token.go +++ b/components/ambient-cli/pkg/config/token.go @@ -1,7 +1,11 @@ package config import ( + "encoding/json" "fmt" + "io" + "net/http" + "net/url" "strings" "time" @@ -47,3 +51,49 @@ func IsTokenExpired(tokenStr string) (bool, error) { return time.Now().After(expiry), nil } + +func RefreshAccessToken(issuerURL, clientID, refreshToken string) (accessToken, newRefreshToken string, err error) { + tokenURL := strings.TrimRight(issuerURL, "/") + "/protocol/openid-connect/token" + + params := url.Values{ + "grant_type": {"refresh_token"}, + "client_id": {clientID}, + "refresh_token": {refreshToken}, + } + + httpClient := &http.Client{Timeout: 30 * time.Second} + resp, err := httpClient.PostForm(tokenURL, params) + if err != nil { + return "", "", fmt.Errorf("POST to token endpoint: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", "", fmt.Errorf("read token response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + var errResp struct { + Error string `json:"error"` + ErrorDescription string `json:"error_description"` + } + if jsonErr := json.Unmarshal(body, &errResp); jsonErr == nil && errResp.ErrorDescription != "" { + return "", "", fmt.Errorf("token refresh: %s", errResp.ErrorDescription) + } + return "", "", fmt.Errorf("token refresh: HTTP %d", resp.StatusCode) + } + + var tokenResp struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + } + if err := json.Unmarshal(body, &tokenResp); err != nil { + return "", "", fmt.Errorf("parse token response: %w", err) + } + if tokenResp.AccessToken == "" { + return "", "", fmt.Errorf("no access_token in refresh response") + } + + return tokenResp.AccessToken, tokenResp.RefreshToken, nil +} diff --git a/components/ambient-cli/pkg/config/token_test.go b/components/ambient-cli/pkg/config/token_test.go index ee4a3fb53..fc0fadeb6 100644 --- a/components/ambient-cli/pkg/config/token_test.go +++ b/components/ambient-cli/pkg/config/token_test.go @@ -1,6 +1,10 @@ package config import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" "testing" "time" @@ -96,3 +100,143 @@ func TestIsTokenExpiredNoExp(t *testing.T) { t.Error("expected non-expired for token without exp claim") } } + +func TestRefreshAccessToken_Success(t *testing.T) { + newAccessToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(1 * time.Hour).Unix())}) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if err := r.ParseForm(); err != nil { + t.Fatalf("parse form: %v", err) + } + if r.FormValue("grant_type") != "refresh_token" { + t.Errorf("expected grant_type=refresh_token, got %q", r.FormValue("grant_type")) + } + if r.FormValue("refresh_token") != "my-refresh-token" { + t.Errorf("expected refresh_token=my-refresh-token, got %q", r.FormValue("refresh_token")) + } + if r.FormValue("client_id") != "test-client" { + t.Errorf("expected client_id=test-client, got %q", r.FormValue("client_id")) + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprintf(w, `{"access_token":%q,"refresh_token":"new-refresh"}`, newAccessToken) + })) + defer srv.Close() + + access, refresh, err := RefreshAccessToken(srv.URL, "test-client", "my-refresh-token") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if access != newAccessToken { + t.Errorf("access token mismatch") + } + if refresh != "new-refresh" { + t.Errorf("expected new-refresh, got %q", refresh) + } +} + +func TestRefreshAccessToken_ErrorResponse(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, `{"error":"invalid_grant","error_description":"Token is expired"}`) + })) + defer srv.Close() + + _, _, err := RefreshAccessToken(srv.URL, "test-client", "bad-refresh") + if err == nil { + t.Fatal("expected error for expired refresh token") + } +} + +func TestRefreshAccessToken_NoAccessToken(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"refresh_token":"new-refresh"}`) + })) + defer srv.Close() + + _, _, err := RefreshAccessToken(srv.URL, "test-client", "my-refresh") + if err == nil { + t.Fatal("expected error for missing access_token") + } +} + +func TestGetTokenWithRefresh_ValidToken(t *testing.T) { + t.Setenv("AMBIENT_TOKEN", "") + validToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(1 * time.Hour).Unix())}) + cfg := &Config{AccessToken: validToken} + + token, err := cfg.GetTokenWithRefresh() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if token != validToken { + t.Error("expected valid token returned as-is") + } +} + +func TestGetTokenWithRefresh_ExpiredNoRefreshToken(t *testing.T) { + t.Setenv("AMBIENT_TOKEN", "") + expiredToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(-1 * time.Hour).Unix())}) + cfg := &Config{AccessToken: expiredToken} + + token, err := cfg.GetTokenWithRefresh() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if token != expiredToken { + t.Error("expected expired token returned when no refresh available") + } +} + +func TestGetTokenWithRefresh_ExpiredWithRefresh(t *testing.T) { + t.Setenv("AMBIENT_TOKEN", "") + expiredToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(-1 * time.Hour).Unix())}) + newAccessToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(1 * time.Hour).Unix())}) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + resp, _ := json.Marshal(map[string]string{ + "access_token": newAccessToken, + "refresh_token": "rotated-refresh", + }) + w.Write(resp) + })) + defer srv.Close() + + dir := t.TempDir() + t.Setenv("AMBIENT_CONFIG", dir+"/config.json") + + cfg := &Config{ + AccessToken: expiredToken, + RefreshToken: "old-refresh", + IssuerURL: srv.URL, + ClientID: "test-client", + } + + token, err := cfg.GetTokenWithRefresh() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if token != newAccessToken { + t.Error("expected refreshed access token") + } + if cfg.RefreshToken != "rotated-refresh" { + t.Errorf("expected rotated refresh token, got %q", cfg.RefreshToken) + } +} + +func TestGetTokenWithRefresh_EnvOverride(t *testing.T) { + t.Setenv("AMBIENT_TOKEN", "env-token-at-least-20chars") + cfg := &Config{AccessToken: "config-token"} + + token, err := cfg.GetTokenWithRefresh() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if token != "env-token-at-least-20chars" { + t.Errorf("expected env token, got %q", token) + } +} diff --git a/components/ambient-cli/pkg/connection/connection.go b/components/ambient-cli/pkg/connection/connection.go index 71c2a4261..beb4b9328 100644 --- a/components/ambient-cli/pkg/connection/connection.go +++ b/components/ambient-cli/pkg/connection/connection.go @@ -62,7 +62,10 @@ func NewClientFactory() (*ClientFactory, error) { return nil, fmt.Errorf("load config: %w", err) } - token := cfg.GetToken() + token, err := cfg.GetTokenWithRefresh() + if err != nil { + return nil, fmt.Errorf("token refresh: %w", err) + } if token == "" { return nil, fmt.Errorf("not logged in; run 'acpctl login' first") } diff --git a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go index c4ef2bd94..a74fc7a86 100644 --- a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go +++ b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go @@ -408,7 +408,10 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, } labels := sessionLabels(session.ID, session.ProjectID) - useMCPSidecar := r.cfg.MCPImage != "" + useMCPSidecar := r.cfg.MCPImage != "" && r.cfg.CPTokenURL != "" && r.cfg.CPTokenPublicKey != "" + if r.cfg.MCPImage != "" && !useMCPSidecar { + r.logger.Warn().Str("session_id", session.ID).Msg("MCP sidecar disabled: CP_TOKEN_URL or CPTokenPublicKey not configured") + } containers := []interface{}{ map[string]interface{}{ @@ -444,12 +447,8 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, } if useMCPSidecar { - if r.cfg.CPTokenURL == "" || r.cfg.CPTokenPublicKey == "" { - r.logger.Warn().Str("session_id", session.ID).Msg("MCP sidecar skipped: CP_TOKEN_URL or CPTokenPublicKey not configured") - } else { - containers = append(containers, r.buildMCPSidecar(session.ID)) - r.logger.Info().Str("session_id", session.ID).Msg("MCP sidecar enabled for session") - } + containers = append(containers, r.buildMCPSidecar(session.ID)) + r.logger.Info().Str("session_id", session.ID).Msg("MCP sidecar enabled for session") } pod := &unstructured.Unstructured{