diff --git a/.dockerignore b/.dockerignore index fb6923e..80fa18f 100644 --- a/.dockerignore +++ b/.dockerignore @@ -6,5 +6,4 @@ !cmd/ !cmd/** !internal/ -!internal/terminalws/ -!internal/terminalws/** +!internal/** diff --git a/.github/workflows/cli.yml b/.github/workflows/cli.yml index 21cedf9..305393e 100644 --- a/.github/workflows/cli.yml +++ b/.github/workflows/cli.yml @@ -6,6 +6,7 @@ on: - main paths: - "cmd/**" + - "internal/**" - "go.mod" - "go.sum" - "Dockerfile.ssh-gateway" @@ -13,6 +14,7 @@ on: pull_request: paths: - "cmd/**" + - "internal/**" - "go.mod" - "go.sum" - "Dockerfile.ssh-gateway" diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bb10d3..2a76917 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Unreleased +- De-duplicate the Go CLI and SSH gateway around shared control-plane models, authentication, lifecycle semantics, API calls, terminal operations, and terminal-safe session rendering. - Unify managed terminal clients on the multiplex `/api/terminal/ws` protocol, remove direct PTY routes, and share one framed Go transport across the CLI and SSH gateway. - Connect Crabfleet lifecycle and terminal traffic to Crabbox through a Cloudflare service binding and deploy an identical route-scoped credential atomically across both coordinators. - Make OpenClaw room trees recoverable with idempotent Crabbox creation and root-level admission freeze plus recursive stop. diff --git a/Dockerfile.ssh-gateway b/Dockerfile.ssh-gateway index 1899fd5..57efad8 100644 --- a/Dockerfile.ssh-gateway +++ b/Dockerfile.ssh-gateway @@ -3,7 +3,7 @@ WORKDIR /src COPY go.mod go.sum ./ RUN go mod download COPY cmd ./cmd -COPY internal/terminalws ./internal/terminalws +COPY internal ./internal RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /out/crabbox-ssh-gateway ./cmd/crabbox-ssh-gateway FROM alpine:3.22 diff --git a/README.md b/README.md index d82d3e1..215f363 100644 --- a/README.md +++ b/README.md @@ -396,6 +396,11 @@ crabfleet/ ├── migrations/ # D1 database migrations ├── scripts/ # Build scripts │ └── generate-assets.mjs +├── cmd/ # Go CLI and SSH gateway entry points +├── internal/ +│ ├── fleetapi/ # Shared Go control-plane client and domain contracts +│ ├── fleettext/ # Shared terminal-safe session rendering +│ └── terminalws/ # Shared multiplex terminal protocol client ├── vite.config.mjs # Preact/Vite app bundle config ├── docs/ # Documentation (GitHub Pages) │ ├── CNAME # docs.crabfleet.ai custom domain diff --git a/cmd/crabbox-ssh-gateway/main.go b/cmd/crabbox-ssh-gateway/main.go index 13adf5d..944b5f7 100644 --- a/cmd/crabbox-ssh-gateway/main.go +++ b/cmd/crabbox-ssh-gateway/main.go @@ -13,12 +13,12 @@ import ( "log" "net" "net/http" - "net/url" "os" "strings" "time" - "github.com/openclaw/crabfleet/internal/terminalws" + "github.com/openclaw/crabfleet/internal/fleetapi" + "github.com/openclaw/crabfleet/internal/fleettext" "golang.org/x/crypto/ssh" ) @@ -29,136 +29,23 @@ type apiClient struct { } type authResponse struct { - Authorized bool `json:"authorized"` - LinkURL string `json:"linkUrl"` - User user `json:"user"` -} - -type user struct { - Login string `json:"login"` - Email string `json:"email"` - Subject string `json:"subject"` - Role string `json:"role"` -} - -type stateResponse struct { - User user `json:"user"` - Repos []string `json:"repos"` - InteractiveSessions []interactiveSession `json:"interactiveSessions"` - Cards []card `json:"cards"` -} - -type interactiveSession struct { - ID string `json:"id"` - ParentSessionID string `json:"parentSessionId"` - RootSessionID string `json:"rootSessionId"` - Repo string `json:"repo"` - Branch string `json:"branch"` - Runtime string `json:"runtime"` - Adapter string `json:"adapter"` - Status string `json:"status"` - Owner string `json:"owner"` - CreatedBy string `json:"createdBy"` - Purpose string `json:"purpose"` - Summary string `json:"summary"` - Capabilities *sessionCapabilities `json:"capabilities"` - PtyAvailable bool `json:"ptyAvailable"` - LeaseID string `json:"leaseId"` - AttachURL string `json:"attachUrl"` - VNCURL string `json:"vncUrl"` - LastEvent string `json:"lastEvent"` - LogArchive logArchive `json:"logArchive"` -} - -func legacyProviderCleanupMayBeRequired(session interactiveSession) bool { - if session.Adapter == "runtime-v1" || session.Runtime == "github_actions" { - return false - } - switch session.Status { - case "stopping", "stopped", "expired": - return true - default: - return false - } -} - -func lifecycleStopNote(session interactiveSession) string { - if session.Runtime == "github_actions" && session.Status == "stopped" { - return "GitHub Actions workflow run was not canceled and may continue on GitHub" - } - if legacyProviderCleanupMayBeRequired(session) { - return "provider deletion was not confirmed; legacy runtimes may require separate cleanup" - } - return "" -} - -type sessionCapabilities struct { - Terminal bool `json:"terminal"` -} - -type card struct { - ID string `json:"id"` - Title string `json:"title"` - Repo string `json:"repo"` - Lane string `json:"lane"` - LastEvent string `json:"lastEvent"` -} - -type createSessionRequest struct { - Repo string `json:"repo,omitempty"` - Branch string `json:"branch,omitempty"` - Runtime string `json:"runtime,omitempty"` - Profile string `json:"profile,omitempty"` - Command string `json:"command,omitempty"` - Prompt string `json:"prompt,omitempty"` - ParentSessionID string `json:"parentSessionId,omitempty"` - RootSessionID string `json:"rootSessionId,omitempty"` - Purpose string `json:"purpose,omitempty"` - Summary string `json:"summary,omitempty"` + Authorized bool `json:"authorized"` + LinkURL string `json:"linkUrl"` + User fleetapi.User `json:"user"` } type createArgs struct { - request createSessionRequest + request fleetapi.CreateSessionRequest detach bool vnc bool } -type createSessionResponse struct { - Session interactiveSession `json:"session"` -} - -type sessionResponse struct { - Session interactiveSession `json:"session"` -} - -type sessionLogResponse struct { - Session interactiveSession `json:"session"` - Events []sessionLogEvent `json:"events"` - Archive logArchive `json:"archive"` -} - -type sessionLogEvent struct { - Actor string `json:"actor"` - Message string `json:"message"` - CreatedAt int64 `json:"createdAt"` -} - -type logArchive struct { - SessionID string `json:"sessionId"` - EventCount int `json:"eventCount"` - EventsKey string `json:"eventsKey"` - TranscriptKey string `json:"transcriptKey"` - SummaryKey string `json:"summaryKey"` - ArchivedAt int64 `json:"archivedAt"` - UpdatedAt int64 `json:"updatedAt"` -} - type keyAuth struct { authorized bool fingerprint string publicKey string linkURL string - user user + user fleetapi.User } type sessionPTY struct { @@ -325,7 +212,7 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, fingerprint: perms.Extensions["fingerprint"], publicKey: perms.Extensions["public_key"], linkURL: perms.Extensions["link_url"], - user: user{ + user: fleetapi.User{ Login: perms.Extensions["login"], Role: perms.Extensions["role"], }, @@ -334,6 +221,7 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, fmt.Fprintf(out, "Crabfleet SSH key not linked.\n\nOpen this URL to connect it:\n%s\n\nThen run ssh again.\n", auth.linkURL) return 1 } + api := client.controlPlane(auth.fingerprint) args, err := splitCommand(command) if err != nil { @@ -349,7 +237,7 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, printHelp(out, auth.user) return 0 case "whoami": - state, err := client.state(ctx, auth.fingerprint) + state, err := api.State(ctx) if err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 @@ -357,13 +245,13 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, fmt.Fprintf( out, "login: %s\nrole: %s\nfingerprint: %s\n", - terminalSafe(displayUser(state.User)), - terminalSafe(state.User.Role), - terminalSafe(auth.fingerprint), + fleettext.Safe(fleettext.DisplayUser(state.User)), + fleettext.Safe(state.User.Role), + fleettext.Safe(auth.fingerprint), ) return 0 case "list", "ls": - state, err := client.state(ctx, auth.fingerprint) + state, err := api.State(ctx) if err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 @@ -371,18 +259,18 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, printList(out, state) return 0 case "new": - create := parseCreate(args[1:], client, auth.fingerprint) - session, err := client.createSession(ctx, auth.fingerprint, create.request) + create := parseCreate(args[1:], api) + session, err := api.CreateSession(ctx, create.request) if err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 } fmt.Fprintf(out, "session: %s\nrepo: %s\nstatus: %s\n", session.ID, session.Repo, session.Status) - if attachable(session) { + if session.Attachable() { fmt.Fprintf(out, "attach: ssh crabfleet attach %s\n", session.ID) } if session.VNCURL != "" { - fmt.Fprintf(out, "vnc: %s\n", terminalSafe(session.VNCURL)) + fmt.Fprintf(out, "vnc: %s\n", fleettext.Safe(session.VNCURL)) } if create.vnc { if session.VNCURL == "" { @@ -390,22 +278,22 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, } return 0 } - if create.detach || !attachable(session) { + if create.detach || !session.Attachable() { return 0 } - return client.attach(ctx, auth.fingerprint, session.ID, out, pty) + return attach(ctx, out, api, session.ID, pty) case "attach": if len(args) < 2 { fmt.Fprintln(out, "usage: attach SESSION_ID") return 2 } - return client.attach(ctx, auth.fingerprint, args[1], out, pty) + return attach(ctx, out, api, args[1], pty) case "vnc": if len(args) < 2 { fmt.Fprintln(out, "usage: vnc SESSION_ID") return 2 } - state, err := client.state(ctx, auth.fingerprint) + state, err := api.State(ctx) if err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 @@ -415,26 +303,26 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, continue } if session.VNCURL == "" { - fmt.Fprintf(out, "session %s has no WebVNC URL yet\n", terminalSafe(args[1])) + fmt.Fprintf(out, "session %s has no WebVNC URL yet\n", fleettext.Safe(args[1])) return 1 } - fmt.Fprintln(out, terminalSafe(session.VNCURL)) + fmt.Fprintln(out, fleettext.Safe(session.VNCURL)) return 0 } - fmt.Fprintf(out, "session %s not found\n", terminalSafe(args[1])) + fmt.Fprintf(out, "session %s not found\n", fleettext.Safe(args[1])) return 1 case "delete", "stop": if len(args) != 2 { fmt.Fprintln(out, "usage: delete SESSION_ID") return 2 } - session, err := client.action(ctx, auth.fingerprint, args[1], "stop") + session, err := api.Action(ctx, args[1], "stop") if err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 } - fmt.Fprintf(out, "session: %s\nstatus: %s\n", terminalSafe(session.ID), terminalSafe(session.Status)) - if note := lifecycleStopNote(session); note != "" { + fmt.Fprintf(out, "session: %s\nstatus: %s\n", fleettext.Safe(session.ID), fleettext.Safe(session.Status)) + if note := session.LifecycleStopNote(); note != "" { fmt.Fprintf(out, "note: %s\n", note) } return 0 @@ -443,19 +331,19 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, fmt.Fprintln(out, "usage: logs SESSION_ID") return 2 } - logs, err := client.logs(ctx, auth.fingerprint, args[1]) + logs, err := api.Logs(ctx, args[1]) if err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 } - printSessionLogs(out, logs) + fleettext.WriteSessionLogs(out, logs) return 0 case "transcript": if len(args) < 2 { fmt.Fprintln(out, "usage: transcript SESSION_ID") return 2 } - transcript, err := client.transcript(ctx, auth.fingerprint, args[1]) + transcript, err := api.Transcript(ctx, args[1]) if err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 @@ -475,11 +363,11 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, fmt.Fprintln(out, "usage: message SESSION_ID [--no-enter] TEXT") return 2 } - if err := client.message(ctx, auth.fingerprint, args[1], message.text, !message.noEnter, pty); err != nil { + if err := api.Message(ctx, args[1], message.text, !message.noEnter, pty.cols, pty.rows); err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 } - fmt.Fprintf(out, "sent: %s\n", terminalSafe(args[1])) + fmt.Fprintf(out, "sent: %s\n", fleettext.Safe(args[1])) return 0 case "summary": if len(args) < 2 { @@ -488,26 +376,26 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, } update := parseSummary(args[2:]) if update.summary == "" && update.purpose == "" { - state, err := client.state(ctx, auth.fingerprint) + state, err := api.State(ctx) if err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 } for _, session := range state.InteractiveSessions { if session.ID == args[1] { - printSessionSummary(out, session) + fleettext.WriteSessionSummary(out, session) return 0 } } - fmt.Fprintf(out, "session %s not found\n", terminalSafe(args[1])) + fmt.Fprintf(out, "session %s not found\n", fleettext.Safe(args[1])) return 1 } - session, err := client.updateSummary(ctx, auth.fingerprint, args[1], update.summary, update.purpose) + session, err := api.UpdateSummary(ctx, args[1], update.summary, update.purpose) if err != nil { fmt.Fprintf(out, "error: %v\n", err) return 1 } - printSessionSummary(out, session) + fleettext.WriteSessionSummary(out, session) return 0 case "open": fmt.Fprintf(out, "%s/app/\n", client.baseURL) @@ -519,28 +407,13 @@ func runCommand(ctx context.Context, out io.ReadWriter, perms *ssh.Permissions, } } -func terminalCapable(session interactiveSession) bool { - return session.Capabilities == nil || session.Capabilities.Terminal -} - -func attachable(session interactiveSession) bool { - if !terminalCapable(session) || !ptyAttachable(session) { - return false - } - switch session.Status { - case "ready", "attached", "detached": - return true - default: - return false - } -} - -func ptyAttachable(session interactiveSession) bool { - return session.PtyAvailable -} - -func printHelp(out io.Writer, user user) { - fmt.Fprintf(out, "Crabfleet SSH\nlogin: %s\nrole: %s\n\n", terminalSafe(displayUser(user)), terminalSafe(user.Role)) +func printHelp(out io.Writer, user fleetapi.User) { + fmt.Fprintf( + out, + "Crabfleet SSH\nlogin: %s\nrole: %s\n\n", + fleettext.Safe(fleettext.DisplayUser(user)), + fleettext.Safe(user.Role), + ) fmt.Fprintln(out, "commands:") fmt.Fprintln(out, " whoami") fmt.Fprintln(out, " list") @@ -557,14 +430,17 @@ func printHelp(out io.Writer, user user) { fmt.Fprintln(out, " open") } -func printList(out io.Writer, state stateResponse) { - fmt.Fprintf(out, "user: %s (%s)\n", terminalSafe(displayUser(state.User)), terminalSafe(state.User.Role)) - fmt.Fprintf(out, "repos: %s\n", compactList(state.Repos, 12)) +func printList(out io.Writer, state fleetapi.State) { + fmt.Fprintf( + out, + "user: %s (%s)\n", + fleettext.Safe(fleettext.DisplayUser(state.User)), + fleettext.Safe(state.User.Role), + ) + fmt.Fprintf(out, "repos: %s\n", fleettext.CompactList(state.Repos, 12)) fmt.Fprintln(out, "\nsessions:") - if len(state.InteractiveSessions) == 0 { + if !fleettext.WriteSessionGroups(out, state.InteractiveSessions, " ") { fmt.Fprintln(out, " none") - } else { - printSessionGroups(out, state.InteractiveSessions, " ") } fmt.Fprintln(out, "\ncards:") if len(state.Cards) == 0 { @@ -575,175 +451,14 @@ func printList(out io.Writer, state stateResponse) { fmt.Fprintf( out, " %s: %s %s %s\n", - terminalSafe(c.ID), - terminalSafe(c.Lane), - terminalSafe(c.Repo), - terminalSafe(c.Title), - ) - } -} - -func printSessionGroups(out io.Writer, sessions []interactiveSession, indent string) { - groups := map[string][]interactiveSession{} - var owners []string - for _, session := range sessions { - owner := session.Owner - if owner == "" { - owner = "unassigned" - } - if _, ok := groups[owner]; !ok { - owners = append(owners, owner) - } - groups[owner] = append(groups[owner], session) - } - sortStrings(owners) - for _, owner := range owners { - fmt.Fprintf(out, "%s%s:\n", indent, terminalSafe(owner)) - printSessionTree(out, groups[owner], indent+" ") - } -} - -func printSessionTree(out io.Writer, sessions []interactiveSession, indent string) { - byParent := map[string][]interactiveSession{} - known := map[string]bool{} - seen := map[string]bool{} - for _, session := range sessions { - known[session.ID] = true - byParent[session.ParentSessionID] = append(byParent[session.ParentSessionID], session) - } - for parent := range byParent { - sortSessions(byParent[parent]) - } - var roots []interactiveSession - for _, session := range sessions { - if session.ParentSessionID == "" || !known[session.ParentSessionID] { - roots = append(roots, session) - } - } - sortSessions(roots) - var walk func(interactiveSession, string) - walk = func(session interactiveSession, prefix string) { - if seen[session.ID] { - return - } - seen[session.ID] = true - fmt.Fprintf(out, "%s%s\n", prefix, sessionLine(session)) - for _, child := range byParent[session.ID] { - walk(child, prefix+" ") - } - } - for _, root := range roots { - walk(root, indent) - } - for _, session := range sessions { - if !seen[session.ID] { - walk(session, indent) - } - } -} - -func sortStrings(values []string) { - for i := 1; i < len(values); i++ { - for j := i; j > 0 && values[j-1] > values[j]; j-- { - values[j-1], values[j] = values[j], values[j-1] - } - } -} - -func sortSessions(sessions []interactiveSession) { - for i := 1; i < len(sessions); i++ { - for j := i; j > 0 && sessions[j-1].ID > sessions[j].ID; j-- { - sessions[j-1], sessions[j] = sessions[j], sessions[j-1] - } - } -} - -func sessionLine(session interactiveSession) string { - parts := []string{ - terminalSafe(session.ID), - terminalSafe(session.Status), - terminalSafe(session.Runtime), - terminalSafe(session.Repo), - } - if summary := sessionSummary(session); summary != "" { - parts = append(parts, "- "+terminalSafe(summary)) - } - return strings.Join(parts, " ") -} - -func sessionSummary(session interactiveSession) string { - if session.Summary != "" { - return session.Summary - } - if session.Purpose != "" { - return session.Purpose - } - return session.LastEvent -} - -func printSessionLogs(out io.Writer, logs sessionLogResponse) { - fmt.Fprintf( - out, - "session: %s\nrepo: %s\nstatus: %s\n", - terminalSafe(logs.Session.ID), - terminalSafe(logs.Session.Repo), - terminalSafe(logs.Session.Status), - ) - if logs.Archive.EventCount > 0 { - fmt.Fprintf(out, "archive: %d events\n", logs.Archive.EventCount) - } - for _, event := range logs.Events { - timestamp := time.UnixMilli(event.CreatedAt).Format("15:04:05") - fmt.Fprintf( - out, - "%s %s %s\n", - timestamp, - terminalSafe(event.Actor), - terminalSafe(event.Message), + fleettext.Safe(c.ID), + fleettext.Safe(c.Lane), + fleettext.Safe(c.Repo), + fleettext.Safe(c.Title), ) } } -func printSessionSummary(out io.Writer, session interactiveSession) { - fmt.Fprintf(out, "session: %s\n", terminalSafe(session.ID)) - if session.Purpose != "" { - fmt.Fprintf(out, "purpose: %s\n", terminalSafe(session.Purpose)) - } - if session.Summary != "" { - fmt.Fprintf(out, "summary: %s\n", terminalSafe(session.Summary)) - } -} - -func compactList(values []string, limit int) string { - if len(values) == 0 { - return "none" - } - if len(values) <= limit { - return strings.Join(terminalSafeSlice(values), ", ") - } - return fmt.Sprintf("%s, +%d more", strings.Join(terminalSafeSlice(values[:limit]), ", "), len(values)-limit) -} - -func terminalSafeSlice(values []string) []string { - safe := make([]string, len(values)) - for i, value := range values { - safe[i] = terminalSafe(value) - } - return safe -} - -func terminalSafe(value string) string { - return strings.Map(func(r rune) rune { - if r == '\n' || r == '\r' || r == '\t' { - return ' ' - } - if r < 0x20 || r == 0x7f || (r >= 0x80 && r <= 0x9f) { - return -1 - } - return r - }, value) -} - func splitCommand(command string) ([]string, error) { var args []string var current strings.Builder @@ -809,10 +524,10 @@ func splitCommand(command string) ([]string, error) { return args, nil } -func parseCreate(args []string, client *apiClient, fingerprint string) createArgs { +func parseCreate(args []string, api *fleetapi.Client) createArgs { fs := flag.NewFlagSet("new", flag.ContinueOnError) fs.SetOutput(io.Discard) - var req createSessionRequest + var req fleetapi.CreateSessionRequest var detach bool var vnc bool fs.StringVar(&req.Repo, "repo", "", "repo") @@ -828,8 +543,8 @@ func parseCreate(args []string, client *apiClient, fingerprint string) createArg fs.BoolVar(&vnc, "vnc", false, "print vnc URL without attaching") _ = fs.Parse(args) req.Prompt = strings.Join(fs.Args(), " ") - if req.Repo == "" { - if state, err := client.state(context.Background(), fingerprint); err == nil && len(state.Repos) > 0 { + if req.Repo == "" && api != nil { + if state, err := api.State(context.Background()); err == nil && len(state.Repos) > 0 { req.Repo = state.Repos[0] } } @@ -886,97 +601,18 @@ func (c *apiClient) auth(ctx context.Context, key ssh.PublicKey, sshUser string, }, err } -func (c *apiClient) state(ctx context.Context, fingerprint string) (stateResponse, error) { - var response stateResponse - err := c.do(ctx, http.MethodGet, "/api/ssh/state", fingerprint, nil, &response) - return response, err -} - -func (c *apiClient) createSession(ctx context.Context, fingerprint string, request createSessionRequest) (interactiveSession, error) { - var response createSessionResponse - err := c.do(ctx, http.MethodPost, "/api/ssh/interactive-sessions", fingerprint, request, &response) - return response.Session, err -} - -func (c *apiClient) action(ctx context.Context, fingerprint string, id string, action string) (interactiveSession, error) { - var response sessionResponse - err := c.do( - ctx, - http.MethodPost, - "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/actions", - fingerprint, - map[string]string{"action": action}, - &response, - ) - return response.Session, err -} - -func (c *apiClient) logs(ctx context.Context, fingerprint string, id string) (sessionLogResponse, error) { - var response sessionLogResponse - err := c.do(ctx, http.MethodGet, "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/logs", fingerprint, nil, &response) - return response, err -} - -func (c *apiClient) transcript(ctx context.Context, fingerprint string, id string) (string, error) { - data, err := c.raw(ctx, http.MethodGet, "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/transcript", fingerprint, nil) - return string(data), err +func (c *apiClient) controlPlane(fingerprint string) *fleetapi.Client { + return fleetapi.NewClient(c.baseURL, c.client, fleetapi.SSHAuth(c.token, fingerprint)) } -func (c *apiClient) updateSummary(ctx context.Context, fingerprint string, id string, summary string, purpose string) (interactiveSession, error) { - var response sessionResponse - err := c.do(ctx, http.MethodPost, "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/summary", fingerprint, map[string]string{ - "summary": summary, - "purpose": purpose, - }, &response) - return response.Session, err -} - -func (c *apiClient) message(ctx context.Context, fingerprint string, id string, message string, enter bool, pty sessionPTY) error { - endpoint, err := terminalws.Endpoint(c.baseURL) - if err != nil { - return err - } - - headers := http.Header{} - headers.Set("Authorization", "Bearer "+c.token) - headers.Set("X-Crabfleet-SSH-Fingerprint", fingerprint) - client, err := terminalws.Dial(ctx, endpoint, id, terminalws.Options{ - Header: headers, - Cols: pty.cols, - Rows: pty.rows, - }) - if err != nil { - return err - } - defer client.Close() - payload := message - if enter { - payload += "\n" - } - return client.SendInput(ctx, []byte(payload)) -} - -func (c *apiClient) attach(ctx context.Context, fingerprint string, id string, terminal io.ReadWriter, pty sessionPTY) uint32 { - endpoint, err := terminalws.Endpoint(c.baseURL) - if err != nil { - fmt.Fprintf(terminal, "error: %v\n", err) - return 1 - } - - headers := http.Header{} - headers.Set("Authorization", "Bearer "+c.token) - headers.Set("X-Crabfleet-SSH-Fingerprint", fingerprint) - client, err := terminalws.Dial(ctx, endpoint, id, terminalws.Options{ - Header: headers, - Cols: pty.cols, - Rows: pty.rows, - }) - if err != nil { - fmt.Fprintf(terminal, "attach failed: %v\n", err) - return 1 - } - defer client.Close() - err = client.Attach(ctx, terminal) +func attach( + ctx context.Context, + terminal io.ReadWriter, + api *fleetapi.Client, + id string, + pty sessionPTY, +) uint32 { + err := api.Attach(ctx, id, terminal, pty.cols, pty.rows) if err != nil && !errors.Is(err, net.ErrClosed) && !strings.Contains(err.Error(), "closed") { fmt.Fprintf(terminal, "\nattach closed: %v\n", err) return 1 @@ -1035,19 +671,6 @@ func replyExit(channel ssh.Channel, code uint32) { _, _ = channel.SendRequest("exit-status", false, ssh.Marshal(struct{ Status uint32 }{code})) } -func displayUser(u user) string { - if u.Login != "" { - return "@" + u.Login - } - if u.Email != "" { - return u.Email - } - if u.Subject != "" { - return u.Subject - } - return "unknown" -} - func loadHostKey(path string, allowEphemeral bool) (ssh.Signer, error) { if path != "" { data, err := os.ReadFile(path) diff --git a/cmd/crabbox-ssh-gateway/main_test.go b/cmd/crabbox-ssh-gateway/main_test.go index 901a071..c2f952b 100644 --- a/cmd/crabbox-ssh-gateway/main_test.go +++ b/cmd/crabbox-ssh-gateway/main_test.go @@ -10,6 +10,7 @@ import ( "strings" "testing" + "github.com/openclaw/crabfleet/internal/fleetapi" "golang.org/x/crypto/ssh" ) @@ -53,7 +54,6 @@ func TestParseCreateKeepsLineageAndSummaryFlags(t *testing.T) { "continue work", }, nil, - "", ) if got, want := create.request.ParentSessionID, "IS-1"; got != want { t.Fatalf("parent = %q, want %q", got, want) @@ -83,7 +83,7 @@ func TestParseMessageKeepsNoEnterAndText(t *testing.T) { } func TestParseCreateLeavesRuntimeToDeploymentDefault(t *testing.T) { - create := parseCreate([]string{"--repo", "openclaw/crabfleet", "fix it"}, nil, "") + create := parseCreate([]string{"--repo", "openclaw/crabfleet", "fix it"}, nil) if create.request.Runtime != "" { t.Fatalf("runtime = %q, want deployment default", create.request.Runtime) } @@ -91,7 +91,6 @@ func TestParseCreateLeavesRuntimeToDeploymentDefault(t *testing.T) { create = parseCreate( []string{"--repo", "openclaw/crabfleet", "--runtime", "container", "fix it"}, nil, - "", ) if create.request.Runtime != "container" { t.Fatalf("runtime = %q, want explicit override", create.request.Runtime) @@ -102,39 +101,12 @@ func TestParseCreateAcceptsProfileOverride(t *testing.T) { create := parseCreate( []string{"--repo", "openclaw/crabfleet", "--profile", "desktop-a", "fix it"}, nil, - "", ) if create.request.Profile != "desktop-a" { t.Fatalf("profile = %q, want explicit override", create.request.Profile) } } -func TestTerminalCapabilityWithdrawalSuppressesAttach(t *testing.T) { - if !terminalCapable(interactiveSession{}) { - t.Fatal("legacy session without capabilities should remain attachable") - } - if terminalCapable(interactiveSession{ - Capabilities: &sessionCapabilities{Terminal: false}, - }) { - t.Fatal("explicit terminal capability withdrawal should suppress attach") - } -} - -func TestCreateAutoAttachRequiresReadyResolvablePTY(t *testing.T) { - if attachable(interactiveSession{Status: "provisioning", PtyAvailable: true}) { - t.Fatal("provisioning create must succeed without auto-attach") - } - if attachable(interactiveSession{Status: "ready", PtyAvailable: false}) { - t.Fatal("ready session without a PTY route must not auto-attach") - } - if !attachable(interactiveSession{Status: "ready", PtyAvailable: true}) { - t.Fatal("ready session with a PTY route should auto-attach") - } - if attachable(interactiveSession{Status: "detached", AttachURL: "/api/terminal/ws"}) { - t.Fatal("attach URL must not override missing PTY availability") - } -} - func TestDeleteCommandAndStopAliasUseWorkspaceStopAction(t *testing.T) { var action string server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -200,7 +172,7 @@ func TestDeleteCommandAndStopAliasUseWorkspaceStopAction(t *testing.T) { func TestHelpNamesDeleteAsCanonicalCommand(t *testing.T) { var output bytes.Buffer - printHelp(&output, user{Login: "operator", Role: "owner"}) + printHelp(&output, fleetapi.User{Login: "operator", Role: "owner"}) if got := output.String(); !strings.Contains(got, "delete SESSION_ID") || strings.Contains(got, "stop SESSION_ID") { t.Fatalf("help = %q", got) } @@ -208,40 +180,19 @@ func TestHelpNamesDeleteAsCanonicalCommand(t *testing.T) { func TestHelpDocumentsProfileOverride(t *testing.T) { var output bytes.Buffer - printHelp(&output, user{Login: "operator", Role: "owner"}) + printHelp(&output, fleetapi.User{Login: "operator", Role: "owner"}) if got := output.String(); !strings.Contains(got, "[--profile name]") || !strings.Contains(got, "--profile overrides the deployment default") { t.Fatalf("help = %q", got) } } -func TestLegacyProviderCleanupWarningRequiresConfirmedLegacyStop(t *testing.T) { - if !legacyProviderCleanupMayBeRequired(interactiveSession{Status: "stopped"}) { - t.Fatal("confirmed legacy stop should retain the cleanup warning") - } - for _, session := range []interactiveSession{ - {Status: "failed"}, - {Status: "stopped", Adapter: "runtime-v1"}, - {Status: "stopped", Runtime: "github_actions"}, - } { - if legacyProviderCleanupMayBeRequired(session) { - t.Fatalf("session %#v must not recommend provider cleanup", session) - } - } - if got := lifecycleStopNote(interactiveSession{Status: "stopped", Runtime: "github_actions"}); !strings.Contains(got, "not canceled") { - t.Fatalf("GitHub Actions note = %q", got) - } - if got := lifecycleStopNote(interactiveSession{Status: "failed"}); got != "" { - t.Fatalf("failed unowned workspace note = %q", got) - } -} - func TestPrintListShowsOwnersAndSessionTree(t *testing.T) { var out bytes.Buffer - printList(&out, stateResponse{ - User: user{Login: "steipete", Role: "owner"}, + printList(&out, fleetapi.State{ + User: fleetapi.User{Login: "steipete", Role: "owner"}, Repos: []string{"openclaw/crabfleet"}, - InteractiveSessions: []interactiveSession{ + InteractiveSessions: []fleetapi.Session{ { ID: "IS-2", Owner: "steipete", diff --git a/cmd/crabfleet/main.go b/cmd/crabfleet/main.go index 63b2bc6..da257cf 100644 --- a/cmd/crabfleet/main.go +++ b/cmd/crabfleet/main.go @@ -1,23 +1,21 @@ package main import ( - "bytes" "context" "encoding/json" "errors" "fmt" "io" "net/http" - "net/url" "os" "os/exec" "runtime" - "sort" "strings" "time" "github.com/alecthomas/kong" - "github.com/openclaw/crabfleet/internal/terminalws" + "github.com/openclaw/crabfleet/internal/fleetapi" + "github.com/openclaw/crabfleet/internal/fleettext" ) const defaultAPIURL = "https://crabfleet.openclaw.ai" @@ -129,141 +127,6 @@ type summaryCmd struct { type openCmd struct{} -type apiClient struct { - baseURL string - token string - fingerprint string - agentToken string - agentSessionID string - http *http.Client -} - -type stateResponse struct { - User user `json:"user"` - Repos []string `json:"repos"` - InteractiveSessions []interactiveSession `json:"interactiveSessions"` -} - -type user struct { - Login string `json:"login"` - Email string `json:"email"` - Subject string `json:"subject"` - Role string `json:"role"` -} - -type interactiveSession struct { - ID string `json:"id"` - ParentSessionID string `json:"parentSessionId"` - RootSessionID string `json:"rootSessionId"` - Repo string `json:"repo"` - Branch string `json:"branch"` - Runtime string `json:"runtime"` - Adapter string `json:"adapter"` - Status string `json:"status"` - Owner string `json:"owner"` - CreatedBy string `json:"createdBy"` - Purpose string `json:"purpose"` - Summary string `json:"summary"` - Capabilities *sessionCapabilities `json:"capabilities"` - PtyAvailable bool `json:"ptyAvailable"` - LeaseID string `json:"leaseId"` - AttachURL string `json:"attachUrl"` - VNCURL string `json:"vncUrl"` - LastEvent string `json:"lastEvent"` - LogArchive logArchive `json:"logArchive"` -} - -func legacyProviderCleanupMayBeRequired(session interactiveSession) bool { - if session.Adapter == "runtime-v1" || session.Runtime == "github_actions" { - return false - } - switch session.Status { - case "stopping", "stopped", "expired": - return true - default: - return false - } -} - -func lifecycleStopNote(session interactiveSession) string { - if session.Runtime == "github_actions" && session.Status == "stopped" { - return "GitHub Actions workflow run was not canceled and may continue on GitHub" - } - if legacyProviderCleanupMayBeRequired(session) { - return "provider deletion was not confirmed; legacy runtimes may require separate cleanup" - } - return "" -} - -type sessionCapabilities struct { - Terminal bool `json:"terminal"` -} - -type createSessionRequest struct { - Repo string `json:"repo,omitempty"` - Branch string `json:"branch,omitempty"` - Runtime string `json:"runtime,omitempty"` - Profile string `json:"profile,omitempty"` - Command string `json:"command,omitempty"` - Prompt string `json:"prompt,omitempty"` - ParentSessionID string `json:"parentSessionId,omitempty"` - RootSessionID string `json:"rootSessionId,omitempty"` - Purpose string `json:"purpose,omitempty"` - Summary string `json:"summary,omitempty"` -} - -type createSessionResponse struct { - Session interactiveSession `json:"session"` -} - -type sessionResponse struct { - Session interactiveSession `json:"session"` -} - -type checkpointResponse struct { - Session interactiveSession `json:"session"` - Checkpoint checkpoint `json:"checkpoint"` -} - -type checkpointsResponse struct { - Session interactiveSession `json:"session"` - Checkpoints []checkpoint `json:"checkpoints"` -} - -type actionResponse struct { - Session interactiveSession `json:"session"` -} - -type sessionLogResponse struct { - Session interactiveSession `json:"session"` - Events []sessionLogEvent `json:"events"` - Archive logArchive `json:"archive"` -} - -type sessionLogEvent struct { - Actor string `json:"actor"` - Message string `json:"message"` - CreatedAt int64 `json:"createdAt"` -} - -type logArchive struct { - SessionID string `json:"sessionId"` - EventCount int `json:"eventCount"` - EventsKey string `json:"eventsKey"` - TranscriptKey string `json:"transcriptKey"` - SummaryKey string `json:"summaryKey"` - ArchivedAt int64 `json:"archivedAt"` - UpdatedAt int64 `json:"updatedAt"` -} - -type checkpoint struct { - ID string `json:"id"` - Name string `json:"name"` - SessionID string `json:"sessionId"` - Workdir string `json:"workdir"` - CreatedAt int64 `json:"createdAt"` -} - func main() { var app cli ctx := kong.Parse( @@ -277,18 +140,15 @@ func main() { ctx.FatalIfErrorf(err) } -func (c *cli) apiClient() *apiClient { - return &apiClient{ - baseURL: strings.TrimRight(c.API, "/"), - token: c.Token, - fingerprint: c.Fingerprint, - agentToken: c.AgentToken, - agentSessionID: c.AgentID, - http: &http.Client{Timeout: 2 * time.Minute}, +func (c *cli) apiClient() *fleetapi.Client { + auth := fleetapi.SSHAuth(c.Token, c.Fingerprint) + if c.Token == "" || c.Fingerprint == "" { + auth = fleetapi.AgentAuth(c.AgentToken, c.AgentID) } + return fleetapi.NewClient(c.API, &http.Client{Timeout: 2 * time.Minute}, auth) } -func (loginCmd) Run(app *cli, _ *apiClient) error { +func (loginCmd) Run(app *cli, _ *fleetapi.Client) error { if app.JSON { return json.NewEncoder(os.Stdout).Encode(map[string]string{ "ssh": fmt.Sprintf("ssh link@%s", app.SSHHost), @@ -299,8 +159,8 @@ func (loginCmd) Run(app *cli, _ *apiClient) error { return nil } -func (whoamiCmd) Run(app *cli, api *apiClient) error { - state, err := api.state(context.Background()) +func (whoamiCmd) Run(app *cli, api *fleetapi.Client) error { + state, err := api.State(context.Background()) if err != nil { if app.NoInput || app.JSON { return err @@ -310,12 +170,12 @@ func (whoamiCmd) Run(app *cli, api *apiClient) error { if app.JSON { return json.NewEncoder(os.Stdout).Encode(state.User) } - fmt.Fprintf(os.Stdout, "login: %s\nrole: %s\n", displayUser(state.User), state.User.Role) + fmt.Fprintf(os.Stdout, "login: %s\nrole: %s\n", fleettext.DisplayUser(state.User), state.User.Role) return nil } -func (listCmd) Run(app *cli, api *apiClient) error { - state, err := api.state(context.Background()) +func (listCmd) Run(app *cli, api *fleetapi.Client) error { + state, err := api.State(context.Background()) if err != nil { if app.NoInput || app.JSON { return err @@ -325,13 +185,15 @@ func (listCmd) Run(app *cli, api *apiClient) error { if app.JSON { return json.NewEncoder(os.Stdout).Encode(state) } - printFleet(os.Stdout, state.InteractiveSessions) + if !fleettext.WriteSessionGroups(os.Stdout, state.InteractiveSessions, "") { + fmt.Fprintln(os.Stdout, "crabboxes: none") + } return nil } -func (cmd newCmd) Run(app *cli, api *apiClient) error { +func (cmd newCmd) Run(app *cli, api *fleetapi.Client) error { req := cmd.sessionRequest(app) - session, err := api.createSession(context.Background(), req) + session, err := api.CreateSession(context.Background(), req) if err != nil { if app.NoInput || app.JSON { return err @@ -357,15 +219,15 @@ func (cmd newCmd) Run(app *cli, api *apiClient) error { } fmt.Fprintf(os.Stdout, "session: %s\nrepo: %s\nstatus: %s\n", session.ID, session.Repo, session.Status) if session.ParentSessionID != "" { - fmt.Fprintf(os.Stdout, "parent: %s\n", terminalSafe(session.ParentSessionID)) + fmt.Fprintf(os.Stdout, "parent: %s\n", fleettext.Safe(session.ParentSessionID)) } if session.RootSessionID != "" && session.RootSessionID != session.ID { - fmt.Fprintf(os.Stdout, "root: %s\n", terminalSafe(session.RootSessionID)) + fmt.Fprintf(os.Stdout, "root: %s\n", fleettext.Safe(session.RootSessionID)) } if session.Summary != "" { - fmt.Fprintf(os.Stdout, "summary: %s\n", terminalSafe(session.Summary)) + fmt.Fprintf(os.Stdout, "summary: %s\n", fleettext.Safe(session.Summary)) } - if attachable(session) { + if session.Attachable() { fmt.Fprintf(os.Stdout, "attach: crabfleet attach %s\n", session.ID) } if session.VNCURL != "" { @@ -374,13 +236,13 @@ func (cmd newCmd) Run(app *cli, api *apiClient) error { if cmd.VNC && session.VNCURL != "" { return openURL(session.VNCURL) } - if !cmd.Detach && !app.NoInput && isTerminal(os.Stdin) && isTerminal(os.Stdout) && attachable(session) { + if !cmd.Detach && !app.NoInput && isTerminal(os.Stdin) && isTerminal(os.Stdout) && session.Attachable() { return runSSH(app, "attach", session.ID) } return nil } -func (cmd newCmd) sessionRequest(app *cli) createSessionRequest { +func (cmd newCmd) sessionRequest(app *cli) fleetapi.CreateSessionRequest { prompt := strings.Join(cmd.Prompt, " ") parent := cmd.Parent if parent == "" { @@ -394,7 +256,7 @@ func (cmd newCmd) sessionRequest(app *cli) createSessionRequest { if cmd.Runtime != nil { runtime = *cmd.Runtime } - return createSessionRequest{ + return fleetapi.CreateSessionRequest{ Repo: cmd.Repo, Branch: cmd.Branch, Runtime: runtime, @@ -408,7 +270,7 @@ func (cmd newCmd) sessionRequest(app *cli) createSessionRequest { } } -func (cmd newCmd) sshCreateArgs(req createSessionRequest) []string { +func (cmd newCmd) sshCreateArgs(req fleetapi.CreateSessionRequest) []string { args := []string{"new", "--branch", req.Branch} if req.Runtime != "" { args = append(args, "--runtime", req.Runtime) @@ -446,12 +308,12 @@ func (cmd newCmd) sshCreateArgs(req createSessionRequest) []string { return args } -func (cmd attachCmd) Run(app *cli, _ *apiClient) error { +func (cmd attachCmd) Run(app *cli, _ *fleetapi.Client) error { return runSSH(app, "attach", cmd.ID) } -func (cmd statusCmd) Run(app *cli, api *apiClient) error { - session, err := api.session(context.Background(), cmd.ID) +func (cmd statusCmd) Run(app *cli, api *fleetapi.Client) error { + session, err := api.Session(context.Background(), cmd.ID) if err != nil { if app.NoInput || app.JSON { return err @@ -461,12 +323,12 @@ func (cmd statusCmd) Run(app *cli, api *apiClient) error { if app.JSON { return json.NewEncoder(os.Stdout).Encode(session) } - printSessionStatus(os.Stdout, session) + fleettext.WriteSessionStatus(os.Stdout, session) return nil } -func (cmd deleteCmd) Run(app *cli, api *apiClient) error { - session, err := api.action(context.Background(), cmd.ID, "stop") +func (cmd deleteCmd) Run(app *cli, api *fleetapi.Client) error { + session, err := api.Action(context.Background(), cmd.ID, "stop") if err != nil { if app.NoInput || app.JSON { return err @@ -477,28 +339,28 @@ func (cmd deleteCmd) Run(app *cli, api *apiClient) error { return json.NewEncoder(os.Stdout).Encode(session) } fmt.Fprintf(os.Stdout, "session: %s\nstatus: %s\n", session.ID, session.Status) - if note := lifecycleStopNote(session); note != "" { + if note := session.LifecycleStopNote(); note != "" { fmt.Fprintf(os.Stdout, "note: %s\n", note) } return nil } -func (doctorCmd) Run(app *cli, api *apiClient) error { +func (doctorCmd) Run(app *cli, api *fleetapi.Client) error { result := map[string]string{ "api": "unknown", "auth": "unknown", } - if err := api.health(context.Background()); err != nil { + if err := api.Health(context.Background()); err != nil { result["api"] = "failed: " + err.Error() } else { result["api"] = "ok" } - state, err := api.state(context.Background()) + state, err := api.State(context.Background()) if err != nil { result["auth"] = "failed: " + err.Error() } else { result["auth"] = "ok" - result["user"] = displayUser(state.User) + result["user"] = fleettext.DisplayUser(state.User) result["role"] = state.User.Role result["sessions"] = fmt.Sprintf("%d", len(state.InteractiveSessions)) } @@ -514,8 +376,8 @@ func (doctorCmd) Run(app *cli, api *apiClient) error { return nil } -func (cmd checkpointsCmd) Run(app *cli, api *apiClient) error { - checkpoints, err := api.checkpoints(context.Background(), cmd.ID) +func (cmd checkpointsCmd) Run(app *cli, api *fleetapi.Client) error { + checkpoints, err := api.Checkpoints(context.Background(), cmd.ID) if err != nil { if app.NoInput || app.JSON { return err @@ -542,8 +404,8 @@ func (cmd checkpointsCmd) Run(app *cli, api *apiClient) error { return nil } -func (cmd checkpointCmd) Run(app *cli, api *apiClient) error { - checkpoint, err := api.checkpoint(context.Background(), cmd.ID) +func (cmd checkpointCmd) Run(app *cli, api *fleetapi.Client) error { + checkpoint, err := api.Checkpoint(context.Background(), cmd.ID) if err != nil { if app.NoInput || app.JSON { return err @@ -557,8 +419,8 @@ func (cmd checkpointCmd) Run(app *cli, api *apiClient) error { return nil } -func (cmd restoreCmd) Run(app *cli, api *apiClient) error { - checkpoint, err := api.restore(context.Background(), cmd.ID, cmd.Checkpoint) +func (cmd restoreCmd) Run(app *cli, api *fleetapi.Client) error { + checkpoint, err := api.Restore(context.Background(), cmd.ID, cmd.Checkpoint) if err != nil { if app.NoInput || app.JSON { return err @@ -572,8 +434,8 @@ func (cmd restoreCmd) Run(app *cli, api *apiClient) error { return nil } -func (cmd vncCmd) Run(app *cli, api *apiClient) error { - state, err := api.state(context.Background()) +func (cmd vncCmd) Run(app *cli, api *fleetapi.Client) error { + state, err := api.State(context.Background()) if err != nil { if app.NoInput || app.JSON { return err @@ -607,8 +469,8 @@ func (cmd vncCmd) Run(app *cli, api *apiClient) error { return fmt.Errorf("session %s not found", cmd.ID) } -func (cmd logsCmd) Run(app *cli, api *apiClient) error { - logs, err := api.logs(context.Background(), cmd.ID) +func (cmd logsCmd) Run(app *cli, api *fleetapi.Client) error { + logs, err := api.Logs(context.Background(), cmd.ID) if err != nil { if app.NoInput || app.JSON { return err @@ -618,12 +480,12 @@ func (cmd logsCmd) Run(app *cli, api *apiClient) error { if app.JSON { return json.NewEncoder(os.Stdout).Encode(logs) } - printSessionLogs(os.Stdout, logs) + fleettext.WriteSessionLogs(os.Stdout, logs) return nil } -func (cmd transcriptCmd) Run(app *cli, api *apiClient) error { - transcript, err := api.transcript(context.Background(), cmd.ID) +func (cmd transcriptCmd) Run(app *cli, api *fleetapi.Client) error { + transcript, err := api.Transcript(context.Background(), cmd.ID) if err != nil { if app.NoInput || app.JSON { return err @@ -643,7 +505,7 @@ func (cmd transcriptCmd) Run(app *cli, api *apiClient) error { return nil } -func (cmd messageCmd) Run(app *cli, api *apiClient) error { +func (cmd messageCmd) Run(app *cli, api *fleetapi.Client) error { message := strings.Join(cmd.Text, " ") if message == "" && !isTerminal(os.Stdin) { data, err := io.ReadAll(io.LimitReader(os.Stdin, 64*1024)) @@ -655,7 +517,7 @@ func (cmd messageCmd) Run(app *cli, api *apiClient) error { if message == "" { return errors.New("message text is required") } - if err := api.message(context.Background(), cmd.ID, message, !cmd.NoEnter); err != nil { + if err := api.Message(context.Background(), cmd.ID, message, !cmd.NoEnter, 120, 34); err != nil { if app.NoInput || app.JSON { return err } @@ -672,14 +534,14 @@ func (cmd messageCmd) Run(app *cli, api *apiClient) error { "sent": true, }) } - fmt.Fprintf(os.Stdout, "sent: %s\n", terminalSafe(cmd.ID)) + fmt.Fprintf(os.Stdout, "sent: %s\n", fleettext.Safe(cmd.ID)) return nil } -func (cmd summaryCmd) Run(app *cli, api *apiClient) error { +func (cmd summaryCmd) Run(app *cli, api *fleetapi.Client) error { summary := strings.Join(cmd.Text, " ") if summary == "" && cmd.Purpose == "" { - session, err := api.session(context.Background(), cmd.ID) + session, err := api.Session(context.Background(), cmd.ID) if err != nil { if app.NoInput || app.JSON { return err @@ -689,10 +551,10 @@ func (cmd summaryCmd) Run(app *cli, api *apiClient) error { if app.JSON { return json.NewEncoder(os.Stdout).Encode(session) } - printSessionSummary(os.Stdout, session) + fleettext.WriteSessionSummary(os.Stdout, session) return nil } - session, err := api.updateSummary(context.Background(), cmd.ID, summary, cmd.Purpose) + session, err := api.UpdateSummary(context.Background(), cmd.ID, summary, cmd.Purpose) if err != nil { if app.NoInput || app.JSON { return err @@ -709,370 +571,14 @@ func (cmd summaryCmd) Run(app *cli, api *apiClient) error { if app.JSON { return json.NewEncoder(os.Stdout).Encode(session) } - printSessionSummary(os.Stdout, session) + fleettext.WriteSessionSummary(os.Stdout, session) return nil } -func (openCmd) Run(app *cli, _ *apiClient) error { +func (openCmd) Run(app *cli, _ *fleetapi.Client) error { return openURL(app.API + "/app/") } -func (c *apiClient) state(ctx context.Context) (stateResponse, error) { - var out stateResponse - err := c.do(ctx, http.MethodGet, "/api/ssh/state", nil, &out) - return out, err -} - -func (c *apiClient) health(ctx context.Context) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/healthz", nil) - if err != nil { - return err - } - resp, err := c.http.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return fmt.Errorf("api %s", resp.Status) - } - return nil -} - -func (c *apiClient) session(ctx context.Context, id string) (interactiveSession, error) { - var out sessionResponse - err := c.do(ctx, http.MethodGet, "/api/ssh/interactive-sessions/"+url.PathEscape(id), nil, &out) - return out.Session, err -} - -func (c *apiClient) createSession(ctx context.Context, req createSessionRequest) (interactiveSession, error) { - var out createSessionResponse - err := c.do(ctx, http.MethodPost, "/api/ssh/interactive-sessions", req, &out) - return out.Session, err -} - -func (c *apiClient) action(ctx context.Context, id string, action string) (interactiveSession, error) { - var out actionResponse - err := c.do( - ctx, - http.MethodPost, - "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/actions", - map[string]string{"action": action}, - &out, - ) - return out.Session, err -} - -func (c *apiClient) checkpoints(ctx context.Context, id string) (checkpointsResponse, error) { - var out checkpointsResponse - err := c.do(ctx, http.MethodGet, "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/checkpoints", nil, &out) - return out, err -} - -func (c *apiClient) checkpoint(ctx context.Context, id string) (checkpointResponse, error) { - var out checkpointResponse - err := c.do(ctx, http.MethodPost, "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/checkpoints", nil, &out) - return out, err -} - -func (c *apiClient) restore(ctx context.Context, id string, checkpoint string) (checkpointResponse, error) { - var out checkpointResponse - err := c.do( - ctx, - http.MethodPost, - "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/checkpoints/"+url.PathEscape(checkpoint)+"/restore", - nil, - &out, - ) - return out, err -} - -func (c *apiClient) logs(ctx context.Context, id string) (sessionLogResponse, error) { - var out sessionLogResponse - err := c.do(ctx, http.MethodGet, "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/logs", nil, &out) - return out, err -} - -func (c *apiClient) transcript(ctx context.Context, id string) (string, error) { - return c.text(ctx, http.MethodGet, "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/transcript", nil) -} - -func (c *apiClient) message(ctx context.Context, id string, message string, enter bool) error { - _, authMode, err := c.authenticatedPath("/api/terminal/ws") - if err != nil { - return err - } - endpoint, err := terminalws.Endpoint(c.baseURL) - if err != nil { - return err - } - - headers := http.Header{} - c.setAuthHeaders(headers, authMode) - client, err := terminalws.Dial(ctx, endpoint, id, terminalws.Options{ - Header: headers, - Cols: 120, - Rows: 34, - }) - if err != nil { - return err - } - defer client.Close() - payload := message - if enter { - payload += "\n" - } - return client.SendInput(ctx, []byte(payload)) -} - -func (c *apiClient) updateSummary(ctx context.Context, id string, summary string, purpose string) (interactiveSession, error) { - var out sessionResponse - err := c.do(ctx, http.MethodPost, "/api/ssh/interactive-sessions/"+url.PathEscape(id)+"/summary", map[string]string{ - "summary": summary, - "purpose": purpose, - }, &out) - return out.Session, err -} - -func (c *apiClient) do(ctx context.Context, method string, path string, body any, out any) error { - resp, err := c.request(ctx, method, path, body, "application/json") - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - text, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) - return fmt.Errorf("api %s: %s", resp.Status, strings.TrimSpace(string(text))) - } - if out == nil { - return nil - } - return json.NewDecoder(resp.Body).Decode(out) -} - -func (c *apiClient) text(ctx context.Context, method string, path string, body any) (string, error) { - resp, err := c.request(ctx, method, path, body, "text/markdown") - if err != nil { - return "", err - } - defer resp.Body.Close() - data, readErr := io.ReadAll(io.LimitReader(resp.Body, 4*1024*1024)) - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - if readErr != nil { - return "", readErr - } - return "", fmt.Errorf("api %s: %s", resp.Status, strings.TrimSpace(string(data))) - } - if readErr != nil { - return "", readErr - } - return string(data), nil -} - -func (c *apiClient) request(ctx context.Context, method string, path string, body any, accept string) (*http.Response, error) { - apiPath, authMode, err := c.authenticatedPath(path) - if err != nil { - return nil, err - } - var reader io.Reader - if body != nil { - payload, err := json.Marshal(body) - if err != nil { - return nil, err - } - reader = bytes.NewReader(payload) - } - req, err := http.NewRequestWithContext(ctx, method, c.baseURL+apiPath, reader) - if err != nil { - return nil, err - } - req.Header.Set("Accept", accept) - c.setAuthHeaders(req.Header, authMode) - if body != nil { - req.Header.Set("Content-Type", "application/json") - } - return c.http.Do(req) -} - -func (c *apiClient) setAuthHeaders(headers http.Header, authMode string) { - if authMode == "ssh" { - headers.Set("Authorization", "Bearer "+c.token) - headers.Set("X-Crabfleet-SSH-Fingerprint", c.fingerprint) - return - } - headers.Set("Authorization", "Bearer "+c.agentToken) - headers.Set("X-Crabfleet-Session-ID", c.agentSessionID) -} - -func (c *apiClient) authenticatedPath(path string) (string, string, error) { - if c.token != "" && c.fingerprint != "" { - return path, "ssh", nil - } - if c.agentToken != "" && c.agentSessionID != "" { - return strings.Replace(path, "/api/ssh/", "/api/agent/", 1), "agent", nil - } - return "", "", errors.New("API mode requires CRABFLEET_SSH_GATEWAY_TOKEN + CRABFLEET_SSH_FINGERPRINT or CRABFLEET_AGENT_TOKEN + CRABFLEET_SESSION_ID") -} - -func printFleet(out io.Writer, sessions []interactiveSession) { - groups := map[string][]interactiveSession{} - for _, session := range sessions { - owner := session.Owner - if owner == "" { - owner = "unassigned" - } - groups[owner] = append(groups[owner], session) - } - owners := make([]string, 0, len(groups)) - for owner := range groups { - owners = append(owners, owner) - } - sort.Strings(owners) - if len(owners) == 0 { - fmt.Fprintln(out, "crabboxes: none") - return - } - for _, owner := range owners { - fmt.Fprintf(out, "%s:\n", terminalSafe(owner)) - printSessionTree(out, groups[owner], " ") - } -} - -func printSessionTree(out io.Writer, sessions []interactiveSession, indent string) { - byParent := map[string][]interactiveSession{} - known := map[string]bool{} - seen := map[string]bool{} - for _, session := range sessions { - known[session.ID] = true - byParent[session.ParentSessionID] = append(byParent[session.ParentSessionID], session) - } - for parent := range byParent { - sort.SliceStable(byParent[parent], func(i, j int) bool { - return byParent[parent][i].ID < byParent[parent][j].ID - }) - } - var roots []interactiveSession - for _, session := range sessions { - if session.ParentSessionID == "" || !known[session.ParentSessionID] { - roots = append(roots, session) - } - } - sort.SliceStable(roots, func(i, j int) bool { - return roots[i].ID < roots[j].ID - }) - var walk func(interactiveSession, string) - walk = func(session interactiveSession, prefix string) { - if seen[session.ID] { - return - } - seen[session.ID] = true - fmt.Fprintf(out, "%s%s\n", prefix, sessionLine(session)) - for _, child := range byParent[session.ID] { - walk(child, prefix+" ") - } - } - for _, root := range roots { - walk(root, indent) - } - for _, session := range sessions { - if !seen[session.ID] { - walk(session, indent) - } - } -} - -func sessionLine(session interactiveSession) string { - parts := []string{ - terminalSafe(session.ID), - terminalSafe(session.Status), - terminalSafe(session.Runtime), - terminalSafe(session.Repo), - } - if summary := sessionSummary(session); summary != "" { - parts = append(parts, "- "+terminalSafe(summary)) - } - return strings.Join(parts, " ") -} - -func sessionSummary(session interactiveSession) string { - if session.Summary != "" { - return session.Summary - } - if session.Purpose != "" { - return session.Purpose - } - return session.LastEvent -} - -func printSessionLogs(out io.Writer, logs sessionLogResponse) { - fmt.Fprintf( - out, - "session: %s\nrepo: %s\nstatus: %s\n", - terminalSafe(logs.Session.ID), - terminalSafe(logs.Session.Repo), - terminalSafe(logs.Session.Status), - ) - if logs.Archive.EventCount > 0 { - fmt.Fprintf(out, "archive: %d events\n", logs.Archive.EventCount) - } - for _, event := range logs.Events { - timestamp := time.UnixMilli(event.CreatedAt).Format("15:04:05") - fmt.Fprintf( - out, - "%s %s %s\n", - timestamp, - terminalSafe(event.Actor), - terminalSafe(event.Message), - ) - } -} - -func printSessionStatus(out io.Writer, session interactiveSession) { - fmt.Fprintf(out, "session: %s\n", terminalSafe(session.ID)) - fmt.Fprintf(out, "repo: %s\n", terminalSafe(session.Repo)) - fmt.Fprintf(out, "branch: %s\n", terminalSafe(session.Branch)) - fmt.Fprintf(out, "runtime: %s\n", terminalSafe(session.Runtime)) - fmt.Fprintf(out, "status: %s\n", terminalSafe(session.Status)) - fmt.Fprintf(out, "owner: %s\n", terminalSafe(session.Owner)) - if session.LeaseID != "" { - fmt.Fprintf(out, "lease: %s\n", terminalSafe(session.LeaseID)) - } - if session.ParentSessionID != "" { - fmt.Fprintf(out, "parent: %s\n", terminalSafe(session.ParentSessionID)) - } - if session.RootSessionID != "" { - fmt.Fprintf(out, "root: %s\n", terminalSafe(session.RootSessionID)) - } - if session.CreatedBy != "" { - fmt.Fprintf(out, "created-by: %s\n", terminalSafe(session.CreatedBy)) - } - if session.Purpose != "" { - fmt.Fprintf(out, "purpose: %s\n", terminalSafe(session.Purpose)) - } - if session.Summary != "" { - fmt.Fprintf(out, "summary: %s\n", terminalSafe(session.Summary)) - } - if session.AttachURL != "" { - fmt.Fprintf(out, "attach: %s\n", terminalSafe(session.AttachURL)) - } - if session.VNCURL != "" { - fmt.Fprintf(out, "vnc: %s\n", terminalSafe(session.VNCURL)) - } - if session.LastEvent != "" { - fmt.Fprintf(out, "event: %s\n", terminalSafe(session.LastEvent)) - } -} - -func printSessionSummary(out io.Writer, session interactiveSession) { - fmt.Fprintf(out, "session: %s\n", terminalSafe(session.ID)) - if session.Purpose != "" { - fmt.Fprintf(out, "purpose: %s\n", terminalSafe(session.Purpose)) - } - if session.Summary != "" { - fmt.Fprintf(out, "summary: %s\n", terminalSafe(session.Summary)) - } -} - func runSSH(app *cli, args ...string) error { sshArgs := append([]string{app.SSHHost}, args...) cmd := exec.Command("ssh", sshArgs...) @@ -1153,55 +659,7 @@ func vncURLFromOutput(output string) string { return "" } -func terminalSafe(value string) string { - return strings.Map(func(r rune) rune { - if r == '\n' || r == '\r' || r == '\t' { - return ' ' - } - if r < 0x20 || r == 0x7f || (r >= 0x80 && r <= 0x9f) { - return -1 - } - return r - }, value) -} - -func attachable(session interactiveSession) bool { - if !terminalCapable(session) { - return false - } - if !ptyAttachable(session) { - return false - } - switch session.Status { - case "ready", "attached", "detached": - return true - default: - return false - } -} - -func terminalCapable(session interactiveSession) bool { - return session.Capabilities == nil || session.Capabilities.Terminal -} - -func ptyAttachable(session interactiveSession) bool { - return session.PtyAvailable -} - func isTerminal(file *os.File) bool { info, err := file.Stat() return err == nil && (info.Mode()&os.ModeCharDevice) != 0 } - -func displayUser(u user) string { - if u.Login != "" { - return "@" + u.Login - } - if u.Email != "" { - return u.Email - } - if u.Subject != "" { - return u.Subject - } - return "unknown" -} diff --git a/cmd/crabfleet/main_test.go b/cmd/crabfleet/main_test.go index bbcdf8a..8d3874b 100644 --- a/cmd/crabfleet/main_test.go +++ b/cmd/crabfleet/main_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "net/http" "net/http/httptest" - "strings" "testing" "github.com/alecthomas/kong" @@ -185,88 +184,6 @@ func TestCLIUsesDeleteCanonicalNameWithStopAlias(t *testing.T) { } } -func TestLegacyProviderCleanupWarningRequiresConfirmedLegacyStop(t *testing.T) { - if !legacyProviderCleanupMayBeRequired(interactiveSession{Status: "stopped"}) { - t.Fatal("confirmed legacy stop should retain the cleanup warning") - } - for _, session := range []interactiveSession{ - {Status: "failed"}, - {Status: "stopped", Adapter: "runtime-v1"}, - {Status: "stopped", Runtime: "github_actions"}, - } { - if legacyProviderCleanupMayBeRequired(session) { - t.Fatalf("session %#v must not recommend provider cleanup", session) - } - } - if got := lifecycleStopNote(interactiveSession{Status: "stopped", Runtime: "github_actions"}); !strings.Contains(got, "not canceled") { - t.Fatalf("GitHub Actions note = %q", got) - } - if got := lifecycleStopNote(interactiveSession{Status: "failed"}); got != "" { - t.Fatalf("failed unowned workspace note = %q", got) - } -} - -func TestAttachableRequiresAuthoritativePTYAvailability(t *testing.T) { - if !attachable(interactiveSession{Status: "ready", PtyAvailable: true}) { - t.Fatal("ready session with an available PTY should be attachable") - } - if attachable(interactiveSession{Status: "pending_adapter", LeaseID: "sandbox:test"}) { - t.Fatal("pending session should not be attachable") - } - if attachable(interactiveSession{Status: "ready", AttachURL: "/api/terminal/ws"}) { - t.Fatal("attach URL must not override missing PTY availability") - } - if attachable(interactiveSession{ - Status: "ready", - LeaseID: "sandbox:test", - PtyAvailable: true, - Capabilities: &sessionCapabilities{Terminal: false}, - }) { - t.Fatal("session with withdrawn terminal capability should not be attachable") - } - if attachable(interactiveSession{ - Status: "ready", - LeaseID: "sandbox:test", - PtyAvailable: false, - }) { - t.Fatal("server PTY availability should be authoritative") - } -} - -func TestPrintFleetShowsOwnerSessionTreeAndSummaries(t *testing.T) { - var out bytes.Buffer - printFleet(&out, []interactiveSession{ - { - ID: "IS-2", - Owner: "steipete", - Repo: "openclaw/crabfleet", - Runtime: "container", - Status: "ready", - Summary: "child mission", - ParentSessionID: "IS-1", - }, - { - ID: "IS-1", - Owner: "steipete", - Repo: "openclaw/crabfleet", - Runtime: "container", - Status: "ready", - Purpose: "root mission", - }, - }) - - text := out.String() - for _, want := range []string{ - "steipete:", - " IS-1 ready container openclaw/crabfleet - root mission", - " IS-2 ready container openclaw/crabfleet - child mission", - } { - if !strings.Contains(text, want) { - t.Fatalf("output missing %q:\n%s", want, text) - } - } -} - func splitForTest(command string) ([]string, error) { var args []string var current []rune diff --git a/internal/fleetapi/client.go b/internal/fleetapi/client.go new file mode 100644 index 0000000..bee2e49 --- /dev/null +++ b/internal/fleetapi/client.go @@ -0,0 +1,323 @@ +package fleetapi + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/openclaw/crabfleet/internal/terminalws" +) + +const maxResponseBytes = 4 * 1024 * 1024 +const maxErrorBytes = 512 + +type authMode uint8 + +const ( + authNone authMode = iota + authSSH + authAgent +) + +type Auth struct { + mode authMode + token string + principal string +} + +func SSHAuth(token string, fingerprint string) Auth { + return Auth{mode: authSSH, token: token, principal: fingerprint} +} + +func AgentAuth(token string, sessionID string) Auth { + return Auth{mode: authAgent, token: token, principal: sessionID} +} + +type Client struct { + baseURL string + auth Auth + http *http.Client +} + +func NewClient(baseURL string, httpClient *http.Client, auth Auth) *Client { + if httpClient == nil { + httpClient = http.DefaultClient + } + return &Client{ + baseURL: strings.TrimRight(baseURL, "/"), + auth: auth, + http: httpClient, + } +} + +func (c *Client) BaseURL() string { + return c.baseURL +} + +func (c *Client) Health(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/healthz", nil) + if err != nil { + return err + } + resp, err := c.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("crabfleet API %s", resp.Status) + } + return nil +} + +func (c *Client) State(ctx context.Context) (State, error) { + var out State + err := c.doJSON(ctx, http.MethodGet, "/api/ssh/state", nil, &out) + return out, err +} + +func (c *Client) Session(ctx context.Context, id string) (Session, error) { + var out struct { + Session Session `json:"session"` + } + err := c.doJSON(ctx, http.MethodGet, sessionPath(id), nil, &out) + return out.Session, err +} + +func (c *Client) CreateSession(ctx context.Context, request CreateSessionRequest) (Session, error) { + var out struct { + Session Session `json:"session"` + } + err := c.doJSON(ctx, http.MethodPost, "/api/ssh/interactive-sessions", request, &out) + return out.Session, err +} + +func (c *Client) Action(ctx context.Context, id string, action string) (Session, error) { + var out struct { + Session Session `json:"session"` + } + err := c.doJSON( + ctx, + http.MethodPost, + sessionPath(id)+"/actions", + map[string]string{"action": action}, + &out, + ) + return out.Session, err +} + +func (c *Client) Checkpoints(ctx context.Context, id string) (CheckpointsResult, error) { + var out CheckpointsResult + err := c.doJSON(ctx, http.MethodGet, sessionPath(id)+"/checkpoints", nil, &out) + return out, err +} + +func (c *Client) Checkpoint(ctx context.Context, id string) (CheckpointResult, error) { + var out CheckpointResult + err := c.doJSON(ctx, http.MethodPost, sessionPath(id)+"/checkpoints", nil, &out) + return out, err +} + +func (c *Client) Restore(ctx context.Context, id string, checkpointID string) (CheckpointResult, error) { + var out CheckpointResult + err := c.doJSON( + ctx, + http.MethodPost, + sessionPath(id)+"/checkpoints/"+url.PathEscape(checkpointID)+"/restore", + nil, + &out, + ) + return out, err +} + +func (c *Client) Logs(ctx context.Context, id string) (SessionLogs, error) { + var out SessionLogs + err := c.doJSON(ctx, http.MethodGet, sessionPath(id)+"/logs", nil, &out) + return out, err +} + +func (c *Client) Transcript(ctx context.Context, id string) (string, error) { + resp, err := c.open(ctx, http.MethodGet, sessionPath(id)+"/transcript", nil, "text/markdown") + if err != nil { + return "", err + } + defer resp.Body.Close() + if err := responseError(resp); err != nil { + return "", err + } + data, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes)) + return string(data), err +} + +func (c *Client) UpdateSummary(ctx context.Context, id string, summary string, purpose string) (Session, error) { + var out struct { + Session Session `json:"session"` + } + err := c.doJSON(ctx, http.MethodPost, sessionPath(id)+"/summary", map[string]string{ + "summary": summary, + "purpose": purpose, + }, &out) + return out.Session, err +} + +func (c *Client) Message( + ctx context.Context, + id string, + message string, + enter bool, + cols uint32, + rows uint32, +) error { + client, err := c.terminal(ctx, id, cols, rows) + if err != nil { + return err + } + defer client.Close() + if enter { + message += "\n" + } + return client.SendInput(ctx, []byte(message)) +} + +func (c *Client) Attach( + ctx context.Context, + id string, + terminal io.ReadWriter, + cols uint32, + rows uint32, +) error { + client, err := c.terminal(ctx, id, cols, rows) + if err != nil { + return err + } + defer client.Close() + return client.Attach(ctx, terminal) +} + +func (c *Client) terminal(ctx context.Context, id string, cols uint32, rows uint32) (*terminalws.Client, error) { + endpoint, err := terminalws.Endpoint(c.baseURL) + if err != nil { + return nil, err + } + headers, err := c.auth.headers() + if err != nil { + return nil, err + } + return terminalws.Dial(ctx, endpoint, id, terminalws.Options{ + Header: headers, + Cols: cols, + Rows: rows, + }) +} + +func (c *Client) doJSON(ctx context.Context, method string, path string, body any, out any) error { + resp, err := c.open(ctx, method, path, body, "application/json") + if err != nil { + return err + } + defer resp.Body.Close() + if err := responseError(resp); err != nil { + return err + } + if out == nil { + return nil + } + return json.NewDecoder(resp.Body).Decode(out) +} + +func (c *Client) open( + ctx context.Context, + method string, + path string, + body any, + accept string, +) (*http.Response, error) { + apiPath, err := c.auth.path(path) + if err != nil { + return nil, err + } + var reader io.Reader + if body != nil { + payload, err := json.Marshal(body) + if err != nil { + return nil, err + } + reader = bytes.NewReader(payload) + } + req, err := http.NewRequestWithContext(ctx, method, c.baseURL+apiPath, reader) + if err != nil { + return nil, err + } + req.Header.Set("Accept", accept) + if err := c.auth.apply(req.Header); err != nil { + return nil, err + } + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + resp, err := c.http.Do(req) + if err != nil { + return nil, err + } + return resp, nil +} + +func responseError(resp *http.Response) error { + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + data, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBytes)) + return fmt.Errorf("crabfleet API %s: %s", resp.Status, strings.TrimSpace(string(data))) + } + return nil +} + +func (a Auth) path(path string) (string, error) { + if err := a.validate(); err != nil { + return "", err + } + if a.mode == authAgent { + return strings.Replace(path, "/api/ssh/", "/api/agent/", 1), nil + } + return path, nil +} + +func (a Auth) headers() (http.Header, error) { + headers := http.Header{} + if err := a.apply(headers); err != nil { + return nil, err + } + return headers, nil +} + +func (a Auth) apply(headers http.Header) error { + if err := a.validate(); err != nil { + return err + } + headers.Set("Authorization", "Bearer "+a.token) + switch a.mode { + case authSSH: + headers.Set("X-Crabfleet-SSH-Fingerprint", a.principal) + case authAgent: + headers.Set("X-Crabfleet-Session-ID", a.principal) + } + return nil +} + +func (a Auth) validate() error { + if a.mode == authSSH && a.token != "" && a.principal != "" { + return nil + } + if a.mode == authAgent && a.token != "" && a.principal != "" { + return nil + } + return errors.New("API mode requires SSH gateway token + fingerprint or agent token + session ID") +} + +func sessionPath(id string) string { + return "/api/ssh/interactive-sessions/" + url.PathEscape(id) +} diff --git a/internal/fleetapi/client_test.go b/internal/fleetapi/client_test.go new file mode 100644 index 0000000..7128920 --- /dev/null +++ b/internal/fleetapi/client_test.go @@ -0,0 +1,86 @@ +package fleetapi + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +func TestClientUsesSSHAuthentication(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/ssh/state" { + t.Errorf("path = %q", r.URL.Path) + } + if got := r.Header.Get("Authorization"); got != "Bearer gateway-token" { + t.Errorf("authorization = %q", got) + } + if got := r.Header.Get("X-Crabfleet-SSH-Fingerprint"); got != "SHA256:test" { + t.Errorf("fingerprint = %q", got) + } + _, _ = w.Write([]byte(`{"user":{"login":"operator"}}`)) + })) + defer server.Close() + + client := NewClient(server.URL, server.Client(), SSHAuth("gateway-token", "SHA256:test")) + state, err := client.State(context.Background()) + if err != nil { + t.Fatal(err) + } + if state.User.Login != "operator" { + t.Fatalf("login = %q", state.User.Login) + } +} + +func TestClientRoutesAgentAuthentication(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/agent/interactive-sessions/IS-7" { + t.Errorf("path = %q", r.URL.Path) + } + if got := r.Header.Get("Authorization"); got != "Bearer agent-token" { + t.Errorf("authorization = %q", got) + } + if got := r.Header.Get("X-Crabfleet-Session-ID"); got != "IS-parent" { + t.Errorf("session id = %q", got) + } + _, _ = w.Write([]byte(`{"session":{"id":"IS-7"}}`)) + })) + defer server.Close() + + client := NewClient(server.URL, server.Client(), AgentAuth("agent-token", "IS-parent")) + session, err := client.Session(context.Background(), "IS-7") + if err != nil { + t.Fatal(err) + } + if session.ID != "IS-7" { + t.Fatalf("session id = %q", session.ID) + } +} + +func TestClientRejectsIncompleteAuthentication(t *testing.T) { + client := NewClient("https://example.com", http.DefaultClient, SSHAuth("token", "")) + _, err := client.State(context.Background()) + if err == nil || !strings.Contains(err.Error(), "requires SSH gateway token") { + t.Fatalf("error = %v", err) + } +} + +func TestClientStreamsLargeJSONResponses(t *testing.T) { + largeLogin := strings.Repeat("a", maxResponseBytes+1) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(`{"user":{"login":"`)) + _, _ = w.Write([]byte(largeLogin)) + _, _ = w.Write([]byte(`"}}`)) + })) + defer server.Close() + + client := NewClient(server.URL, server.Client(), SSHAuth("gateway-token", "SHA256:test")) + state, err := client.State(context.Background()) + if err != nil { + t.Fatal(err) + } + if state.User.Login != largeLogin { + t.Fatalf("login length = %d, want %d", len(state.User.Login), len(largeLogin)) + } +} diff --git a/internal/fleetapi/types.go b/internal/fleetapi/types.go new file mode 100644 index 0000000..a06f836 --- /dev/null +++ b/internal/fleetapi/types.go @@ -0,0 +1,141 @@ +package fleetapi + +type User struct { + Login string `json:"login"` + Email string `json:"email"` + Subject string `json:"subject"` + Role string `json:"role"` +} + +type State struct { + User User `json:"user"` + Repos []string `json:"repos"` + InteractiveSessions []Session `json:"interactiveSessions"` + Cards []Card `json:"cards"` +} + +type Session struct { + ID string `json:"id"` + ParentSessionID string `json:"parentSessionId"` + RootSessionID string `json:"rootSessionId"` + Repo string `json:"repo"` + Branch string `json:"branch"` + Runtime string `json:"runtime"` + Adapter string `json:"adapter"` + Status string `json:"status"` + Owner string `json:"owner"` + CreatedBy string `json:"createdBy"` + Purpose string `json:"purpose"` + Summary string `json:"summary"` + Capabilities *SessionCapabilities `json:"capabilities"` + PtyAvailable bool `json:"ptyAvailable"` + LeaseID string `json:"leaseId"` + AttachURL string `json:"attachUrl"` + VNCURL string `json:"vncUrl"` + LastEvent string `json:"lastEvent"` + LogArchive LogArchive `json:"logArchive"` +} + +type SessionCapabilities struct { + Terminal bool `json:"terminal"` +} + +type Card struct { + ID string `json:"id"` + Title string `json:"title"` + Repo string `json:"repo"` + Lane string `json:"lane"` + LastEvent string `json:"lastEvent"` +} + +type CreateSessionRequest struct { + Repo string `json:"repo,omitempty"` + Branch string `json:"branch,omitempty"` + Runtime string `json:"runtime,omitempty"` + Profile string `json:"profile,omitempty"` + Command string `json:"command,omitempty"` + Prompt string `json:"prompt,omitempty"` + ParentSessionID string `json:"parentSessionId,omitempty"` + RootSessionID string `json:"rootSessionId,omitempty"` + Purpose string `json:"purpose,omitempty"` + Summary string `json:"summary,omitempty"` +} + +type CheckpointResult struct { + Session Session `json:"session"` + Checkpoint Checkpoint `json:"checkpoint"` +} + +type CheckpointsResult struct { + Session Session `json:"session"` + Checkpoints []Checkpoint `json:"checkpoints"` +} + +type SessionLogs struct { + Session Session `json:"session"` + Events []LogEvent `json:"events"` + Archive LogArchive `json:"archive"` +} + +type LogEvent struct { + Actor string `json:"actor"` + Message string `json:"message"` + CreatedAt int64 `json:"createdAt"` +} + +type LogArchive struct { + SessionID string `json:"sessionId"` + EventCount int `json:"eventCount"` + EventsKey string `json:"eventsKey"` + TranscriptKey string `json:"transcriptKey"` + SummaryKey string `json:"summaryKey"` + ArchivedAt int64 `json:"archivedAt"` + UpdatedAt int64 `json:"updatedAt"` +} + +type Checkpoint struct { + ID string `json:"id"` + Name string `json:"name"` + SessionID string `json:"sessionId"` + Workdir string `json:"workdir"` + CreatedAt int64 `json:"createdAt"` +} + +func (s Session) TerminalCapable() bool { + return s.Capabilities == nil || s.Capabilities.Terminal +} + +func (s Session) Attachable() bool { + if !s.TerminalCapable() || !s.PtyAvailable { + return false + } + switch s.Status { + case "ready", "attached", "detached": + return true + default: + return false + } +} + +func (s Session) LifecycleStopNote() string { + if s.Runtime == "github_actions" && s.Status == "stopped" { + return "GitHub Actions workflow run was not canceled and may continue on GitHub" + } + if s.Adapter != "runtime-v1" && s.Runtime != "github_actions" { + switch s.Status { + case "stopping", "stopped", "expired": + return "provider deletion was not confirmed; legacy runtimes may require separate cleanup" + } + } + return "" +} + +func (s Session) SummaryText() string { + if s.Summary != "" { + return s.Summary + } + if s.Purpose != "" { + return s.Purpose + } + return s.LastEvent +} diff --git a/internal/fleetapi/types_test.go b/internal/fleetapi/types_test.go new file mode 100644 index 0000000..dd04248 --- /dev/null +++ b/internal/fleetapi/types_test.go @@ -0,0 +1,57 @@ +package fleetapi + +import ( + "strings" + "testing" +) + +func TestSessionAttachableRequiresAuthoritativePTYAvailability(t *testing.T) { + if !(Session{Status: "ready", PtyAvailable: true}).Attachable() { + t.Fatal("ready session with an available PTY should be attachable") + } + for _, session := range []Session{ + {Status: "provisioning", PtyAvailable: true}, + {Status: "ready", AttachURL: "/api/terminal/ws"}, + { + Status: "ready", + PtyAvailable: true, + Capabilities: &SessionCapabilities{Terminal: false}, + }, + } { + if session.Attachable() { + t.Fatalf("session %#v must not be attachable", session) + } + } +} + +func TestSessionLifecycleStopNote(t *testing.T) { + if got := (Session{Status: "stopped"}).LifecycleStopNote(); !strings.Contains(got, "provider deletion") { + t.Fatalf("legacy stop note = %q", got) + } + for _, session := range []Session{ + {Status: "failed"}, + {Status: "stopped", Adapter: "runtime-v1"}, + } { + if got := session.LifecycleStopNote(); got != "" { + t.Fatalf("session %#v note = %q", session, got) + } + } + if got := (Session{Status: "stopped", Runtime: "github_actions"}).LifecycleStopNote(); !strings.Contains(got, "not canceled") { + t.Fatalf("GitHub Actions note = %q", got) + } +} + +func TestSessionSummaryTextPrefersSummaryThenPurpose(t *testing.T) { + session := Session{Summary: "summary", Purpose: "purpose", LastEvent: "event"} + if got := session.SummaryText(); got != "summary" { + t.Fatalf("summary = %q", got) + } + session.Summary = "" + if got := session.SummaryText(); got != "purpose" { + t.Fatalf("purpose = %q", got) + } + session.Purpose = "" + if got := session.SummaryText(); got != "event" { + t.Fatalf("event = %q", got) + } +} diff --git a/internal/fleettext/text.go b/internal/fleettext/text.go new file mode 100644 index 0000000..1f0eab3 --- /dev/null +++ b/internal/fleettext/text.go @@ -0,0 +1,199 @@ +package fleettext + +import ( + "fmt" + "io" + "sort" + "strings" + "time" + + "github.com/openclaw/crabfleet/internal/fleetapi" +) + +func Safe(value string) string { + return strings.Map(func(r rune) rune { + if r == '\n' || r == '\r' || r == '\t' { + return ' ' + } + if r < 0x20 || r == 0x7f || (r >= 0x80 && r <= 0x9f) { + return -1 + } + return r + }, value) +} + +func DisplayUser(user fleetapi.User) string { + if user.Login != "" { + return "@" + user.Login + } + if user.Email != "" { + return user.Email + } + if user.Subject != "" { + return user.Subject + } + return "unknown" +} + +func WriteSessionGroups(out io.Writer, sessions []fleetapi.Session, indent string) bool { + if len(sessions) == 0 { + return false + } + groups := map[string][]fleetapi.Session{} + owners := make([]string, 0, len(sessions)) + for _, session := range sessions { + owner := session.Owner + if owner == "" { + owner = "unassigned" + } + if _, ok := groups[owner]; !ok { + owners = append(owners, owner) + } + groups[owner] = append(groups[owner], session) + } + sort.Strings(owners) + for _, owner := range owners { + fmt.Fprintf(out, "%s%s:\n", indent, Safe(owner)) + writeSessionTree(out, groups[owner], indent+" ") + } + return true +} + +func WriteSessionLogs(out io.Writer, logs fleetapi.SessionLogs) { + fmt.Fprintf( + out, + "session: %s\nrepo: %s\nstatus: %s\n", + Safe(logs.Session.ID), + Safe(logs.Session.Repo), + Safe(logs.Session.Status), + ) + if logs.Archive.EventCount > 0 { + fmt.Fprintf(out, "archive: %d events\n", logs.Archive.EventCount) + } + for _, event := range logs.Events { + timestamp := time.UnixMilli(event.CreatedAt).Format("15:04:05") + fmt.Fprintf(out, "%s %s %s\n", timestamp, Safe(event.Actor), Safe(event.Message)) + } +} + +func WriteSessionStatus(out io.Writer, session fleetapi.Session) { + fmt.Fprintf(out, "session: %s\n", Safe(session.ID)) + fmt.Fprintf(out, "repo: %s\n", Safe(session.Repo)) + fmt.Fprintf(out, "branch: %s\n", Safe(session.Branch)) + fmt.Fprintf(out, "runtime: %s\n", Safe(session.Runtime)) + fmt.Fprintf(out, "status: %s\n", Safe(session.Status)) + fmt.Fprintf(out, "owner: %s\n", Safe(session.Owner)) + if session.LeaseID != "" { + fmt.Fprintf(out, "lease: %s\n", Safe(session.LeaseID)) + } + if session.ParentSessionID != "" { + fmt.Fprintf(out, "parent: %s\n", Safe(session.ParentSessionID)) + } + if session.RootSessionID != "" { + fmt.Fprintf(out, "root: %s\n", Safe(session.RootSessionID)) + } + if session.CreatedBy != "" { + fmt.Fprintf(out, "created-by: %s\n", Safe(session.CreatedBy)) + } + if session.Purpose != "" { + fmt.Fprintf(out, "purpose: %s\n", Safe(session.Purpose)) + } + if session.Summary != "" { + fmt.Fprintf(out, "summary: %s\n", Safe(session.Summary)) + } + if session.AttachURL != "" { + fmt.Fprintf(out, "attach: %s\n", Safe(session.AttachURL)) + } + if session.VNCURL != "" { + fmt.Fprintf(out, "vnc: %s\n", Safe(session.VNCURL)) + } + if session.LastEvent != "" { + fmt.Fprintf(out, "event: %s\n", Safe(session.LastEvent)) + } +} + +func WriteSessionSummary(out io.Writer, session fleetapi.Session) { + fmt.Fprintf(out, "session: %s\n", Safe(session.ID)) + if session.Purpose != "" { + fmt.Fprintf(out, "purpose: %s\n", Safe(session.Purpose)) + } + if session.Summary != "" { + fmt.Fprintf(out, "summary: %s\n", Safe(session.Summary)) + } +} + +func CompactList(values []string, limit int) string { + if len(values) == 0 { + return "none" + } + if len(values) > limit { + return fmt.Sprintf("%s, +%d more", strings.Join(safeSlice(values[:limit]), ", "), len(values)-limit) + } + return strings.Join(safeSlice(values), ", ") +} + +func writeSessionTree(out io.Writer, sessions []fleetapi.Session, indent string) { + byParent := map[string][]fleetapi.Session{} + known := map[string]bool{} + seen := map[string]bool{} + for _, session := range sessions { + known[session.ID] = true + byParent[session.ParentSessionID] = append(byParent[session.ParentSessionID], session) + } + for parent := range byParent { + sortSessions(byParent[parent]) + } + roots := make([]fleetapi.Session, 0, len(sessions)) + for _, session := range sessions { + if session.ParentSessionID == "" || !known[session.ParentSessionID] { + roots = append(roots, session) + } + } + sortSessions(roots) + var walk func(fleetapi.Session, string) + walk = func(session fleetapi.Session, prefix string) { + if seen[session.ID] { + return + } + seen[session.ID] = true + fmt.Fprintf(out, "%s%s\n", prefix, sessionLine(session)) + for _, child := range byParent[session.ID] { + walk(child, prefix+" ") + } + } + for _, root := range roots { + walk(root, indent) + } + for _, session := range sessions { + if !seen[session.ID] { + walk(session, indent) + } + } +} + +func sessionLine(session fleetapi.Session) string { + parts := []string{ + Safe(session.ID), + Safe(session.Status), + Safe(session.Runtime), + Safe(session.Repo), + } + if summary := session.SummaryText(); summary != "" { + parts = append(parts, "- "+Safe(summary)) + } + return strings.Join(parts, " ") +} + +func sortSessions(sessions []fleetapi.Session) { + sort.SliceStable(sessions, func(i, j int) bool { + return sessions[i].ID < sessions[j].ID + }) +} + +func safeSlice(values []string) []string { + safe := make([]string, len(values)) + for i, value := range values { + safe[i] = Safe(value) + } + return safe +} diff --git a/internal/fleettext/text_test.go b/internal/fleettext/text_test.go new file mode 100644 index 0000000..62b6688 --- /dev/null +++ b/internal/fleettext/text_test.go @@ -0,0 +1,50 @@ +package fleettext + +import ( + "bytes" + "strings" + "testing" + + "github.com/openclaw/crabfleet/internal/fleetapi" +) + +func TestWriteSessionGroupsShowsOwnerTreesAndSummaries(t *testing.T) { + var out bytes.Buffer + written := WriteSessionGroups(&out, []fleetapi.Session{ + { + ID: "IS-2", + Owner: "steipete", + Repo: "openclaw/crabfleet", + Runtime: "container", + Status: "ready", + Summary: "child mission", + ParentSessionID: "IS-1", + }, + { + ID: "IS-1", + Owner: "steipete", + Repo: "openclaw/crabfleet", + Runtime: "container", + Status: "ready", + Purpose: "root mission", + }, + }, " ") + if !written { + t.Fatal("expected sessions to be written") + } + for _, want := range []string{ + " steipete:", + " IS-1 ready container openclaw/crabfleet - root mission", + " IS-2 ready container openclaw/crabfleet - child mission", + } { + if !strings.Contains(out.String(), want) { + t.Fatalf("output missing %q:\n%s", want, out.String()) + } + } +} + +func TestSafeRemovesTerminalControlCharacters(t *testing.T) { + if got, want := Safe("hello\nworld\x1b"), "hello world"; got != want { + t.Fatalf("safe = %q, want %q", got, want) + } +}