Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/major-pianos-battle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chat": minor
---

add streaming options to thread.post() with platform-specific namespacing
5 changes: 5 additions & 0 deletions packages/chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ export type {
export { isPostableObject } from "./postable-object";
export { reviver } from "./reviver";
export { StreamingMarkdownRenderer } from "./streaming-markdown";
export {
StreamingPlan,
type StreamingPlanData,
type StreamingPlanOptions,
} from "./streaming-plan";
export { type SerializedThread, ThreadImpl } from "./thread";

// Card builders - import then re-export to ensure values are properly exported
Expand Down
89 changes: 89 additions & 0 deletions packages/chat/src/streaming-plan.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import {
POSTABLE_OBJECT,
type PostableObject,
type PostableObjectContext,
} from "./postable-object";
import type { Adapter, StreamChunk, StreamEvent } from "./types";

export interface StreamingPlanOptions {
/**
* Block Kit elements to attach when the stream stops (Slack only).
* Useful for adding feedback buttons after a streamed response.
*/
endWith?: unknown[];
/**
* Controls how task_update chunks are displayed (Slack only).
* - `"plan"` - all tasks grouped into a single plan block
* - `"timeline"` - individual task cards shown inline with text (default)
*/
groupTasks?: "plan" | "timeline";
/**
* Minimum interval between updates in ms (default: 500).
* Used for fallback mode (post+edit on adapters without native streaming).
*/
updateIntervalMs?: number;
}

export interface StreamingPlanData {
options: StreamingPlanOptions;
stream: AsyncIterable<string | StreamChunk | StreamEvent>;
}

/**
* A StreamingPlan wraps an async iterable with platform-specific streaming options.
*
* Use this when you need to pass options like task grouping or stop blocks
* to the streaming API. For simple streaming without options, pass the
* async iterable directly to `thread.post()`.
*
* @example
* ```typescript
* const stream = new StreamingPlan(result.fullStream, {
* groupTasks: "plan",
* endWith: [feedbackBlock],
* });
* await thread.post(stream);
* ```
*/
export class StreamingPlan implements PostableObject<StreamingPlanData> {
readonly $$typeof = POSTABLE_OBJECT;
readonly kind = "stream";

private readonly _stream: AsyncIterable<string | StreamChunk | StreamEvent>;
private readonly _options: StreamingPlanOptions;

constructor(
stream: AsyncIterable<string | StreamChunk | StreamEvent>,
options: StreamingPlanOptions = {}
) {
this._stream = stream;
this._options = options;
}

get stream(): AsyncIterable<string | StreamChunk | StreamEvent> {
return this._stream;
}

get options(): StreamingPlanOptions {
return this._options;
}

getFallbackText(): string {
return "";
}

getPostData(): StreamingPlanData {
return {
stream: this._stream,
options: this._options,
};
}

isSupported(_adapter: Adapter): boolean {
return true;
}

onPosted(_context: PostableObjectContext): void {
// Streams are one-shot, no lifecycle binding needed
}
}
146 changes: 146 additions & 0 deletions packages/chat/src/thread.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
mockLogger,
} from "./mock-adapter";
import { Plan } from "./plan";
import { StreamingPlan } from "./streaming-plan";
import { ThreadImpl } from "./thread";
import type { Adapter, Message, ScheduledMessage, StreamChunk } from "./types";
import { NotImplementedError } from "./types";
Expand Down Expand Up @@ -638,6 +639,151 @@ describe("ThreadImpl", () => {
})
);
});

it("should pass StreamingPlan PostableObject options to adapter.stream", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
const streamMsg = new StreamingPlan(textStream, {
groupTasks: "plan",
endWith: [{ type: "actions" }],
updateIntervalMs: 1000,
});
await thread.post(streamMsg);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.objectContaining({
taskDisplayMode: "plan",
stopBlocks: [{ type: "actions" }],
updateIntervalMs: 1000,
})
);
});

it("should pass StreamingPlan with only groupTasks", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
await thread.post(
new StreamingPlan(textStream, { groupTasks: "timeline" })
);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.objectContaining({
taskDisplayMode: "timeline",
})
);
const options = mockStream.mock.calls[0][2];
expect(options.stopBlocks).toBeUndefined();
});

it("should pass StreamingPlan with only endWith", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
await thread.post(
new StreamingPlan(textStream, { endWith: [{ type: "actions" }] })
);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.objectContaining({
stopBlocks: [{ type: "actions" }],
})
);
const options = mockStream.mock.calls[0][2];
expect(options.taskDisplayMode).toBeUndefined();
});

