Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions packages/core/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,43 @@ export function isPlanApproval(line: string): boolean {
return /^(approve|approved|go|lgtm|implement)[.!]?$/i.test(line.trim());
}

/**
* Coalesce the burst of `line` events a multi-line PASTE produces into ONE
* message. A terminal delivers a pasted block as many newline-separated lines in
* a single tick, and readline emits a `line` event per newline — so without this
* each line was submitted separately: N messages, and mid-run, N "↳ queued (steers
* the next turn)" notices for a single paste. Buffering the lines and flushing them
* joined by newline on the next tick turns one paste into one message; a single
* human-typed line (one event, then an idle gap) flushes unchanged a tick later.
* `schedule` is injectable so the batching is deterministically testable.
*/
export function createPasteBatcher(
flush: (message: string) => void,
schedule: (fn: () => void) => void = (fn) => {
setImmediate(fn);
}
): (raw: string) => void {
let batch: string[] = [];
let scheduled = false;

return (raw: string): void => {
batch.push(raw);

if (scheduled) {
return;
}

scheduled = true;
schedule(() => {
const message = batch.join("\n");

batch = [];
scheduled = false;
flush(message);
});
};
}
Comment on lines +586 to +611

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Because createPasteBatcher uses setImmediate to schedule the flush, any pasted lines or piped input remaining in the buffer when the stream closes (EOF) will be lost. This is because the close event is emitted synchronously during the poll phase, which immediately triggers the REPL's teardown and exits the process before the setImmediate callback has a chance to run in the check phase.

To fix this, we should expose a synchronous flush method on the batcher so that any pending lines can be flushed immediately when the stream closes.

export interface PasteBatcher {
  (raw: string): void;
  flush(): void;
}

export function createPasteBatcher(
  flush: (message: string) => void,
  schedule: (fn: () => void) => void = (fn) => {
    setImmediate(fn);
  }
): PasteBatcher {
  let batch: string[] = [];
  let scheduled = false;

  const triggerFlush = (): void => {
    if (batch.length === 0) {
      return;
    }
    const message = batch.join("\n");
    batch = [];
    scheduled = false;
    flush(message);
  };

  const batcher = (raw: string): void => {
    batch.push(raw);

    if (scheduled) {
      return;
    }

    scheduled = true;
    schedule(() => {
      triggerFlush();
    });
  };

  batcher.flush = triggerFlush;

  return batcher;
}


// The /help body is generated from the command registry (src/cli/commands.ts) so
// the help text and the interactive `/` palette can never drift.
const HELP = formatHelp();
Expand Down Expand Up @@ -1665,8 +1702,11 @@ async function repl(args: ICliArgs): Promise<number> {
// Event-driven (not for-await) so stdin is read DURING a run: a line typed
// mid-run is queued to steer the next turn (or, if "/exit", aborts). This is
// what makes it feel like a real harness — you can redirect without waiting.
rl.on("line", (raw) => {
const line = raw.trim();
// Submit ONE message (already paste-coalesced). A multi-line paste arrives as
// many `line` events in one tick; the batcher joins them so this runs once with
// the whole block, instead of once per line (which read as N steer messages).
const submitLine = (message: string): void => {
const line = message.trim();

if (line.length === 0) {
if (!busy) {
Expand Down Expand Up @@ -1696,6 +1736,12 @@ async function repl(args: ICliArgs): Promise<number> {
}

void runLine(line);
};

const onPastedLine = createPasteBatcher(submitLine);

rl.on("line", (raw) => {
onPastedLine(raw);
});
Comment on lines +1741 to 1745

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

To prevent losing pending input when the stream closes (such as piped input or a fast paste followed by EOF), we must synchronously flush the paste batcher when the close event is emitted. By registering a close listener before the original one, we ensure the batcher is flushed first, which synchronously sets busy = true and prevents the REPL from exiting prematurely.

    const onPastedLine = createPasteBatcher(submitLine);

    rl.on("line", (raw) => {
      onPastedLine(raw);
    });

    rl.on("close", () => {
      onPastedLine.flush();
    });


rl.on("close", () => {
Expand Down
53 changes: 52 additions & 1 deletion packages/core/tests/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import { test, expect } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { parseArgs, isOneShot, applyRecipe, runNotify } from "../src/cli";
import {
parseArgs,
isOneShot,
applyRecipe,
runNotify,
createPasteBatcher,
} from "../src/cli";
import type { ITaskRecipe } from "../src/config/recipes";

// Regression: runNotify used to spawn `sh -c cmd` with a bare `await proc.exited`
Expand Down Expand Up @@ -301,3 +307,48 @@ test("spinner exposes a live 'compacting' activity label and repaints via onTick
spinner.stop();
expect(spinner.frameLabel()).toBe(""); // stopped → loader cleared
});

// Regression: a multi-line PASTE fired one readline `line` event per newline, so
// each line submitted separately — N messages, or mid-run N "↳ queued (steers the
// next turn)" notices for ONE paste. The batcher coalesces a burst (same tick)
// into a single newline-joined message.
test("createPasteBatcher coalesces a multi-line paste into one message", () => {
const flushed: string[] = [];
// Synchronous scheduler: run the flush immediately so the test is deterministic
// (the burst is pushed before the scheduled flush, exactly as in one tick).
const pending: (() => void)[] = [];
const onLine = createPasteBatcher(
(m) => flushed.push(m),
(fn) => pending.push(fn)
);

// A paste: many lines (incl. a blank one) delivered before the tick settles.
for (const l of ["line one", "line two", "", "line four"]) {
onLine(l);
}

// Nothing flushed until the scheduled tick runs.
expect(flushed).toEqual([]);

pending.forEach((fn) => fn());

expect(flushed).toEqual(["line one\nline two\n\nline four"]);
});

test("createPasteBatcher submits a single typed line unchanged, once", () => {
const flushed: string[] = [];
const pending: (() => void)[] = [];
const onLine = createPasteBatcher(
(m) => flushed.push(m),
(fn) => pending.push(fn)
);

onLine("just one line");
pending.forEach((fn) => fn());

// A later, separate line (new tick) flushes on its own — not merged with the first.
onLine("a second, separate message");
pending.slice(1).forEach((fn) => fn());

expect(flushed).toEqual(["just one line", "a second, separate message"]);
});
Comment on lines +338 to +354

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Add a unit test to verify that the new flush() method on createPasteBatcher correctly flushes any pending buffered lines immediately and synchronously.

test("createPasteBatcher submits a single typed line unchanged, once", () => {
  const flushed: string[] = [];
  const pending: (() => void)[] = [];
  const onLine = createPasteBatcher(
    (m) => flushed.push(m),
    (fn) => pending.push(fn)
  );

  onLine("just one line");
  pending.forEach((fn) => fn());

  // A later, separate line (new tick) flushes on its own — not merged with the first.
  onLine("a second, separate message");
  pending.slice(1).forEach((fn) => fn());

  expect(flushed).toEqual(["just one line", "a second, separate message"]);
});

test("createPasteBatcher flush() flushes immediately and synchronously", () => {
  const flushed: string[] = [];
  const pending: (() => void)[] = [];
  const onLine = createPasteBatcher(
    (m) => flushed.push(m),
    (fn) => pending.push(fn)
  );

  onLine("line one");
  onLine("line two");

  expect(flushed).toEqual([]);

  onLine.flush();

  expect(flushed).toEqual(["line one\nline two"]);
});