From 07098b09fa5b961c6cf9c58b68a02de26926e12d Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 16 Apr 2026 13:36:51 -0400 Subject: [PATCH 1/5] feat(cli): add -F (--follow-continuous) to session messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a continuous follow mode that reconnects to the SSE stream after each turn ends, stopping only on Ctrl+C. Unlike -f which exits when the server closes the connection, -F loops with a 3s reconnect delay for persistent session observation. ๐Ÿค– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../ambient-cli/cmd/acpctl/session/cmd.go | 3 +- .../cmd/acpctl/session/messages.go | 73 ++++++++++++++++--- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/components/ambient-cli/cmd/acpctl/session/cmd.go b/components/ambient-cli/cmd/acpctl/session/cmd.go index 267c9fa75..0dbe9f802 100644 --- a/components/ambient-cli/cmd/acpctl/session/cmd.go +++ b/components/ambient-cli/cmd/acpctl/session/cmd.go @@ -12,7 +12,8 @@ var Cmd = &cobra.Command{ Examples: acpctl session messages # list messages (snapshot) - acpctl session messages -f # live SSE stream (reconnectable) + acpctl session messages -f # live SSE stream (ends at turn end) + acpctl session messages -F # continuous follow (Ctrl+C to stop) acpctl session send "Hello!" # send a message acpctl session send "Hello!" -f # send and stream until done acpctl session events # raw AG-UI event stream`, diff --git a/components/ambient-cli/cmd/acpctl/session/messages.go b/components/ambient-cli/cmd/acpctl/session/messages.go index 388c4289e..c6679ed70 100644 --- a/components/ambient-cli/cmd/acpctl/session/messages.go +++ b/components/ambient-cli/cmd/acpctl/session/messages.go @@ -20,10 +20,11 @@ import ( ) var msgArgs struct { - follow bool - followJSON bool - outputFormat string - afterSeq int + follow bool + followContinuous bool + followJSON bool + outputFormat string + afterSeq int } var messagesCmd = &cobra.Command{ @@ -33,13 +34,16 @@ var messagesCmd = &cobra.Command{ Without -f, fetches a snapshot of messages from the message store. With -f, connects to the live SSE event stream and renders events -as readable text. The stream stays open until Ctrl+C. -With -f --json, emits raw AG-UI JSON events instead of text. +as readable text. The stream ends when the current turn finishes. +With -F, continuously follows the stream, reconnecting after each +turn ends. The stream stays open until Ctrl+C. +With -f --json or -F --json, emits raw AG-UI JSON events instead of text. Examples: acpctl session messages # snapshot - acpctl session messages -f # live SSE stream (Ctrl+C to stop) - acpctl session messages -f --json # raw AG-UI JSON events + acpctl session messages -f # live stream (ends at turn end) + acpctl session messages -F # continuous follow (Ctrl+C to stop) + acpctl session messages -F --json # continuous raw AG-UI JSON events acpctl session messages -o json # JSON snapshot acpctl session messages --after 5 # messages after seq 5`, Args: cobra.ExactArgs(1), @@ -47,8 +51,9 @@ Examples: } func init() { - messagesCmd.Flags().BoolVarP(&msgArgs.follow, "follow", "f", false, "Stream live SSE events (same stream as send -f)") - messagesCmd.Flags().BoolVar(&msgArgs.followJSON, "json", false, "with -f: emit raw AG-UI JSON events instead of text") + messagesCmd.Flags().BoolVarP(&msgArgs.follow, "follow", "f", false, "Stream live SSE events until the current turn ends") + messagesCmd.Flags().BoolVarP(&msgArgs.followContinuous, "follow-continuous", "F", false, "Continuously follow SSE events, reconnecting between turns (Ctrl+C to stop)") + messagesCmd.Flags().BoolVar(&msgArgs.followJSON, "json", false, "with -f/-F: emit raw AG-UI JSON events instead of text") messagesCmd.Flags().StringVarP(&msgArgs.outputFormat, "output", "o", "", "Output format: json") messagesCmd.Flags().IntVar(&msgArgs.afterSeq, "after", 0, "Only show messages after this sequence number") } @@ -61,6 +66,9 @@ func runMessages(cmd *cobra.Command, args []string) error { return err } + if msgArgs.followContinuous { + return streamMessagesContinuous(cmd, client, sessionID) + } if msgArgs.follow { return streamMessages(cmd, client, sessionID) } @@ -416,6 +424,51 @@ func streamMessages(cmd *cobra.Command, client *sdkclient.Client, sessionID stri return renderSSEStream(stream, cmd.OutOrStdout(), msgArgs.followJSON, false) } +func streamMessagesContinuous(cmd *cobra.Command, client *sdkclient.Client, sessionID string) error { + ctx, cancel := signal.NotifyContext(cmd.Context(), os.Interrupt) + defer cancel() + + out := cmd.OutOrStdout() + fmt.Fprintf(out, "Continuously following session %s (Ctrl+C to stop)...\n\n", sessionID) + + const reconnectDelay = 3 * time.Second + + for { + stream, err := client.Sessions().StreamEvents(ctx, sessionID) + if err != nil { + if ctx.Err() != nil { + return nil + } + fmt.Fprintf(out, "[reconnect] stream not available: %v โ€” retrying in %s\n", err, reconnectDelay) + select { + case <-ctx.Done(): + return nil + case <-time.After(reconnectDelay): + continue + } + } + + streamErr := renderSSEStream(stream, out, msgArgs.followJSON, false) + stream.Close() + + if ctx.Err() != nil { + return nil + } + + if streamErr != nil { + fmt.Fprintf(out, "\n[reconnect] stream ended: %v โ€” reconnecting in %s\n", streamErr, reconnectDelay) + } else { + fmt.Fprintf(out, "\n[reconnect] stream ended โ€” reconnecting in %s\n", reconnectDelay) + } + + select { + case <-ctx.Done(): + return nil + case <-time.After(reconnectDelay): + } + } +} + func renderSSEStream(stream io.Reader, out io.Writer, jsonMode, exitOnRunFinished bool) error { scanner := bufio.NewScanner(stream) var reasoningBuf strings.Builder From 6709762b95b3a02d662b78db1f6188e1145144fc Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 16 Apr 2026 13:37:05 -0400 Subject: [PATCH 2/5] feat(cli): transparent OIDC token refresh via refresh_token MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Persists refresh_token, issuer_url, and client_id in the config file during auth-code login. GetTokenWithRefresh() silently refreshes expired access tokens using the stored refresh token before any API call, falling back gracefully to the expired token if refresh fails. ๐Ÿค– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../ambient-cli/cmd/acpctl/login/authcode.go | 61 +++++--- .../cmd/acpctl/login/authcode_test.go | 65 ++++++++ .../ambient-cli/cmd/acpctl/login/cmd.go | 10 +- components/ambient-cli/pkg/config/config.go | 36 +++++ components/ambient-cli/pkg/config/token.go | 50 ++++++ .../ambient-cli/pkg/config/token_test.go | 144 ++++++++++++++++++ .../ambient-cli/pkg/connection/connection.go | 5 +- 7 files changed, 349 insertions(+), 22 deletions(-) diff --git a/components/ambient-cli/cmd/acpctl/login/authcode.go b/components/ambient-cli/cmd/acpctl/login/authcode.go index 4456d9d56..f7b9a594a 100644 --- a/components/ambient-cli/cmd/acpctl/login/authcode.go +++ b/components/ambient-cli/cmd/acpctl/login/authcode.go @@ -40,14 +40,19 @@ type authCodeResult struct { err error } -func runAuthCodeFlow(issuerURL, clientID, clientSecret string) (string, error) { +type tokenResult struct { + AccessToken string + RefreshToken string +} + +func runAuthCodeFlow(issuerURL, clientID, clientSecret string) (*tokenResult, error) { issuerURL = strings.TrimRight(issuerURL, "/") authorizeURL := issuerURL + "/protocol/openid-connect/auth" tokenURL := issuerURL + "/protocol/openid-connect/token" listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { - return "", fmt.Errorf("start local callback listener: %w", err) + return nil, fmt.Errorf("start local callback listener: %w", err) } defer listener.Close() @@ -56,12 +61,12 @@ func runAuthCodeFlow(issuerURL, clientID, clientSecret string) (string, error) { state, err := generateRandomState() if err != nil { - return "", fmt.Errorf("generate state: %w", err) + return nil, fmt.Errorf("generate state: %w", err) } codeVerifier, codeChallenge, err := generatePKCE() if err != nil { - return "", fmt.Errorf("generate PKCE: %w", err) + return nil, fmt.Errorf("generate PKCE: %w", err) } authURL := buildAuthURL(authorizeURL, clientID, redirectURI, state, codeChallenge) @@ -91,19 +96,19 @@ func runAuthCodeFlow(issuerURL, clientID, clientSecret string) (string, error) { select { case result = <-resultCh: case <-ctx.Done(): - return "", fmt.Errorf("timed out waiting for authorization callback (%.0fs)", callbackTimeout.Seconds()) + return nil, fmt.Errorf("timed out waiting for authorization callback (%.0fs)", callbackTimeout.Seconds()) } if result.err != nil { - return "", fmt.Errorf("authorization failed: %w", result.err) + return nil, fmt.Errorf("authorization failed: %w", result.err) } - token, err := exchangeCodeForToken(tokenURL, clientID, clientSecret, result.code, redirectURI, codeVerifier) + tokens, err := exchangeCodeForTokens(tokenURL, clientID, clientSecret, result.code, redirectURI, codeVerifier) if err != nil { - return "", fmt.Errorf("exchange authorization code: %w", err) + return nil, fmt.Errorf("exchange authorization code: %w", err) } - return token, nil + return tokens, nil } func generateRandomState() (string, error) { @@ -182,6 +187,14 @@ func callbackHandler(expectedState string, resultCh chan<- authCodeResult) http. } func exchangeCodeForToken(tokenURL, clientID, clientSecret, code, redirectURI, codeVerifier string) (string, error) { + tokens, err := exchangeCodeForTokens(tokenURL, clientID, clientSecret, code, redirectURI, codeVerifier) + if err != nil { + return "", err + } + return tokens.AccessToken, nil +} + +func exchangeCodeForTokens(tokenURL, clientID, clientSecret, code, redirectURI, codeVerifier string) (*tokenResult, error) { params := url.Values{ "grant_type": {"authorization_code"}, "code": {code}, @@ -193,25 +206,23 @@ func exchangeCodeForToken(tokenURL, clientID, clientSecret, code, redirectURI, c params.Set("client_secret", clientSecret) } - // Raw net/http is intentional here: this call goes to an external OIDC provider - // (RH SSO), not the Ambient API server. The SDK client is for Ambient API calls only. httpClient := &http.Client{Timeout: 30 * time.Second} resp, err := httpClient.PostForm(tokenURL, params) if err != nil { - return "", fmt.Errorf("POST to token endpoint: %w", err) + return nil, fmt.Errorf("POST to token endpoint: %w", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - return "", fmt.Errorf("read token response: %w", err) + return nil, fmt.Errorf("read token response: %w", err) } if resp.StatusCode != http.StatusOK { - return "", tokenEndpointError(resp.StatusCode, body) + return nil, tokenEndpointError(resp.StatusCode, body) } - return parseTokenResponse(body) + return parseTokensResponse(body) } func tokenEndpointError(statusCode int, body []byte) error { @@ -231,16 +242,28 @@ func tokenEndpointError(statusCode int, body []byte) error { } func parseTokenResponse(body []byte) (string, error) { + tokens, err := parseTokensResponse(body) + if err != nil { + return "", err + } + return tokens.AccessToken, nil +} + +func parseTokensResponse(body []byte) (*tokenResult, error) { var resp struct { - AccessToken string `json:"access_token"` + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` } if err := json.Unmarshal(body, &resp); err != nil { - return "", fmt.Errorf("parse token response: %w", err) + return nil, fmt.Errorf("parse token response: %w", err) } if resp.AccessToken == "" { - return "", errors.New("no access_token in token response") + return nil, errors.New("no access_token in token response") } - return resp.AccessToken, nil + return &tokenResult{ + AccessToken: resp.AccessToken, + RefreshToken: resp.RefreshToken, + }, nil } func openBrowser(target string) error { diff --git a/components/ambient-cli/cmd/acpctl/login/authcode_test.go b/components/ambient-cli/cmd/acpctl/login/authcode_test.go index 3f03b7f56..22580c565 100644 --- a/components/ambient-cli/cmd/acpctl/login/authcode_test.go +++ b/components/ambient-cli/cmd/acpctl/login/authcode_test.go @@ -308,6 +308,71 @@ func TestExchangeCodeForToken_ErrorResponse(t *testing.T) { } } +func TestParseTokensResponse_WithRefreshToken(t *testing.T) { + body, _ := json.Marshal(map[string]string{ + "access_token": "my-access-token", + "refresh_token": "my-refresh-token", + "token_type": "Bearer", + }) + + result, err := parseTokensResponse(body) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.AccessToken != "my-access-token" { + t.Errorf("expected %q, got %q", "my-access-token", result.AccessToken) + } + if result.RefreshToken != "my-refresh-token" { + t.Errorf("expected %q, got %q", "my-refresh-token", result.RefreshToken) + } +} + +func TestParseTokensResponse_NoRefreshToken(t *testing.T) { + body, _ := json.Marshal(map[string]string{ + "access_token": "my-access-token", + "token_type": "Bearer", + }) + + result, err := parseTokensResponse(body) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.AccessToken != "my-access-token" { + t.Errorf("expected %q, got %q", "my-access-token", result.AccessToken) + } + if result.RefreshToken != "" { + t.Errorf("expected empty refresh token, got %q", result.RefreshToken) + } +} + +func TestParseTokensResponse_MissingAccessToken(t *testing.T) { + body, _ := json.Marshal(map[string]string{"refresh_token": "refresh-only"}) + + _, err := parseTokensResponse(body) + if err == nil { + t.Fatal("expected error for missing access_token") + } +} + +func TestExchangeCodeForTokens_ReturnsRefresh(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"access_token":"access-123","refresh_token":"refresh-456","token_type":"Bearer"}`) + })) + defer srv.Close() + + result, err := exchangeCodeForTokens(srv.URL, "client-id", "", "mycode", "http://127.0.0.1:9/callback", "myverifier") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result.AccessToken != "access-123" { + t.Errorf("expected %q, got %q", "access-123", result.AccessToken) + } + if result.RefreshToken != "refresh-456" { + t.Errorf("expected %q, got %q", "refresh-456", result.RefreshToken) + } +} + func TestCallbackHandler_OAuthErrorFallsBackToErrorCode(t *testing.T) { resultCh := make(chan authCodeResult, 1) handler := callbackHandler("state", resultCh) diff --git a/components/ambient-cli/cmd/acpctl/login/cmd.go b/components/ambient-cli/cmd/acpctl/login/cmd.go index 7328d2977..0c5e92725 100644 --- a/components/ambient-cli/cmd/acpctl/login/cmd.go +++ b/components/ambient-cli/cmd/acpctl/login/cmd.go @@ -84,13 +84,19 @@ func run(cmd *cobra.Command, positional []string) error { var accessToken string if args.useAuthCode { - token, err := runAuthCodeFlow(args.issuerURL, args.clientID, args.clientSecret) + tokens, err := runAuthCodeFlow(args.issuerURL, args.clientID, args.clientSecret) if err != nil { return fmt.Errorf("auth-code login: %w", err) } - accessToken = token + accessToken = tokens.AccessToken + cfg.RefreshToken = tokens.RefreshToken + cfg.IssuerURL = args.issuerURL + cfg.ClientID = args.clientID } else { accessToken = args.token + cfg.RefreshToken = "" + cfg.IssuerURL = "" + cfg.ClientID = "" } cfg.AccessToken = accessToken diff --git a/components/ambient-cli/pkg/config/config.go b/components/ambient-cli/pkg/config/config.go index 4b09ca0ed..c214ce1bf 100644 --- a/components/ambient-cli/pkg/config/config.go +++ b/components/ambient-cli/pkg/config/config.go @@ -13,6 +13,9 @@ import ( type Config struct { APIUrl string `json:"api_url,omitempty"` AccessToken string `json:"access_token,omitempty"` + RefreshToken string `json:"refresh_token,omitempty"` + IssuerURL string `json:"issuer_url,omitempty"` + ClientID string `json:"client_id,omitempty"` Project string `json:"project,omitempty"` Pager string `json:"pager,omitempty"` // TODO: Wire pager support into output commands (e.g. pipe through less) RequestTimeout int `json:"request_timeout,omitempty"` // Request timeout in seconds @@ -109,6 +112,39 @@ func (c *Config) GetToken() string { return c.AccessToken } +func (c *Config) GetTokenWithRefresh() (string, error) { + if env := os.Getenv("AMBIENT_TOKEN"); env != "" { + return env, nil + } + + if c.AccessToken == "" { + return "", nil + } + + expired, err := IsTokenExpired(c.AccessToken) + if err != nil || !expired { + return c.AccessToken, nil + } + + if c.RefreshToken == "" || c.IssuerURL == "" || c.ClientID == "" { + return c.AccessToken, nil + } + + newAccess, newRefresh, refreshErr := RefreshAccessToken(c.IssuerURL, c.ClientID, c.RefreshToken) + if refreshErr != nil { + return c.AccessToken, nil + } + + c.AccessToken = newAccess + if newRefresh != "" { + c.RefreshToken = newRefresh + } + + _ = Save(c) + + return c.AccessToken, nil +} + // GetRequestTimeout returns the request timeout duration with fallback to default func (c *Config) GetRequestTimeout() time.Duration { if env := os.Getenv("AMBIENT_REQUEST_TIMEOUT"); env != "" { diff --git a/components/ambient-cli/pkg/config/token.go b/components/ambient-cli/pkg/config/token.go index 69dea93c3..98ba95950 100644 --- a/components/ambient-cli/pkg/config/token.go +++ b/components/ambient-cli/pkg/config/token.go @@ -1,7 +1,11 @@ package config import ( + "encoding/json" "fmt" + "io" + "net/http" + "net/url" "strings" "time" @@ -47,3 +51,49 @@ func IsTokenExpired(tokenStr string) (bool, error) { return time.Now().After(expiry), nil } + +func RefreshAccessToken(issuerURL, clientID, refreshToken string) (accessToken, newRefreshToken string, err error) { + tokenURL := strings.TrimRight(issuerURL, "/") + "/protocol/openid-connect/token" + + params := url.Values{ + "grant_type": {"refresh_token"}, + "client_id": {clientID}, + "refresh_token": {refreshToken}, + } + + httpClient := &http.Client{Timeout: 30 * time.Second} + resp, err := httpClient.PostForm(tokenURL, params) + if err != nil { + return "", "", fmt.Errorf("POST to token endpoint: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", "", fmt.Errorf("read token response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + var errResp struct { + Error string `json:"error"` + ErrorDescription string `json:"error_description"` + } + if jsonErr := json.Unmarshal(body, &errResp); jsonErr == nil && errResp.ErrorDescription != "" { + return "", "", fmt.Errorf("token refresh: %s", errResp.ErrorDescription) + } + return "", "", fmt.Errorf("token refresh: HTTP %d", resp.StatusCode) + } + + var tokenResp struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + } + if err := json.Unmarshal(body, &tokenResp); err != nil { + return "", "", fmt.Errorf("parse token response: %w", err) + } + if tokenResp.AccessToken == "" { + return "", "", fmt.Errorf("no access_token in refresh response") + } + + return tokenResp.AccessToken, tokenResp.RefreshToken, nil +} diff --git a/components/ambient-cli/pkg/config/token_test.go b/components/ambient-cli/pkg/config/token_test.go index ee4a3fb53..fc0fadeb6 100644 --- a/components/ambient-cli/pkg/config/token_test.go +++ b/components/ambient-cli/pkg/config/token_test.go @@ -1,6 +1,10 @@ package config import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" "testing" "time" @@ -96,3 +100,143 @@ func TestIsTokenExpiredNoExp(t *testing.T) { t.Error("expected non-expired for token without exp claim") } } + +func TestRefreshAccessToken_Success(t *testing.T) { + newAccessToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(1 * time.Hour).Unix())}) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Errorf("expected POST, got %s", r.Method) + } + if err := r.ParseForm(); err != nil { + t.Fatalf("parse form: %v", err) + } + if r.FormValue("grant_type") != "refresh_token" { + t.Errorf("expected grant_type=refresh_token, got %q", r.FormValue("grant_type")) + } + if r.FormValue("refresh_token") != "my-refresh-token" { + t.Errorf("expected refresh_token=my-refresh-token, got %q", r.FormValue("refresh_token")) + } + if r.FormValue("client_id") != "test-client" { + t.Errorf("expected client_id=test-client, got %q", r.FormValue("client_id")) + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprintf(w, `{"access_token":%q,"refresh_token":"new-refresh"}`, newAccessToken) + })) + defer srv.Close() + + access, refresh, err := RefreshAccessToken(srv.URL, "test-client", "my-refresh-token") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if access != newAccessToken { + t.Errorf("access token mismatch") + } + if refresh != "new-refresh" { + t.Errorf("expected new-refresh, got %q", refresh) + } +} + +func TestRefreshAccessToken_ErrorResponse(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, `{"error":"invalid_grant","error_description":"Token is expired"}`) + })) + defer srv.Close() + + _, _, err := RefreshAccessToken(srv.URL, "test-client", "bad-refresh") + if err == nil { + t.Fatal("expected error for expired refresh token") + } +} + +func TestRefreshAccessToken_NoAccessToken(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"refresh_token":"new-refresh"}`) + })) + defer srv.Close() + + _, _, err := RefreshAccessToken(srv.URL, "test-client", "my-refresh") + if err == nil { + t.Fatal("expected error for missing access_token") + } +} + +func TestGetTokenWithRefresh_ValidToken(t *testing.T) { + t.Setenv("AMBIENT_TOKEN", "") + validToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(1 * time.Hour).Unix())}) + cfg := &Config{AccessToken: validToken} + + token, err := cfg.GetTokenWithRefresh() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if token != validToken { + t.Error("expected valid token returned as-is") + } +} + +func TestGetTokenWithRefresh_ExpiredNoRefreshToken(t *testing.T) { + t.Setenv("AMBIENT_TOKEN", "") + expiredToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(-1 * time.Hour).Unix())}) + cfg := &Config{AccessToken: expiredToken} + + token, err := cfg.GetTokenWithRefresh() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if token != expiredToken { + t.Error("expected expired token returned when no refresh available") + } +} + +func TestGetTokenWithRefresh_ExpiredWithRefresh(t *testing.T) { + t.Setenv("AMBIENT_TOKEN", "") + expiredToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(-1 * time.Hour).Unix())}) + newAccessToken := makeJWT(jwt.MapClaims{"exp": float64(time.Now().Add(1 * time.Hour).Unix())}) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + resp, _ := json.Marshal(map[string]string{ + "access_token": newAccessToken, + "refresh_token": "rotated-refresh", + }) + w.Write(resp) + })) + defer srv.Close() + + dir := t.TempDir() + t.Setenv("AMBIENT_CONFIG", dir+"/config.json") + + cfg := &Config{ + AccessToken: expiredToken, + RefreshToken: "old-refresh", + IssuerURL: srv.URL, + ClientID: "test-client", + } + + token, err := cfg.GetTokenWithRefresh() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if token != newAccessToken { + t.Error("expected refreshed access token") + } + if cfg.RefreshToken != "rotated-refresh" { + t.Errorf("expected rotated refresh token, got %q", cfg.RefreshToken) + } +} + +func TestGetTokenWithRefresh_EnvOverride(t *testing.T) { + t.Setenv("AMBIENT_TOKEN", "env-token-at-least-20chars") + cfg := &Config{AccessToken: "config-token"} + + token, err := cfg.GetTokenWithRefresh() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if token != "env-token-at-least-20chars" { + t.Errorf("expected env token, got %q", token) + } +} diff --git a/components/ambient-cli/pkg/connection/connection.go b/components/ambient-cli/pkg/connection/connection.go index 71c2a4261..beb4b9328 100644 --- a/components/ambient-cli/pkg/connection/connection.go +++ b/components/ambient-cli/pkg/connection/connection.go @@ -62,7 +62,10 @@ func NewClientFactory() (*ClientFactory, error) { return nil, fmt.Errorf("load config: %w", err) } - token := cfg.GetToken() + token, err := cfg.GetTokenWithRefresh() + if err != nil { + return nil, fmt.Errorf("token refresh: %w", err) + } if token == "" { return nil, fmt.Errorf("not logged in; run 'acpctl login' first") } From bec827bb374d7a106a17b580073e17d40b7fc27e Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 16 Apr 2026 14:58:36 -0400 Subject: [PATCH 3/5] feat(cli): add demo-remote.sh for remote two-agent demos MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two-agent push_message relay conversation demo (historian + poet) discussing Charleston, SC. Uses 2-pane tmux layout with live message watch. Script relays responses between agents via push_message for 4 exchanges, then cleans up project. ๐Ÿค– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- components/ambient-cli/demo-remote.sh | 433 ++++++++++++++++++++++++++ 1 file changed, 433 insertions(+) create mode 100755 components/ambient-cli/demo-remote.sh diff --git a/components/ambient-cli/demo-remote.sh b/components/ambient-cli/demo-remote.sh new file mode 100755 index 000000000..8a74fc032 --- /dev/null +++ b/components/ambient-cli/demo-remote.sh @@ -0,0 +1,433 @@ +#!/usr/bin/env bash +# demo-remote.sh โ€” acpctl two-agent conversation demo against a remote deployment +# +# Layout: 2-pane tmux โ€” left = main demo, right = live message watch. +# Creates two sessions ("historian" and "poet"). They have an extended +# conversation about Charleston, SC by exchanging messages via push_message. +# The script relays each agent's response to the other agent. +# +# Usage: +# ./demo-remote.sh https://ambient-api-server-ambient-code--ambient-s5.apps.int.spoke.prod.us-west-2.aws.paas.redhat.com/ +# AMBIENT_API_URL=https://... ./demo-remote.sh +# +# Optional env: +# AMBIENT_API_URL โ€” API server URL (or pass as positional arg) +# AMBIENT_TOKEN โ€” pre-existing token (skips browser login) +# ACPCTL โ€” path to acpctl binary (default: acpctl from PATH) +# PAUSE โ€” seconds between demo steps (default: 2) +# SESSION_READY_TIMEOUT โ€” seconds to wait for Running (default: 180) +# MESSAGE_WAIT_TIMEOUT โ€” seconds to wait for messages (default: 120) +# INSECURE_TLS โ€” set to 1 to skip TLS verification (default: unset) + +set -euo pipefail + +# โ”€โ”€ tmux layout bootstrap โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +TMUX_SESSION="ambient-demo" +DEMO_SCRIPT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")" +DEMO_ARGS=("${@}") + +if [[ -z "${TMUX:-}" ]]; then + tmux kill-session -t "$TMUX_SESSION" 2>/dev/null || true + tmux new-session -d -s "$TMUX_SESSION" -x 220 -y 50 + + tmux split-window -h -t "${TMUX_SESSION}:0" -p 40 + + tmux send-keys -t "${TMUX_SESSION}:0.1" "printf '\\033[2m[watch panel โ€” waiting for session]\\033[0m\\n'" Enter + + tmux send-keys -t "${TMUX_SESSION}:0.0" \ + "TMUX_SESSION=${TMUX_SESSION} INSIDE_DEMO_TMUX=1 bash ${DEMO_SCRIPT} ${DEMO_ARGS[*]:-}" Enter + + tmux select-pane -t "${TMUX_SESSION}:0.0" + tmux attach-session -t "$TMUX_SESSION" + exit 0 +fi + +WATCH_PANE="1" + +attach_watch() { + local session_id="$1" + tmux send-keys -t "${TMUX_SESSION}:0.${WATCH_PANE}" \ + "${ACPCTL:-acpctl} session messages ${session_id} -F" Enter +} + +# โ”€โ”€ config โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +ACPCTL="${ACPCTL:-acpctl}" +PAUSE="${PAUSE:-2}" +SESSION_READY_TIMEOUT="${SESSION_READY_TIMEOUT:-180}" +MESSAGE_WAIT_TIMEOUT="${MESSAGE_WAIT_TIMEOUT:-120}" +INSECURE_TLS="${INSECURE_TLS:-}" + +if [[ $# -ge 1 ]]; then + AMBIENT_API_URL="${1}" +fi +AMBIENT_API_URL="${AMBIENT_API_URL:-}" + +if [[ -z "${AMBIENT_API_URL}" ]]; then + AMBIENT_API_URL=$("$ACPCTL" config get api_url 2>/dev/null || true) +fi + +if [[ -z "${AMBIENT_API_URL}" ]]; then + printf '\033[31merror: no API URL. Pass as argument or set AMBIENT_API_URL.\033[0m\n' >&2 + exit 1 +fi + +AMBIENT_API_URL="${AMBIENT_API_URL%/}" + +# โ”€โ”€ helpers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +bold() { printf '\033[1m%s\033[0m\n' "$*"; } +dim() { printf '\033[2m%s\033[0m\n' "$*"; } +cyan() { printf '\033[36m%s\033[0m\n' "$*"; } +green() { printf '\033[32m%s\033[0m\n' "$*"; } +yellow(){ printf '\033[33m%s\033[0m\n' "$*"; } +red() { printf '\033[31m%s\033[0m\n' "$*"; } +sep() { printf '\033[2m%s\033[0m\n' "โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€"; } + +step() { + local description="$1" + shift + echo + sep + bold "โ–ถ $description" + printf '\033[38;5;214m $ %s\033[0m\n' "$*" + sleep "$PAUSE" + "$@" + echo +} + +announce() { + echo + sep + cyan "โ”โ” $*" + sep + sleep "$PAUSE" +} + +# โ”€โ”€ preflight โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +if ! command -v "$ACPCTL" &>/dev/null; then + red "error: ${ACPCTL} not found. Set ACPCTL=/path/to/acpctl or add to PATH." >&2; exit 1 +fi + +cleanup() { + : +} +trap cleanup EXIT + +# โ”€โ”€ intro โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +echo +bold "Ambient CLI Demo โ€” Two-Agent Conversation (remote)" +dim " API: ${AMBIENT_API_URL}" + +echo +sep +bold "What this demo will do:" +echo +printf ' %s\n' "1. Log in and create a project" +printf ' %s\n' "2. Create two sessions: historian and poet" +printf ' %s\n' "3. Wait for both to reach Running" +printf ' %s\n' "4. Attach live message watch for the poet (right panel)" +printf ' %s\n' "5. Seed the historian with Charleston context" +printf ' %s\n' "6. Relay the historian's response to the poet via push_message" +printf ' %s\n' "7. Relay the poet's response back to the historian" +printf ' %s\n' "8. Continue the relay for an extended conversation" +printf ' %s\n' "9. Show final state and clean up" +echo +printf ' \033[38;5;214m%-38s\033[0m %s\n' "Orange text like this" "= a terminal command being run" +echo +sep +if [[ "${PAUSE}" -gt 0 ]] 2>/dev/null; then + bold " Press Enter to begin..." + read -r +fi + +# โ”€โ”€ session helpers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +wait_for_phase() { + local session_id="$1" target_phase="$2" + local deadline=$(( $(date +%s) + SESSION_READY_TIMEOUT )) + local last_phase="" + printf ' waiting for %s (timeout %ds)...\n' "${target_phase}" "${SESSION_READY_TIMEOUT}" + while true; do + local phase + phase=$( + "$ACPCTL" get session "$session_id" -o json 2>/dev/null \ + | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('phase',''))" 2>/dev/null || true + ) + if [[ "$phase" != "$last_phase" ]]; then + printf ' phase: %s\n' "$phase" + last_phase="$phase" + fi + [[ "$phase" == "$target_phase" ]] && { green " โœ“ session is ${target_phase}"; return 0; } + [[ "$phase" == "Failed" || "$phase" == "Stopped" ]] && { red " โœ— session is ${phase}"; return 1; } + [[ $(date +%s) -ge $deadline ]] && { yellow " โœ— timed out (phase=${phase:-unknown})"; return 1; } + sleep 3 + done +} + +create_session() { + local name="$1" + local json + json=$( + "$ACPCTL" create session \ + --name "$name" \ + -o json 2>/dev/null + ) + local sid + sid=$(echo "$json" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])" 2>/dev/null) + if [[ -z "$sid" ]]; then + red " โœ— failed to parse session ID for ${name}" + return 1 + fi + echo "$sid" +} + +extract_last_assistant_text() { + local session_id="$1" + "$ACPCTL" session messages "${session_id}" -o json 2>/dev/null \ + | python3 -c " +import sys, json +try: + msgs = json.load(sys.stdin) + for m in reversed(msgs): + if m.get('event_type') == 'MESSAGES_SNAPSHOT': + snapshot = json.loads(m.get('payload', '[]')) + for msg in reversed(snapshot): + if msg.get('role') == 'assistant': + content = msg.get('content', '') + if isinstance(content, list): + parts = [] + for p in content: + if isinstance(p, dict) and p.get('type') == 'text': + parts.append(p.get('text', '')) + content = ' '.join(parts) + if content.strip(): + print(content.strip()) + sys.exit(0) +except Exception: + pass +" 2>/dev/null || true +} + +send_and_capture() { + local session_id="$1" label="$2" msg="$3" + + echo; sep + bold "โ–ถ โ†’ ${label}: sending message" + if [[ ${#msg} -gt 120 ]]; then + dim " ${msg:0:117}..." + else + dim " ${msg}" + fi + sleep "$PAUSE" + + bold "โ–ถ โ†’ ${label}: waiting for response..." + if timeout "${MESSAGE_WAIT_TIMEOUT}" "$ACPCTL" session send "${session_id}" "$msg" -f 2>&1; then + green " โœ“ ${label} responded" + else + yellow " โœ— ${label}: send -f exited with error or timeout" + fi + echo + + LAST_RESPONSE=$(extract_last_assistant_text "${session_id}") + + bold "โ–ถ ${label} says:" + if [[ -n "${LAST_RESPONSE}" ]]; then + echo " ${LAST_RESPONSE}" | fold -s -w 76 | sed 's/^/ /' + else + dim " (no response captured)" + fi + echo + sleep "$PAUSE" +} + +# โ”€โ”€ section 1: login โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "1 ยท Log in" + +if [[ -n "${AMBIENT_TOKEN:-}" ]]; then + TLS_FLAG="" + if [[ -n "${INSECURE_TLS}" ]]; then + TLS_FLAG="--insecure-skip-tls-verify" + fi + step "Log in with existing token" \ + "$ACPCTL" login "${AMBIENT_API_URL}" \ + --token "${AMBIENT_TOKEN}" \ + ${TLS_FLAG} +else + TLS_FLAG="" + if [[ -n "${INSECURE_TLS}" ]]; then + TLS_FLAG="--insecure-skip-tls-verify" + fi + step "Log in via browser (Red Hat SSO)" \ + "$ACPCTL" login --use-auth-code \ + --url "${AMBIENT_API_URL}" \ + ${TLS_FLAG} +fi + +step "Show authenticated user" \ + "$ACPCTL" whoami + +# โ”€โ”€ section 2: project โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "2 ยท Create project" + +RUN_ID=$(date +%s | tail -c5) +PROJECT_NAME="demo-${RUN_ID}" + +step "Create project: ${PROJECT_NAME}" \ + "$ACPCTL" create project \ + --name "${PROJECT_NAME}" \ + --display-name "Demo Project ${RUN_ID}" \ + --description "two-agent conversation demo" + +step "Set project context" \ + "$ACPCTL" project "${PROJECT_NAME}" + +step "Confirm project context" \ + "$ACPCTL" project current + +# โ”€โ”€ section 3: create both sessions โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "3 ยท Create two sessions" + +sep; bold "โ–ถ Create session: historian"; sleep "$PAUSE" +HISTORIAN_ID=$(create_session "historian") +dim " historian ID: ${HISTORIAN_ID}"; echo + +sep; bold "โ–ถ Create session: poet"; sleep "$PAUSE" +POET_ID=$(create_session "poet") +dim " poet ID: ${POET_ID}"; echo + +step "List sessions" \ + "$ACPCTL" get sessions + +# โ”€โ”€ section 4: wait for both to reach Running โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "4 ยท Wait for both sessions to reach Running" + +bold "โ–ถ Waiting for historian..." +wait_for_phase "${HISTORIAN_ID}" "Running" || { red " historian did not reach Running"; exit 1; } +echo + +bold "โ–ถ Waiting for poet..." +wait_for_phase "${POET_ID}" "Running" || { red " poet did not reach Running"; exit 1; } +echo + +# โ”€โ”€ attach watch for poet in the right panel โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "5 ยท Attach live watch for poet session" + +dim " attaching poet message watch to right panel..." +sleep 2 +attach_watch "${POET_ID}" +sleep 3 +green " โœ“ right panel watching poet messages (Ctrl+C to stop)" + +# โ”€โ”€ section 6: seed the historian โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "6 ยท Seed the historian with Charleston context" + +LAST_RESPONSE="" +send_and_capture "${HISTORIAN_ID}" "historian" \ + "You are a Charleston, SC historian. Tell me about what makes Charleston's geography and ecology unique โ€” the salt marshes, barrier islands, tidal creeks, and how they shape life in the Lowcountry. Keep your response to 2-3 sentences." + +HISTORIAN_OPENING="${LAST_RESPONSE}" + +# โ”€โ”€ section 7: relay to poet โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "7 ยท Relay historian's response โ†’ poet (watch the right panel!)" + +dim " Forwarding the historian's words to the poet via push_message..." +sleep "$PAUSE" + +send_and_capture "${POET_ID}" "poet" \ + "You are a Charleston poet. A historian just told you this about Charleston: \"${HISTORIAN_OPENING}\" โ€” Respond with your own poetic perspective on what the historian described. How does this landscape feel, smell, sound? 2-3 sentences, evocative language." + +POET_RESPONSE="${LAST_RESPONSE}" + +# โ”€โ”€ section 8: relay back to historian โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "8 ยท Relay poet's response โ†’ historian" + +dim " Forwarding the poet's words back to the historian..." +sleep "$PAUSE" + +send_and_capture "${HISTORIAN_ID}" "historian" \ + "A poet responded to your description with this: \"${POET_RESPONSE}\" โ€” Now tell me about one specific historical event or tradition in Charleston that connects to the marshes or the sea. 2-3 sentences." + +HISTORIAN_HISTORY="${LAST_RESPONSE}" + +# โ”€โ”€ section 9: relay history back to poet โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "9 ยท Relay historian's history โ†’ poet for a final poem" + +dim " Forwarding the historian's story back to the poet..." +sleep "$PAUSE" + +send_and_capture "${POET_ID}" "poet" \ + "The historian shared this story: \"${HISTORIAN_HISTORY}\" โ€” Write a 4-line poem inspired by what the historian told you. Mention the marshes and the sea." + +# โ”€โ”€ section 10: final summary โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "10 ยท Conversation summary" + +echo +bold "The two agents had the following exchange about Charleston:" +echo +cyan " HISTORIAN (opening):" +echo " ${HISTORIAN_OPENING}" | fold -s -w 76 | sed 's/^/ /' +echo +cyan " POET (response):" +echo " ${POET_RESPONSE}" | fold -s -w 76 | sed 's/^/ /' +echo +cyan " HISTORIAN (history):" +echo " ${HISTORIAN_HISTORY}" | fold -s -w 76 | sed 's/^/ /' +echo +cyan " POET (final poem):" +echo " ${LAST_RESPONSE}" | fold -s -w 76 | sed 's/^/ /' +echo + +sep +sleep "$PAUSE" + +step "Historian โ€” all messages" \ + "$ACPCTL" session messages "${HISTORIAN_ID}" + +step "Poet โ€” all messages" \ + "$ACPCTL" session messages "${POET_ID}" + +# โ”€โ”€ section 11: cleanup โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +announce "11 ยท Stop and clean up" + +sep; bold "โ–ถ Stop historian"; sleep "$PAUSE" +"$ACPCTL" stop "${HISTORIAN_ID}" || true; echo + +sep; bold "โ–ถ Stop poet"; sleep "$PAUSE" +"$ACPCTL" stop "${POET_ID}" || true; echo + +sleep 3 + +step "Delete historian session" \ + "$ACPCTL" delete session "${HISTORIAN_ID}" -y + +step "Delete poet session" \ + "$ACPCTL" delete session "${POET_ID}" -y + +step "Delete project ${PROJECT_NAME}" \ + "$ACPCTL" delete project "${PROJECT_NAME}" -y + +step "Confirm cleanup" \ + "$ACPCTL" get projects + +# โ”€โ”€ done โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ + +echo +sep +green " Demo complete โœ“" +sep +echo From 33fa6439fe67e01383b4166b6658463ec28d8697 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 16 Apr 2026 14:58:44 -0400 Subject: [PATCH 4/5] fix(cli): resolve send -f deadlock when SSE proxy buffers idle sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reorder send.go to push the message before opening the SSE stream. The api-server SSE proxy blocks until the runner has an active run, so opening the stream first caused a deadlock. ๐Ÿค– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../ambient-cli/cmd/acpctl/session/send.go | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/components/ambient-cli/cmd/acpctl/session/send.go b/components/ambient-cli/cmd/acpctl/session/send.go index 76c8008e0..75f50cf78 100644 --- a/components/ambient-cli/cmd/acpctl/session/send.go +++ b/components/ambient-cli/cmd/acpctl/session/send.go @@ -3,7 +3,6 @@ package session import ( "context" "fmt" - "io" "os" "os/signal" @@ -54,19 +53,6 @@ func runSend(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithTimeout(cmd.Context(), cfg.GetRequestTimeout()) defer cancel() - streamCtx, streamCancel := signal.NotifyContext(cmd.Context(), os.Interrupt) - defer streamCancel() - - var stream io.ReadCloser - if sendFollow { - s, err := client.Sessions().StreamEvents(streamCtx, sessionID) - if err != nil { - return fmt.Errorf("stream events: %w", err) - } - stream = s - defer stream.Close() - } - msg, err := client.Sessions().PushMessage(ctx, sessionID, payload) if err != nil { return fmt.Errorf("send message: %w", err) @@ -78,5 +64,14 @@ func runSend(cmd *cobra.Command, args []string) error { return nil } + streamCtx, streamCancel := signal.NotifyContext(cmd.Context(), os.Interrupt) + defer streamCancel() + + stream, err := client.Sessions().StreamEvents(streamCtx, sessionID) + if err != nil { + return fmt.Errorf("stream events: %w", err) + } + defer stream.Close() + return renderSSEStream(stream, cmd.OutOrStdout(), sendFollowJSON, true) } From 7dcff3be6e77f4b4195bb7f2439aa89f74354774 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 16 Apr 2026 20:55:55 -0400 Subject: [PATCH 5/5] fix: address CodeRabbit review findings on PR #1335 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - start_handler: handle ActiveByAgentID errors instead of discarding - service: distinguish ErrRecordNotFound from real DB errors - mock_dao: return newest active session matching production ORDER BY - messages -F: reconnect immediately after clean stream end - send -f: document intentional SSE-after-send ordering (proxy deadlock) - config: log warning when Save() fails after token refresh - kube_reconciler: decide useMCPSidecar before building env to avoid injecting AMBIENT_MCP_URL when sidecar is not created - demo-remote.sh: abort relay on send failure; escape tmux send-keys ๐Ÿค– Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../plugins/agents/start_handler.go | 7 ++++++- .../ambient-api-server/plugins/sessions/mock_dao.go | 8 +++++++- .../ambient-api-server/plugins/sessions/service.go | 7 ++++++- .../ambient-cli/cmd/acpctl/session/messages.go | 13 ++++++------- components/ambient-cli/cmd/acpctl/session/send.go | 5 +++++ components/ambient-cli/demo-remote.sh | 11 ++++++++--- components/ambient-cli/pkg/config/config.go | 4 +++- .../internal/reconciler/kube_reconciler.go | 13 ++++++------- 8 files changed, 47 insertions(+), 21 deletions(-) diff --git a/components/ambient-api-server/plugins/agents/start_handler.go b/components/ambient-api-server/plugins/agents/start_handler.go index ea8ea98e9..b2f92851a 100644 --- a/components/ambient-api-server/plugins/agents/start_handler.go +++ b/components/ambient-api-server/plugins/agents/start_handler.go @@ -56,7 +56,12 @@ func (h *startHandler) Start(w http.ResponseWriter, r *http.Request) { return } - if existing, _ := h.session.ActiveByAgentID(ctx, agentID); existing != nil { + existing, activeErr := h.session.ActiveByAgentID(ctx, agentID) + if activeErr != nil { + handlers.HandleError(ctx, w, activeErr) + return + } + if existing != nil { resp := &StartResponse{ Session: sessions.PresentSession(existing), } diff --git a/components/ambient-api-server/plugins/sessions/mock_dao.go b/components/ambient-api-server/plugins/sessions/mock_dao.go index c66593af1..a46f7d676 100644 --- a/components/ambient-api-server/plugins/sessions/mock_dao.go +++ b/components/ambient-api-server/plugins/sessions/mock_dao.go @@ -80,10 +80,16 @@ func (d *sessionDaoMock) AllByProjectId(ctx context.Context, projectId string) ( func (d *sessionDaoMock) ActiveByAgentID(ctx context.Context, agentID string) (*Session, error) { activePhases := map[string]bool{"Pending": true, "Creating": true, "Running": true} + var newest *Session for _, s := range d.sessions { if s.AgentId != nil && *s.AgentId == agentID && s.Phase != nil && activePhases[*s.Phase] { - return s, nil + if newest == nil || s.CreatedAt.After(newest.CreatedAt) { + newest = s + } } } + if newest != nil { + return newest, nil + } return nil, gorm.ErrRecordNotFound } diff --git a/components/ambient-api-server/plugins/sessions/service.go b/components/ambient-api-server/plugins/sessions/service.go index 1f04d5d47..73e573896 100644 --- a/components/ambient-api-server/plugins/sessions/service.go +++ b/components/ambient-api-server/plugins/sessions/service.go @@ -2,12 +2,14 @@ package sessions import ( "context" + stderrors "errors" "github.com/openshift-online/rh-trex-ai/pkg/api" "github.com/openshift-online/rh-trex-ai/pkg/db" "github.com/openshift-online/rh-trex-ai/pkg/errors" "github.com/openshift-online/rh-trex-ai/pkg/logger" "github.com/openshift-online/rh-trex-ai/pkg/services" + "gorm.io/gorm" ) const sessionsLockType db.LockType = "sessions" @@ -269,7 +271,10 @@ func (s *sqlSessionService) Start(ctx context.Context, id string) (*Session, *er func (s *sqlSessionService) ActiveByAgentID(ctx context.Context, agentID string) (*Session, *errors.ServiceError) { session, err := s.sessionDao.ActiveByAgentID(ctx, agentID) if err != nil { - return nil, nil + if stderrors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return nil, errors.GeneralError("unable to look up active session for agent %s: %s", agentID, err) } return session, nil } diff --git a/components/ambient-cli/cmd/acpctl/session/messages.go b/components/ambient-cli/cmd/acpctl/session/messages.go index c6679ed70..c9d959e84 100644 --- a/components/ambient-cli/cmd/acpctl/session/messages.go +++ b/components/ambient-cli/cmd/acpctl/session/messages.go @@ -457,14 +457,13 @@ func streamMessagesContinuous(cmd *cobra.Command, client *sdkclient.Client, sess if streamErr != nil { fmt.Fprintf(out, "\n[reconnect] stream ended: %v โ€” reconnecting in %s\n", streamErr, reconnectDelay) + select { + case <-ctx.Done(): + return nil + case <-time.After(reconnectDelay): + } } else { - fmt.Fprintf(out, "\n[reconnect] stream ended โ€” reconnecting in %s\n", reconnectDelay) - } - - select { - case <-ctx.Done(): - return nil - case <-time.After(reconnectDelay): + fmt.Fprintf(out, "\n[reconnect] stream ended cleanly โ€” reconnecting immediately\n") } } } diff --git a/components/ambient-cli/cmd/acpctl/session/send.go b/components/ambient-cli/cmd/acpctl/session/send.go index 75f50cf78..7c52b2a27 100644 --- a/components/ambient-cli/cmd/acpctl/session/send.go +++ b/components/ambient-cli/cmd/acpctl/session/send.go @@ -64,6 +64,11 @@ func runSend(cmd *cobra.Command, args []string) error { return nil } + // NOTE: We intentionally open the SSE stream AFTER sending the message. + // The api-server SSE proxy does not return HTTP 200 until the runner has + // an active run, so subscribing first would deadlock. The event gap is + // negligible in practice because the runner takes time to start producing + // events after receiving the message. streamCtx, streamCancel := signal.NotifyContext(cmd.Context(), os.Interrupt) defer streamCancel() diff --git a/components/ambient-cli/demo-remote.sh b/components/ambient-cli/demo-remote.sh index 8a74fc032..770fa0e92 100755 --- a/components/ambient-cli/demo-remote.sh +++ b/components/ambient-cli/demo-remote.sh @@ -35,8 +35,12 @@ if [[ -z "${TMUX:-}" ]]; then tmux send-keys -t "${TMUX_SESSION}:0.1" "printf '\\033[2m[watch panel โ€” waiting for session]\\033[0m\\n'" Enter + ESCAPED_ARGS="" + for arg in "${DEMO_ARGS[@]}"; do + ESCAPED_ARGS+="$(printf ' %q' "$arg")" + done tmux send-keys -t "${TMUX_SESSION}:0.0" \ - "TMUX_SESSION=${TMUX_SESSION} INSIDE_DEMO_TMUX=1 bash ${DEMO_SCRIPT} ${DEMO_ARGS[*]:-}" Enter + "TMUX_SESSION=$(printf '%q' "$TMUX_SESSION") INSIDE_DEMO_TMUX=1 bash $(printf '%q' "$DEMO_SCRIPT")${ESCAPED_ARGS}" Enter tmux select-pane -t "${TMUX_SESSION}:0.0" tmux attach-session -t "$TMUX_SESSION" @@ -48,7 +52,7 @@ WATCH_PANE="1" attach_watch() { local session_id="$1" tmux send-keys -t "${TMUX_SESSION}:0.${WATCH_PANE}" \ - "${ACPCTL:-acpctl} session messages ${session_id} -F" Enter + "$(printf '%q' "${ACPCTL:-acpctl}") session messages $(printf '%q' "$session_id") -F" Enter } # โ”€โ”€ config โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ @@ -228,7 +232,8 @@ send_and_capture() { if timeout "${MESSAGE_WAIT_TIMEOUT}" "$ACPCTL" session send "${session_id}" "$msg" -f 2>&1; then green " โœ“ ${label} responded" else - yellow " โœ— ${label}: send -f exited with error or timeout" + red " โœ— ${label}: send -f failed or timed out โ€” aborting demo" + exit 1 fi echo diff --git a/components/ambient-cli/pkg/config/config.go b/components/ambient-cli/pkg/config/config.go index c214ce1bf..5c7003346 100644 --- a/components/ambient-cli/pkg/config/config.go +++ b/components/ambient-cli/pkg/config/config.go @@ -140,7 +140,9 @@ func (c *Config) GetTokenWithRefresh() (string, error) { c.RefreshToken = newRefresh } - _ = Save(c) + if saveErr := Save(c); saveErr != nil { + fmt.Fprintf(os.Stderr, "warning: failed to persist refreshed token: %v\n", saveErr) + } return c.AccessToken, nil } diff --git a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go index c4ef2bd94..a74fc7a86 100644 --- a/components/ambient-control-plane/internal/reconciler/kube_reconciler.go +++ b/components/ambient-control-plane/internal/reconciler/kube_reconciler.go @@ -408,7 +408,10 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, } labels := sessionLabels(session.ID, session.ProjectID) - useMCPSidecar := r.cfg.MCPImage != "" + useMCPSidecar := r.cfg.MCPImage != "" && r.cfg.CPTokenURL != "" && r.cfg.CPTokenPublicKey != "" + if r.cfg.MCPImage != "" && !useMCPSidecar { + r.logger.Warn().Str("session_id", session.ID).Msg("MCP sidecar disabled: CP_TOKEN_URL or CPTokenPublicKey not configured") + } containers := []interface{}{ map[string]interface{}{ @@ -444,12 +447,8 @@ func (r *SimpleKubeReconciler) ensurePod(ctx context.Context, namespace string, } if useMCPSidecar { - if r.cfg.CPTokenURL == "" || r.cfg.CPTokenPublicKey == "" { - r.logger.Warn().Str("session_id", session.ID).Msg("MCP sidecar skipped: CP_TOKEN_URL or CPTokenPublicKey not configured") - } else { - containers = append(containers, r.buildMCPSidecar(session.ID)) - r.logger.Info().Str("session_id", session.ID).Msg("MCP sidecar enabled for session") - } + containers = append(containers, r.buildMCPSidecar(session.ID)) + r.logger.Info().Str("session_id", session.ID).Msg("MCP sidecar enabled for session") } pod := &unstructured.Unstructured{