From 25c7f4ff71b3a2a2653547a1f64323a053ab4571 Mon Sep 17 00:00:00 2001 From: Toubat Date: Tue, 2 Jun 2026 18:11:29 -0700 Subject: [PATCH 1/5] feat(cli): add `lk agent session` for headless text-mode agent runs Introduces a three-process model (ephemeral CLI command, detached singleton daemon, agent subprocess) that drives a Python/JS agent over TCP using the lk.agent.session protobuf protocol, with no audio/CGO dependency: - `lk agent session start `: re-execs the lk binary as a detached daemon bound to a fixed loopback port (singleton), which spawns the agent and applies text mode; rejects start if a session already runs. - `lk agent session say "..."`: streams a user turn and renders the agent reply, tool calls/outputs, and handoffs to the terminal. - `lk agent session end`: tears down the daemon and agent. The CLI<->daemon control protocol reuses pkg/ipc length-prefixed framing over the same TCP port, disambiguated from agent connections by a magic preamble. The headless renderer covers all ChatItem variants plus the FunctionToolsExecuted event. Drops the now-unnecessary U1000 file-ignore directives added while the helpers were unused. Co-authored-by: Cursor --- cmd/lk/agent_utils.go | 4 +- cmd/lk/main.go | 7 + cmd/lk/proc_unix.go | 2 - cmd/lk/session.go | 270 ++++++++++++++++++++++++ cmd/lk/session_daemon.go | 374 ++++++++++++++++++++++++++++++++++ cmd/lk/session_render.go | 176 ++++++++++++++++ cmd/lk/simulate_subprocess.go | 2 - 7 files changed, 828 insertions(+), 7 deletions(-) create mode 100644 cmd/lk/session.go create mode 100644 cmd/lk/session_daemon.go create mode 100644 cmd/lk/session_render.go diff --git a/cmd/lk/agent_utils.go b/cmd/lk/agent_utils.go index 3ea0522b..d6bca7db 100644 --- a/cmd/lk/agent_utils.go +++ b/cmd/lk/agent_utils.go @@ -14,8 +14,6 @@ package main -//lint:file-ignore U1000 consumed by console-tagged commands (hidden from the default tag-free lint build) and the lk session daemon (follow-up PR); remove once the daemon merges - import ( "fmt" "os" @@ -70,7 +68,7 @@ func detectProject(cmd *cli.Command) (string, agentfs.ProjectType, string, error } // buildConsoleArgs builds the agent subprocess argv for console mode, shared by -// `lk agent console` and the `lk session` daemon. +// `lk agent console` and the `lk agent session` daemon. func buildConsoleArgs(addr string, record bool) []string { args := []string{"console", "--connect-addr", addr} if record { diff --git a/cmd/lk/main.go b/cmd/lk/main.go index 5c491e6e..28086e1e 100644 --- a/cmd/lk/main.go +++ b/cmd/lk/main.go @@ -32,6 +32,13 @@ import ( ) func main() { + // When re-exec'd as the detached session daemon, run that and never reach + // the CLI framework (the daemon is not an exposed subcommand). + if os.Getenv(envSessionDaemon) == "1" { + runSessionDaemon() + return + } + app := &cli.Command{ Name: "lk", Usage: "CLI client to LiveKit", diff --git a/cmd/lk/proc_unix.go b/cmd/lk/proc_unix.go index fb379634..9cff6a7b 100644 --- a/cmd/lk/proc_unix.go +++ b/cmd/lk/proc_unix.go @@ -2,8 +2,6 @@ package main -//lint:file-ignore U1000 consumed by console-tagged commands (hidden from the default tag-free lint build) and the lk session daemon (follow-up PR); remove once the daemon merges - import ( "os/exec" "syscall" diff --git a/cmd/lk/session.go b/cmd/lk/session.go new file mode 100644 index 00000000..3e22aa95 --- /dev/null +++ b/cmd/lk/session.go @@ -0,0 +1,270 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bufio" + "context" + "encoding/binary" + "encoding/json" + "fmt" + "io" + "net" + "os" + "os/exec" + "strconv" + "strings" + + "github.com/urfave/cli/v3" +) + +// Single-session model: the fixed loopback port is the singleton registry. +// The daemon binds it; whoever wins the bind() is "the session". start, say, +// and end all rendezvous on this one port. No session id, manifest, or dir. +const ( + sessionMagic = "LKCP" // 4-byte preamble that marks a control connection + sessionHost = "127.0.0.1" + defaultSessionPort = 8775 + + envSessionDaemon = "LK_SESSION_DAEMON" // "1" in the re-exec'd daemon child + envSessionPort = "LK_SESSION_PORT" // fixed port + envSessionDir = "LK_SESSION_DIR" // resolved project dir + envSessionEntry = "LK_SESSION_ENTRY" // resolved entrypoint (project-relative) + envSessionPType = "LK_SESSION_PTYPE" // agentfs.ProjectType string + envSessionReadyFD = "LK_SESSION_READY_FD" +) + +var sessionPortFlag = &cli.IntFlag{ + Name: "port", + Sources: cli.EnvVars(envSessionPort), + Value: defaultSessionPort, + Usage: "Fixed loopback port shared by the agent and control connections", +} + +func init() { + // Register under the `agent` group as `lk agent session`, mirroring how + // `lk agent console` attaches itself. Unlike console, this command is not + // gated behind the `console` build tag: it is CGO-free and ships in the + // default binary. + AgentCommands[0].Commands = append(AgentCommands[0].Commands, agentSessionCommand) +} + +var agentSessionCommand = &cli.Command{ + Name: "session", + Usage: "Drive a single local agent session in text mode (start/say/end)", + Category: "Core", + Commands: []*cli.Command{ + { + Name: "start", + Usage: "Start a detached agent session daemon", + ArgsUsage: "[entrypoint]", + Flags: []cli.Flag{sessionPortFlag}, + Action: runSessionStart, + }, + { + Name: "say", + Usage: "Send a text turn to the running session and print the reply", + ArgsUsage: "", + Flags: []cli.Flag{sessionPortFlag}, + Action: runSessionSay, + }, + { + Name: "end", + Usage: "Stop the running session and its agent", + Flags: []cli.Flag{sessionPortFlag}, + Action: runSessionEnd, + }, + }, +} + +func sessionAddr(port int) string { + return fmt.Sprintf("%s:%d", sessionHost, port) +} + +func runSessionStart(ctx context.Context, cmd *cli.Command) error { + projectDir, projectType, entrypoint, err := detectProject(cmd) + if err != nil { + return err + } + port := int(cmd.Int("port")) + + exe, err := os.Executable() + if err != nil { + return fmt.Errorf("could not resolve own binary: %w", err) + } + + // Pipe the daemon uses to report readiness (or a startup error) before we + // return. This avoids racing a TCP probe against the agent's own connect. + readyR, readyW, err := os.Pipe() + if err != nil { + return err + } + defer readyR.Close() + + // The daemon is detached, so its own stdout/stderr (panics etc.) go to a + // temp log rather than the user's terminal. + logFile, err := os.CreateTemp("", "lk-session-daemon-*.log") + if err != nil { + readyW.Close() + return err + } + + daemon := exec.Command(exe) + daemon.Env = append(os.Environ(), + envSessionDaemon+"=1", + envSessionPort+"="+strconv.Itoa(port), + envSessionDir+"="+projectDir, + envSessionEntry+"="+entrypoint, + envSessionPType+"="+string(projectType), + envSessionReadyFD+"=3", // ExtraFiles[0] is fd 3 in the child + ) + daemon.ExtraFiles = []*os.File{readyW} + daemon.Stdout = logFile + daemon.Stderr = logFile + setDetachedProcAttr(daemon) + + if err := daemon.Start(); err != nil { + readyW.Close() + logFile.Close() + return fmt.Errorf("failed to start session daemon: %w", err) + } + // Close our copy of the write end so the read below sees EOF if the daemon dies. + readyW.Close() + logFile.Close() + + status, _ := bufio.NewReader(readyR).ReadString('\n') + status = strings.TrimSpace(status) + switch { + case status == "ready": + fmt.Fprintf(os.Stderr, "Detected %s agent (%s in %s)\n", projectType.Lang(), entrypoint, projectDir) + fmt.Printf("Session started. Use `lk agent session say \"...\"` to talk, `lk agent session end` to stop.\n") + return nil + case strings.HasPrefix(status, "error:"): + return fmt.Errorf("%s", strings.TrimSpace(strings.TrimPrefix(status, "error:"))) + default: + return fmt.Errorf("session daemon exited before becoming ready (see %s)", logFile.Name()) + } +} + +func runSessionSay(ctx context.Context, cmd *cli.Command) error { + text := strings.TrimSpace(strings.Join(cmd.Args().Slice(), " ")) + if text == "" { + return fmt.Errorf("usage: lk agent session say ") + } + conn, err := dialControl(int(cmd.Int("port"))) + if err != nil { + return err + } + defer conn.Close() + + if err := writeControlFrame(conn, controlRequest{Cmd: "say", Text: text}); err != nil { + return err + } + return streamControlReplies(conn) +} + +func runSessionEnd(ctx context.Context, cmd *cli.Command) error { + conn, err := dialControl(int(cmd.Int("port"))) + if err != nil { + return err + } + defer conn.Close() + + if err := writeControlFrame(conn, controlRequest{Cmd: "end"}); err != nil { + return err + } + if err := streamControlReplies(conn); err != nil { + return err + } + fmt.Println("Session ended.") + return nil +} + +// dialControl connects to the session daemon and sends the control preamble. +func dialControl(port int) (net.Conn, error) { + conn, err := net.Dial("tcp", sessionAddr(port)) + if err != nil { + return nil, fmt.Errorf("no session running on %s (run `lk agent session start` first)", sessionAddr(port)) + } + if _, err := conn.Write([]byte(sessionMagic)); err != nil { + conn.Close() + return nil, err + } + return conn, nil +} + +func streamControlReplies(conn net.Conn) error { + for { + var reply controlReply + if err := readControlFrame(conn, &reply); err != nil { + if err == io.EOF { + return nil + } + return err + } + if reply.Line != "" { + fmt.Println(reply.Line) + } + if reply.Done { + if reply.Error != "" { + return fmt.Errorf("%s", reply.Error) + } + return nil + } + } +} + +// Control protocol: a 4-byte big-endian length prefix + a JSON payload, mirroring +// pkg/ipc's framing but with JSON instead of protobuf (no new protobufs needed). +type controlRequest struct { + Cmd string `json:"cmd"` + Text string `json:"text,omitempty"` +} + +type controlReply struct { + Line string `json:"line,omitempty"` + Done bool `json:"done,omitempty"` + Error string `json:"error,omitempty"` +} + +func writeControlFrame(w io.Writer, v any) error { + data, err := json.Marshal(v) + if err != nil { + return err + } + var hdr [4]byte + binary.BigEndian.PutUint32(hdr[:], uint32(len(data))) + if _, err := w.Write(hdr[:]); err != nil { + return err + } + _, err = w.Write(data) + return err +} + +func readControlFrame(r io.Reader, v any) error { + var hdr [4]byte + if _, err := io.ReadFull(r, hdr[:]); err != nil { + return err + } + length := binary.BigEndian.Uint32(hdr[:]) + if length > 1<<20 { + return fmt.Errorf("control frame too large: %d bytes", length) + } + data := make([]byte, length) + if _, err := io.ReadFull(r, data); err != nil { + return err + } + return json.Unmarshal(data, v) +} diff --git a/cmd/lk/session_daemon.go b/cmd/lk/session_daemon.go new file mode 100644 index 00000000..29c3d85d --- /dev/null +++ b/cmd/lk/session_daemon.go @@ -0,0 +1,374 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "fmt" + "io" + "net" + "os" + "strconv" + "sync" + "time" + + "github.com/livekit/livekit-cli/v2/pkg/agentfs" + "github.com/livekit/livekit-cli/v2/pkg/console" + "github.com/livekit/livekit-cli/v2/pkg/ipc" + + agent "github.com/livekit/protocol/livekit/agent" +) + +// runSessionDaemon is the entry point when the binary is re-exec'd with +// LK_SESSION_DAEMON=1. It never returns to the CLI framework. +func runSessionDaemon() { + ready := readyWriter() + port, _ := strconv.Atoi(os.Getenv(envSessionPort)) + + // The fixed port is the singleton: if the bind fails, a session already + // owns it, which is how `lk agent session start` learns to reject. + server, err := console.NewTCPServer(sessionAddr(port)) + if err != nil { + signalReady(ready, "error: a session is already running on "+sessionAddr(port)) + os.Exit(1) + } + defer server.Close() + + // TODO(node): detect a node/JS agent project and build the equivalent + // `node console --connect-addr ` argv. + agentProc, err := startAgent(AgentStartConfig{ + Dir: os.Getenv(envSessionDir), + Entrypoint: os.Getenv(envSessionEntry), + ProjectType: agentfs.ProjectType(os.Getenv(envSessionPType)), + CLIArgs: buildConsoleArgs(server.Addr().String(), false), + }) + if err != nil { + signalReady(ready, "error: failed to start agent: "+err.Error()) + os.Exit(1) + } + + d := &sessionDaemon{ + server: server, + agentProc: agentProc, + events: make(chan *agent.AgentSessionEvent, 64), + responses: make(chan *agent.SessionResponse, 8), + queue: make(chan *sessionCommand, 16), + agentReady: make(chan struct{}), + agentDone: make(chan struct{}), + shutdown: make(chan struct{}), + } + + go d.acceptLoop() + + select { + case <-d.agentReady: + d.setTextMode() + signalReady(ready, "ready") + case waitErr := <-agentProc.Done(): + msg := "error: agent exited before connecting" + if waitErr != nil { + msg += ": " + waitErr.Error() + } + signalReady(ready, msg) + agentProc.Kill() + os.Exit(1) + case <-time.After(60 * time.Second): + signalReady(ready, "error: timed out waiting for agent to connect") + agentProc.Kill() + os.Exit(1) + } + + go d.worker() + + select { + case <-d.agentDone: + case <-d.shutdown: + } + agentProc.Kill() +} + +// readyWriter returns the inherited pipe `lk agent session start` reads to learn the +// daemon became ready (or failed). Nil if not launched via start. +func readyWriter() *os.File { + fdStr := os.Getenv(envSessionReadyFD) + if fdStr == "" { + return nil + } + fd, err := strconv.Atoi(fdStr) + if err != nil { + return nil + } + return os.NewFile(uintptr(fd), "ready") +} + +func signalReady(f *os.File, msg string) { + if f == nil { + return + } + fmt.Fprintln(f, msg) + f.Close() +} + +type sessionDaemon struct { + server *console.TCPServer + agentProc *AgentProcess + + agentMu sync.Mutex + agentConn net.Conn + agentRead io.Reader + writeMu sync.Mutex // serializes writes to the agent connection + + events chan *agent.AgentSessionEvent + responses chan *agent.SessionResponse + queue chan *sessionCommand + + agentReady chan struct{} + agentDone chan struct{} + doneOnce sync.Once + shutdown chan struct{} + shutOnce sync.Once + + reqCounter int +} + +type sessionCommand struct { + kind string + text string + out net.Conn + done chan struct{} +} + +func (d *sessionDaemon) acceptLoop() { + for { + conn, err := d.server.AcceptConn() + if err != nil { + return // listener closed + } + go d.handleConn(conn) + } +} + +func (d *sessionDaemon) handleConn(conn net.Conn) { + isControl, reader, err := classifyConn(conn) + if err != nil { + conn.Close() + return + } + if isControl { + d.handleControlConn(conn) + return + } + + // First non-control connection is the agent. + d.agentMu.Lock() + if d.agentConn != nil { + d.agentMu.Unlock() + conn.Close() + return + } + d.agentConn = conn + d.agentRead = reader + d.agentMu.Unlock() + + close(d.agentReady) + go d.agentMessageLoop() +} + +// classifyConn routes a connection by its 4-byte preamble. Control clients send +// the magic; the unmodified agent never does. "LKCP" decodes to a ~1.28 GB +// length prefix, which exceeds pkg/ipc's 1 MB cap, so a real agent frame can +// never begin with these bytes. +func classifyConn(conn net.Conn) (bool, io.Reader, error) { + var hdr [4]byte + if _, err := io.ReadFull(conn, hdr[:]); err != nil { + return false, nil, err + } + if string(hdr[:]) == sessionMagic { + return true, conn, nil + } + // Push the peeked bytes back so proto framing sees a complete frame. + return false, io.MultiReader(bytes.NewReader(hdr[:]), conn), nil +} + +func (d *sessionDaemon) agentMessageLoop() { + for { + msg := &agent.AgentSessionMessage{} + if err := ipc.ReadProto(d.agentRead, msg); err != nil { + d.doneOnce.Do(func() { close(d.agentDone) }) + return + } + switch m := msg.Message.(type) { + case *agent.AgentSessionMessage_Event: + select { + case d.events <- m.Event: + default: + } + case *agent.AgentSessionMessage_Response: + if m.Response != nil { + select { + case d.responses <- m.Response: + default: + } + } + case *agent.AgentSessionMessage_AudioOutput, *agent.AgentSessionMessage_AudioPlaybackClear: + // No audio sink in text mode: drop. + case *agent.AgentSessionMessage_AudioPlaybackFlush: + // Nothing to drain, so ack immediately or the agent's turn (and the + // RunInputResponse we await) never completes. + _ = d.writeAgent(&agent.AgentSessionMessage{ + Message: &agent.AgentSessionMessage_AudioPlaybackFinished{ + AudioPlaybackFinished: &agent.AgentSessionMessage_ConsoleIO_AudioPlaybackFinished{}, + }, + }) + } + } +} + +func (d *sessionDaemon) writeAgent(msg *agent.AgentSessionMessage) error { + d.agentMu.Lock() + conn := d.agentConn + d.agentMu.Unlock() + if conn == nil { + return fmt.Errorf("agent not connected") + } + d.writeMu.Lock() + defer d.writeMu.Unlock() + return ipc.WriteProto(conn, msg) +} + +// setTextMode disables the agent's audio I/O so it runs as a pure text turn +// handler, matching what `lk agent console` does when switching to text mode. +func (d *sessionDaemon) setTextMode() { + off := false + _ = d.writeAgent(&agent.AgentSessionMessage{ + Message: &agent.AgentSessionMessage_Request{ + Request: &agent.SessionRequest{ + RequestId: "session-io", + Request: &agent.SessionRequest_UpdateIo{ + UpdateIo: &agent.SessionRequest_UpdateIO{ + Input: &agent.SessionRequest_UpdateIO_Input{AudioEnabled: &off}, + Output: &agent.SessionRequest_UpdateIO_Output{AudioEnabled: &off, TranscriptionEnabled: &off}, + }, + }, + }, + }, + }) +} + +func (d *sessionDaemon) handleControlConn(conn net.Conn) { + var req controlRequest + if err := readControlFrame(conn, &req); err != nil { + conn.Close() + return + } + cmd := &sessionCommand{kind: req.Cmd, text: req.Text, out: conn, done: make(chan struct{})} + select { + case d.queue <- cmd: + case <-d.shutdown: + conn.Close() + return + } + <-cmd.done + conn.Close() +} + +func (d *sessionDaemon) worker() { + for { + select { + case cmd := <-d.queue: + d.runCommand(cmd) + case <-d.shutdown: + return + } + } +} + +func (d *sessionDaemon) runCommand(cmd *sessionCommand) { + defer close(cmd.done) + switch cmd.kind { + case "say": + d.runSay(cmd) + case "end": + _ = writeControlFrame(cmd.out, controlReply{Done: true}) + d.shutOnce.Do(func() { close(d.shutdown) }) + default: + _ = writeControlFrame(cmd.out, controlReply{Done: true, Error: "unknown command: " + cmd.kind}) + } +} + +func (d *sessionDaemon) runSay(cmd *sessionCommand) { + d.reqCounter++ + reqID := "session-" + strconv.Itoa(d.reqCounter) + + d.drainEvents() // discard anything emitted before this turn (e.g. greeting) + _ = writeControlFrame(cmd.out, controlReply{Line: renderUserMessage(cmd.text)}) + + if err := d.writeAgent(&agent.AgentSessionMessage{ + Message: &agent.AgentSessionMessage_Request{ + Request: &agent.SessionRequest{ + RequestId: reqID, + Request: &agent.SessionRequest_RunInput_{ + RunInput: &agent.SessionRequest_RunInput{Text: cmd.text}, + }, + }, + }, + }); err != nil { + _ = writeControlFrame(cmd.out, controlReply{Done: true, Error: err.Error()}) + return + } + + for { + select { + case ev := <-d.events: + if line := renderEvent(ev); line != "" { + if err := writeControlFrame(cmd.out, controlReply{Line: line}); err != nil { + return + } + } + case resp := <-d.responses: + if resp.GetRequestId() == reqID { + d.flushEvents(cmd.out) + _ = writeControlFrame(cmd.out, controlReply{Done: true}) + return + } + case <-d.agentDone: + _ = writeControlFrame(cmd.out, controlReply{Done: true, Error: "agent exited"}) + return + } + } +} + +func (d *sessionDaemon) drainEvents() { + for { + select { + case <-d.events: + default: + return + } + } +} + +func (d *sessionDaemon) flushEvents(out net.Conn) { + for { + select { + case ev := <-d.events: + if line := renderEvent(ev); line != "" { + _ = writeControlFrame(out, controlReply{Line: line}) + } + default: + return + } + } +} diff --git a/cmd/lk/session_render.go b/cmd/lk/session_render.go new file mode 100644 index 00000000..4b124dfe --- /dev/null +++ b/cmd/lk/session_render.go @@ -0,0 +1,176 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "strings" + + "github.com/charmbracelet/lipgloss" + + agent "github.com/livekit/protocol/livekit/agent" +) + +// Styles for the headless session output. Named distinctly from the console +// TUI styles so both can coexist in the console-tagged build. +var ( + sessionCyan = lipgloss.Color("#1fd5f9") + sessionGreen = lipgloss.Color("#6BCB77") + sessionPurple = lipgloss.Color("#8f83ff") + sessionRed = lipgloss.Color("#FF6B6B") + sessionUserStyle = lipgloss.NewStyle().Foreground(sessionCyan).Bold(true) + sessionAgentStyle = lipgloss.NewStyle().Foreground(sessionGreen).Bold(true) + sessionDimStyle = lipgloss.NewStyle().Faint(true) + sessionRedStyle = lipgloss.NewStyle().Foreground(sessionRed) +) + +// renderUserMessage formats the text the user said, echoed back by `say`. +func renderUserMessage(text string) string { + var b strings.Builder + b.WriteString("\n ") + b.WriteString(lipgloss.NewStyle().Foreground(sessionCyan).Render("● ")) + b.WriteString(sessionUserStyle.Render("You")) + for _, line := range strings.Split(text, "\n") { + b.WriteString("\n ") + b.WriteString(line) + } + return b.String() +} + +// renderEvent turns an AgentSessionEvent into a printable line, or "" if the +// event carries nothing worth showing in text mode. +func renderEvent(ev *agent.AgentSessionEvent) string { + if ev == nil { + return "" + } + switch e := ev.Event.(type) { + case *agent.AgentSessionEvent_ConversationItemAdded_: + if item := e.ConversationItemAdded.Item; item != nil { + return renderChatItem(item) + } + case *agent.AgentSessionEvent_FunctionToolsExecuted_: + return renderFunctionTools(e.FunctionToolsExecuted) + case *agent.AgentSessionEvent_Error_: + return " " + sessionRedStyle.Render("✗ "+e.Error.Message) + } + return "" +} + +func renderChatItem(item *agent.ChatContext_ChatItem) string { + switch i := item.Item.(type) { + case *agent.ChatContext_ChatItem_Message: + msg := i.Message + if msg.Role == agent.ChatRole_USER { + return "" // the user message is echoed separately by `say` + } + var parts []string + for _, c := range msg.Content { + if t := c.GetText(); t != "" { + parts = append(parts, t) + } + } + text := strings.Join(parts, "") + if text == "" { + return "" + } + var b strings.Builder + b.WriteString("\n ") + b.WriteString(lipgloss.NewStyle().Foreground(sessionGreen).Render("● ")) + b.WriteString(sessionAgentStyle.Render("Agent")) + for _, line := range strings.Split(text, "\n") { + b.WriteString("\n ") + b.WriteString(line) + } + return b.String() + + case *agent.ChatContext_ChatItem_FunctionCall: + return "\n ● function_tool: " + i.FunctionCall.Name + + case *agent.ChatContext_ChatItem_FunctionCallOutput: + fco := i.FunctionCallOutput + prefix, style := "✓ ", sessionDimStyle + if fco.IsError { + prefix, style = "✗ ", sessionRedStyle + } + return " " + style.Render(prefix+sessionSummarizeOutput(fco.Output)) + + case *agent.ChatContext_ChatItem_AgentHandoff: + h := i.AgentHandoff + old := "" + if h.OldAgentId != nil && *h.OldAgentId != "" { + old = sessionDimStyle.Render(*h.OldAgentId) + " → " + } + return " " + lipgloss.NewStyle().Foreground(sessionPurple).Render("● ") + + sessionDimStyle.Render("handoff: ") + old + h.NewAgentId + + case *agent.ChatContext_ChatItem_AgentConfigUpdate: + u := i.AgentConfigUpdate + var parts []string + if u.Instructions != nil { + parts = append(parts, "instructions updated") + } + if len(u.ToolsAdded) > 0 { + parts = append(parts, "tools added: "+strings.Join(u.ToolsAdded, ", ")) + } + if len(u.ToolsRemoved) > 0 { + parts = append(parts, "tools removed: "+strings.Join(u.ToolsRemoved, ", ")) + } + if len(parts) == 0 { + return "" + } + return " " + lipgloss.NewStyle().Foreground(sessionPurple).Render("● ") + + sessionDimStyle.Render("config: "+strings.Join(parts, "; ")) + } + return "" +} + +func renderFunctionTools(ft *agent.AgentSessionEvent_FunctionToolsExecuted) string { + if ft == nil { + return "" + } + outputs := make(map[string]*agent.FunctionCallOutput, len(ft.FunctionCallOutputs)) + for _, fco := range ft.FunctionCallOutputs { + outputs[fco.CallId] = fco + } + var b strings.Builder + for i, fc := range ft.FunctionCalls { + if i > 0 { + b.WriteString("\n") + } + b.WriteString("\n ● function_tool: ") + b.WriteString(fc.Name) + if fco, ok := outputs[fc.CallId]; ok { + b.WriteString("\n ") + if fco.IsError { + b.WriteString(sessionRedStyle.Render("✗ " + sessionSummarizeOutput(fco.Output))) + } else { + b.WriteString(sessionDimStyle.Render("✓ " + sessionSummarizeOutput(fco.Output))) + } + } + } + return b.String() +} + +// sessionSummarizeOutput collapses a tool output to a single, length-capped line. +func sessionSummarizeOutput(out string) string { + out = strings.TrimSpace(out) + if idx := strings.IndexByte(out, '\n'); idx >= 0 { + out = out[:idx] + } + const max = 120 + if len(out) > max { + out = out[:max] + "…" + } + return out +} diff --git a/cmd/lk/simulate_subprocess.go b/cmd/lk/simulate_subprocess.go index 174fb0b4..f15728a2 100644 --- a/cmd/lk/simulate_subprocess.go +++ b/cmd/lk/simulate_subprocess.go @@ -14,8 +14,6 @@ package main -//lint:file-ignore U1000 consumed by console-tagged commands (hidden from the default tag-free lint build) and the lk session daemon (follow-up PR); remove once the daemon merges - import ( "bufio" "encoding/json" From 4fc871ad1aa47283481cd1bf66fa8d652375f1b2 Mon Sep 17 00:00:00 2001 From: Toubat Date: Tue, 2 Jun 2026 18:29:58 -0700 Subject: [PATCH 2/5] fix(cli): skip empty function-tool output line in session renderer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tools that return no string (e.g. handoff tools returning an Agent) produced a bare "✓ " line. Suppress the output line when the summarized output is empty for successful calls; error outputs still render. Co-authored-by: Cursor --- cmd/lk/session_render.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/cmd/lk/session_render.go b/cmd/lk/session_render.go index 4b124dfe..b4a084b8 100644 --- a/cmd/lk/session_render.go +++ b/cmd/lk/session_render.go @@ -99,11 +99,14 @@ func renderChatItem(item *agent.ChatContext_ChatItem) string { case *agent.ChatContext_ChatItem_FunctionCallOutput: fco := i.FunctionCallOutput - prefix, style := "✓ ", sessionDimStyle + summary := sessionSummarizeOutput(fco.Output) if fco.IsError { - prefix, style = "✗ ", sessionRedStyle + return " " + sessionRedStyle.Render("✗ "+summary) } - return " " + style.Render(prefix+sessionSummarizeOutput(fco.Output)) + if summary == "" { + return "" + } + return " " + sessionDimStyle.Render("✓ "+summary) case *agent.ChatContext_ChatItem_AgentHandoff: h := i.AgentHandoff @@ -151,11 +154,13 @@ func renderFunctionTools(ft *agent.AgentSessionEvent_FunctionToolsExecuted) stri b.WriteString("\n ● function_tool: ") b.WriteString(fc.Name) if fco, ok := outputs[fc.CallId]; ok { - b.WriteString("\n ") + summary := sessionSummarizeOutput(fco.Output) if fco.IsError { - b.WriteString(sessionRedStyle.Render("✗ " + sessionSummarizeOutput(fco.Output))) - } else { - b.WriteString(sessionDimStyle.Render("✓ " + sessionSummarizeOutput(fco.Output))) + b.WriteString("\n ") + b.WriteString(sessionRedStyle.Render("✗ " + summary)) + } else if summary != "" { + b.WriteString("\n ") + b.WriteString(sessionDimStyle.Render("✓ " + summary)) } } } From 5f0ca88881e4faee30af8dfc7ac45d1e9364e3e7 Mon Sep 17 00:00:00 2001 From: Toubat Date: Wed, 3 Jun 2026 15:46:31 -0700 Subject: [PATCH 3/5] refactor(cli): dispatch session daemon via hidden subcommand entrypoint Replace the env-gated branch at the top of main() with a dedicated, hidden `lk agent session daemon` subcommand (mirroring the existing hidden `generate-fish-completion` command). `start` now re-execs the binary into that subcommand instead of setting LK_SESSION_DAEMON=1, so the daemon has its own entrypoint dispatched by the CLI framework rather than special-casing main(). Re-exec of the same binary is retained (a separate binary can't be located reliably after `go install`); runtime params still flow through the LK_SESSION_* env vars. Co-authored-by: Cursor --- cmd/lk/main.go | 7 ------- cmd/lk/session.go | 27 ++++++++++++++++++++------- cmd/lk/session_daemon.go | 5 +++-- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/cmd/lk/main.go b/cmd/lk/main.go index 28086e1e..5c491e6e 100644 --- a/cmd/lk/main.go +++ b/cmd/lk/main.go @@ -32,13 +32,6 @@ import ( ) func main() { - // When re-exec'd as the detached session daemon, run that and never reach - // the CLI framework (the daemon is not an exposed subcommand). - if os.Getenv(envSessionDaemon) == "1" { - runSessionDaemon() - return - } - app := &cli.Command{ Name: "lk", Usage: "CLI client to LiveKit", diff --git a/cmd/lk/session.go b/cmd/lk/session.go index 3e22aa95..d5f87c81 100644 --- a/cmd/lk/session.go +++ b/cmd/lk/session.go @@ -38,12 +38,14 @@ const ( sessionHost = "127.0.0.1" defaultSessionPort = 8775 - envSessionDaemon = "LK_SESSION_DAEMON" // "1" in the re-exec'd daemon child - envSessionPort = "LK_SESSION_PORT" // fixed port - envSessionDir = "LK_SESSION_DIR" // resolved project dir - envSessionEntry = "LK_SESSION_ENTRY" // resolved entrypoint (project-relative) - envSessionPType = "LK_SESSION_PTYPE" // agentfs.ProjectType string + envSessionPort = "LK_SESSION_PORT" // fixed port + envSessionDir = "LK_SESSION_DIR" // resolved project dir + envSessionEntry = "LK_SESSION_ENTRY" // resolved entrypoint (project-relative) + envSessionPType = "LK_SESSION_PTYPE" // agentfs.ProjectType string envSessionReadyFD = "LK_SESSION_READY_FD" + + // sessionDaemonSubcommand is the hidden entrypoint `start` re-execs into. + sessionDaemonSubcommand = "daemon" ) var sessionPortFlag = &cli.IntFlag{ @@ -86,6 +88,18 @@ var agentSessionCommand = &cli.Command{ Flags: []cli.Flag{sessionPortFlag}, Action: runSessionEnd, }, + { + // Hidden re-exec entrypoint. `lk agent session start` relaunches + // this binary as `lk agent session daemon` to run the detached + // daemon; it is not meant to be invoked directly. Configuration is + // inherited through the LK_SESSION_* env vars set by start. + Name: sessionDaemonSubcommand, + Hidden: true, + Action: func(ctx context.Context, cmd *cli.Command) error { + runSessionDaemon() + return nil + }, + }, }, } @@ -121,9 +135,8 @@ func runSessionStart(ctx context.Context, cmd *cli.Command) error { return err } - daemon := exec.Command(exe) + daemon := exec.Command(exe, "agent", "session", sessionDaemonSubcommand) daemon.Env = append(os.Environ(), - envSessionDaemon+"=1", envSessionPort+"="+strconv.Itoa(port), envSessionDir+"="+projectDir, envSessionEntry+"="+entrypoint, diff --git a/cmd/lk/session_daemon.go b/cmd/lk/session_daemon.go index 29c3d85d..720e8b38 100644 --- a/cmd/lk/session_daemon.go +++ b/cmd/lk/session_daemon.go @@ -31,8 +31,9 @@ import ( agent "github.com/livekit/protocol/livekit/agent" ) -// runSessionDaemon is the entry point when the binary is re-exec'd with -// LK_SESSION_DAEMON=1. It never returns to the CLI framework. +// runSessionDaemon is the entry point for the hidden `lk agent session daemon` +// subcommand that `lk agent session start` re-execs. It runs the detached +// daemon to completion (until the agent exits or `end` is received). func runSessionDaemon() { ready := readyWriter() port, _ := strconv.Atoi(os.Getenv(envSessionPort)) From 0a32ba09da43b4ce7588de4d7c996ae9ccc9ee31 Mon Sep 17 00:00:00 2001 From: Toubat Date: Wed, 3 Jun 2026 15:50:39 -0700 Subject: [PATCH 4/5] fix(cli): reject direct invocation of hidden session daemon entrypoint A registered subcommand is always invokable (Hidden only drops it from help), so a stray `lk agent session daemon` previously spawned a half-configured daemon (random port, empty project dir) that exited silently. Guard the entrypoint on the inherited readiness pipe that `start` always provides: without it, return a clear error directing the user to `lk agent session start`. Co-authored-by: Cursor --- cmd/lk/session.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/lk/session.go b/cmd/lk/session.go index d5f87c81..31cab7e9 100644 --- a/cmd/lk/session.go +++ b/cmd/lk/session.go @@ -89,13 +89,15 @@ var agentSessionCommand = &cli.Command{ Action: runSessionEnd, }, { - // Hidden re-exec entrypoint. `lk agent session start` relaunches - // this binary as `lk agent session daemon` to run the detached - // daemon; it is not meant to be invoked directly. Configuration is - // inherited through the LK_SESSION_* env vars set by start. Name: sessionDaemonSubcommand, Hidden: true, Action: func(ctx context.Context, cmd *cli.Command) error { + // Only meaningful when re-exec'd by `start`, which hands down an + // inherited readiness pipe. Reject direct invocation so a stray + // `lk agent session daemon` can't spawn a half-configured daemon. + if os.Getenv(envSessionReadyFD) == "" { + return fmt.Errorf("`session daemon` is an internal entrypoint; run `lk agent session start ` instead") + } runSessionDaemon() return nil }, From 7514534b35d4df5ec72b976d688b376e10ad487f Mon Sep 17 00:00:00 2001 From: Toubat Date: Wed, 3 Jun 2026 16:22:10 -0700 Subject: [PATCH 5/5] Update session.go --- cmd/lk/session.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/lk/session.go b/cmd/lk/session.go index 31cab7e9..0b964f1d 100644 --- a/cmd/lk/session.go +++ b/cmd/lk/session.go @@ -92,9 +92,6 @@ var agentSessionCommand = &cli.Command{ Name: sessionDaemonSubcommand, Hidden: true, Action: func(ctx context.Context, cmd *cli.Command) error { - // Only meaningful when re-exec'd by `start`, which hands down an - // inherited readiness pipe. Reject direct invocation so a stray - // `lk agent session daemon` can't spawn a half-configured daemon. if os.Getenv(envSessionReadyFD) == "" { return fmt.Errorf("`session daemon` is an internal entrypoint; run `lk agent session start ` instead") }