it("should pass StreamingPlan with only updateIntervalMs", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
await thread.post(
new StreamingPlan(textStream, { updateIntervalMs: 2000 })
);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.objectContaining({
updateIntervalMs: 2000,
})
);
const options = mockStream.mock.calls[0][2];
expect(options.taskDisplayMode).toBeUndefined();
expect(options.stopBlocks).toBeUndefined();
});

it("should route StreamingPlan through fallback when adapter has no native streaming", async () => {
mockAdapter.stream = undefined;

const textStream = createTextStream(["Hello", " ", "World"]);
await thread.post(
new StreamingPlan(textStream, {
groupTasks: "plan",
endWith: [{ type: "actions" }],
updateIntervalMs: 2000,
})
);

// Should post initial placeholder and edit with final content
expect(mockAdapter.postMessage).toHaveBeenCalledWith(
"slack:C123:1234.5678",
"..."
);
expect(mockAdapter.editMessage).toHaveBeenLastCalledWith(
"slack:C123:1234.5678",
"msg-1",
{ markdown: "Hello World" }
);
});

it("should still work without options (backward compat)", async () => {
const mockStream = vi.fn().mockResolvedValue({
id: "msg-stream",
threadId: "t1",
raw: "Hello",
});
mockAdapter.stream = mockStream;

const textStream = createTextStream(["Hello"]);
await thread.post(textStream);

expect(mockStream).toHaveBeenCalledWith(
"slack:C123:1234.5678",
expect.any(Object),
expect.any(Object)
);
const options = mockStream.mock.calls[0][2];
expect(options.taskDisplayMode).toBeUndefined();
expect(options.stopBlocks).toBeUndefined();
});
});

describe("fallback streaming error logging", () => {
Expand Down
29 changes: 26 additions & 3 deletions packages/chat/src/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,28 @@ export class ThreadImpl<TState = Record<string, unknown>>
message: string | PostableMessage | ChatElement
): Promise<SentMessage | PostableObject> {
if (isPostableObject(message)) {
// StreamingPlan PostableObject - route to streaming with options
if (message.kind === "stream") {
const data = message.getPostData() as {
stream: AsyncIterable<string | StreamChunk | StreamEvent>;
options: {
groupTasks?: "plan" | "timeline";
endWith?: unknown[];
updateIntervalMs?: number;
};
};
const streamOptions: StreamOptions = {
...(data.options.updateIntervalMs
? { updateIntervalMs: data.options.updateIntervalMs }
: {}),
...(data.options.groupTasks
? { taskDisplayMode: data.options.groupTasks }
: {}),
...(data.options.endWith ? { stopBlocks: data.options.endWith } : {}),
};
await this.handleStream(data.stream, streamOptions);
return message;
}
await this.handlePostableObject(message);
return message;
}
Expand Down Expand Up @@ -516,12 +538,13 @@ export class ThreadImpl<TState = Record<string, unknown>>
* then uses adapter's native streaming if available, otherwise falls back to post+edit.
*/
private async handleStream(
rawStream: AsyncIterable<string | StreamChunk | StreamEvent>
rawStream: AsyncIterable<string | StreamChunk | StreamEvent>,
callerOptions?: StreamOptions
): Promise<SentMessage> {
// Normalize: handles plain strings, AI SDK fullStream events, and StreamChunk objects
const textStream = fromFullStream(rawStream);
// Build streaming options from current message context
const options: StreamOptions = {};
// Build streaming options from current message context + caller options
const options: StreamOptions = { ...callerOptions };
if (this._currentMessage) {
options.recipientUserId = this._currentMessage.author.userId;
// Extract teamId from raw Slack payload
Expand Down
7 changes: 7 additions & 0 deletions packages/chat/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,13 @@ export interface Thread<TState = Record<string, unknown>, TRawMessage = unknown>
* const result = await agent.stream({ prompt: message.text });
* await thread.post(result.textStream);
*
* // Stream with options via StreamingPlan PostableObject
* const stream = new StreamingPlan(result.fullStream, {
* groupTasks: "plan",
* endWith: [feedbackBlocks],
* });
* await thread.post(stream);
*
* // Plan with live updates
* const plan = new Plan({ initialMessage: "Working..." });
* await thread.post(plan);
Expand Down
Loading