Skip to content

Commit 693d98e

Browse files
committed
feat: support InputStream
1 parent d58aadf commit 693d98e

3 files changed

Lines changed: 455 additions & 57 deletions

File tree

core/src/runner/runner.ts

Lines changed: 205 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ export class Runner {
138138
*
139139
* @param params.userId The user ID of the session.
140140
* @param params.sessionId The session ID of the session.
141-
* @param params.newMessage A new message to append to the session.
141+
* @param params.newMessage An optional new message to append to the session.
142142
* @param params.stateDelta An optional state delta to apply to the session.
143143
* @param params.runConfig The run config for the agent.
144144
* @yields The events generated by the agent.
@@ -147,7 +147,7 @@ export class Runner {
147147
async *runAsync(params: {
148148
userId: string;
149149
sessionId: string;
150-
newMessage: Content;
150+
newMessage?: Content;
151151
stateDelta?: Record<string, unknown>;
152152
runConfig?: RunConfig;
153153
}): AsyncGenerator<Event, void, undefined> {
@@ -208,22 +208,22 @@ export class Runner {
208208
pluginManager: this.pluginManager,
209209
});
210210

211-
// =========================================================================
212-
// Preprocess plugins on user message
213-
// =========================================================================
214-
const pluginUserMessage =
215-
await this.pluginManager.runOnUserMessageCallback({
216-
userMessage: newMessage,
217-
invocationContext,
218-
});
219-
if (pluginUserMessage) {
220-
newMessage = pluginUserMessage as Content;
221-
}
222-
223211
// =========================================================================
224212
// Append user message to session
225213
// =========================================================================
226214
if (newMessage) {
215+
// =========================================================================
216+
// Preprocess plugins on user message
217+
// =========================================================================
218+
const pluginUserMessage =
219+
await this.pluginManager.runOnUserMessageCallback({
220+
userMessage: newMessage,
221+
invocationContext,
222+
});
223+
if (pluginUserMessage) {
224+
newMessage = pluginUserMessage as Content;
225+
}
226+
227227
if (!newMessage.parts?.length) {
228228
throw new Error('No parts in the newMessage.');
229229
}
@@ -264,52 +264,48 @@ export class Runner {
264264
// =========================================================================
265265
// Run the agent with the plugins (aka hooks to apply in the lifecycle)
266266
// =========================================================================
267-
if (newMessage) {
268-
// =========================================================================
269-
// Run the agent with the plugins (aka hooks to apply in the lifecycle)
270-
// =========================================================================
271-
// Step 1: Run the before_run callbacks to see if we should early exit.
272-
const beforeRunCallbackResponse =
273-
await this.pluginManager.runBeforeRunCallback({
274-
invocationContext,
275-
});
267+
// Step 1: Run the before_run callbacks to see if we should early exit.
268+
const beforeRunCallbackResponse =
269+
await this.pluginManager.runBeforeRunCallback({
270+
invocationContext,
271+
});
276272

277-
if (beforeRunCallbackResponse) {
278-
const earlyExitEvent = createEvent({
279-
invocationId: invocationContext.invocationId,
280-
author: 'model',
281-
content: beforeRunCallbackResponse,
282-
});
283-
// TODO: b/447446338 - In the future, do *not* save live call audio
284-
// content to session This is a feature in Python ADK
285-
await this.sessionService.appendEvent({
286-
session,
287-
event: earlyExitEvent,
288-
});
289-
yield earlyExitEvent;
290-
} else {
291-
// Step 2: Otherwise continue with normal execution
292-
for await (const event of invocationContext.agent.runAsync(
293-
invocationContext,
294-
)) {
295-
if (!event.partial) {
296-
await this.sessionService.appendEvent({session, event});
297-
}
298-
// Step 3: Run the on_event callbacks to optionally modify the event.
299-
const modifiedEvent =
300-
await this.pluginManager.runOnEventCallback({
301-
invocationContext,
302-
event,
303-
});
304-
if (modifiedEvent) {
305-
yield modifiedEvent;
306-
} else {
307-
yield event;
308-
}
273+
if (beforeRunCallbackResponse) {
274+
const earlyExitEvent = createEvent({
275+
invocationId: invocationContext.invocationId,
276+
author: 'model',
277+
content: beforeRunCallbackResponse,
278+
});
279+
// TODO: b/447446338 - In the future, do *not* save live call audio
280+
// content to session This is a feature in Python ADK
281+
await this.sessionService.appendEvent({
282+
session,
283+
event: earlyExitEvent,
284+
});
285+
yield earlyExitEvent;
286+
} else {
287+
// Step 2: Otherwise continue with normal execution
288+
for await (const event of invocationContext.agent.runAsync(
289+
invocationContext,
290+
)) {
291+
if (!event.partial) {
292+
await this.sessionService.appendEvent({session, event});
293+
}
294+
// Step 3: Run the on_event callbacks to optionally modify the event.
295+
const modifiedEvent = await this.pluginManager.runOnEventCallback(
296+
{
297+
invocationContext,
298+
event,
299+
},
300+
);
301+
if (modifiedEvent) {
302+
yield modifiedEvent;
303+
} else {
304+
yield event;
309305
}
310-
// Step 4: Run the after_run callbacks to optionally modify the context.
311-
await this.pluginManager.runAfterRunCallback({invocationContext});
312306
}
307+
// Step 4: Run the after_run callbacks to optionally modify the context.
308+
await this.pluginManager.runAfterRunCallback({invocationContext});
313309
}
314310
},
315311
);
@@ -435,9 +431,161 @@ export class Runner {
435431
}
436432
return true;
437433
}
434+
435+
/**
436+
* Runs the agent continuously with a stream of input messages.
437+
* This provides a true concurrent multiplexing model, handling live user
438+
* messages safely during active execution block intervals.
439+
*
440+
* @param params.userId The user ID of the session.
441+
* @param params.sessionId The session ID of the session.
442+
* @param params.inputStream The stream of incoming user content.
443+
* @param params.stateDelta An optional state delta.
444+
* @param params.runConfig The run config for the agent.
445+
* @yields The events generated by the agent and echoed from the user.
446+
*/
447+
async *runStream(params: {
448+
userId: string;
449+
sessionId: string;
450+
inputStream: AsyncIterable<Content>;
451+
stateDelta?: Record<string, unknown>;
452+
runConfig?: RunConfig;
453+
}): AsyncGenerator<Event, void, undefined> {
454+
const outputQueue = new AsyncEventQueue<Event>();
455+
let activeAgentGenerators = 0;
456+
let pendingRun = false;
457+
458+
// Helper to start the agent execution generator
459+
const runAgent = async () => {
460+
activeAgentGenerators++;
461+
try {
462+
do {
463+
pendingRun = false;
464+
const generator = this.runAsync({
465+
userId: params.userId,
466+
sessionId: params.sessionId,
467+
stateDelta: params.stateDelta,
468+
runConfig: params.runConfig,
469+
});
470+
for await (const event of generator) {
471+
outputQueue.push(event);
472+
}
473+
} while (pendingRun);
474+
} catch (e) {
475+
outputQueue.close(e as Error);
476+
} finally {
477+
activeAgentGenerators--;
478+
}
479+
};
480+
481+
// Helper to consume incoming user stream
482+
const consumeInput = async () => {
483+
try {
484+
const session = await this.sessionService.getSession({
485+
appName: this.appName,
486+
userId: params.userId,
487+
sessionId: params.sessionId,
488+
});
489+
490+
if (!session) {
491+
throw new Error(`Session not found: ${params.sessionId}`);
492+
}
493+
494+
for await (const newMessage of params.inputStream) {
495+
if (!newMessage || !newMessage.parts?.length) continue;
496+
497+
// 1. Immediately inject the event into the session events.
498+
// This allows mid-generation concurrency reactivity on the next step.
499+
const userEvent = createEvent({
500+
invocationId: newInvocationContextId(),
501+
author: 'user',
502+
content: newMessage,
503+
});
504+
505+
if (params.runConfig?.saveInputBlobsAsArtifacts) {
506+
await this.saveArtifacts(
507+
userEvent.invocationId,
508+
params.userId,
509+
params.sessionId,
510+
newMessage,
511+
);
512+
}
513+
514+
await this.sessionService.appendEvent({
515+
session,
516+
event: userEvent,
517+
});
518+
519+
outputQueue.push(userEvent); // Echo it out as an event
520+
521+
// 2. Trigger the agent loop if no active generators are handling the session.
522+
if (activeAgentGenerators === 0) {
523+
runAgent();
524+
} else {
525+
// Let the active generator know it needs to run again if it exits soon.
526+
pendingRun = true;
527+
}
528+
}
529+
} catch (e) {
530+
outputQueue.close(e as Error);
531+
} finally {
532+
outputQueue.close();
533+
}
534+
};
535+
536+
// Start consuming
537+
consumeInput();
538+
539+
// Stream multiplexed output
540+
yield* outputQueue;
541+
}
438542
// TODO - b/425992518: Implement runLive and related methods.
439543
}
440544

545+
class AsyncEventQueue<T> {
546+
private queue: T[] = [];
547+
private resolves: ((val: IteratorResult<T>) => void)[] = [];
548+
private isClosed = false;
549+
private error?: Error;
550+
551+
push(value: T) {
552+
if (this.resolves.length > 0) {
553+
this.resolves.shift()!({value, done: false});
554+
} else {
555+
this.queue.push(value);
556+
}
557+
}
558+
559+
close(error?: Error) {
560+
this.isClosed = true;
561+
this.error = error;
562+
while (this.resolves.length > 0) {
563+
if (error) {
564+
this.resolves.shift()!(Promise.reject(error));
565+
} else {
566+
this.resolves.shift()!({value: undefined, done: true});
567+
}
568+
}
569+
}
570+
571+
async *[Symbol.asyncIterator](): AsyncGenerator<T, void, undefined> {
572+
while (true) {
573+
if (this.queue.length > 0) {
574+
yield this.queue.shift()!;
575+
} else if (this.isClosed) {
576+
if (this.error) throw this.error;
577+
return;
578+
} else {
579+
const result = await new Promise<IteratorResult<T>>((resolve) => {
580+
this.resolves.push(resolve);
581+
});
582+
if (result.done) return;
583+
yield result.value;
584+
}
585+
}
586+
}
587+
}
588+
441589
/**
442590
* It iterates through the events in reverse order, and returns the event
443591
* containing a function call with a functionCall.id matching the

core/test/runner/streaming_runner_test.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,82 @@ describe('Runner Streaming and Ephemeral', () => {
137137
});
138138
});
139139

140+
describe('runStream', () => {
141+
it('should multiplex input stream and agent events', async () => {
142+
let resolveInput: (val: any) => void;
143+
let inputPromise = new Promise((resolve) => {
144+
resolveInput = resolve;
145+
});
146+
147+
const inputStream = {
148+
loopActive: true,
149+
push: (val: string) => {
150+
resolveInput(val);
151+
},
152+
close: () => {
153+
resolveInput(null);
154+
},
155+
async *[Symbol.asyncIterator]() {
156+
while (this.loopActive) {
157+
const val = await inputPromise;
158+
if (val === null) break; // End of stream
159+
yield {role: 'user', parts: [{text: val}]};
160+
inputPromise = new Promise((resolve) => {
161+
resolveInput = resolve;
162+
});
163+
}
164+
},
165+
};
166+
167+
const session = await sessionService.createSession({
168+
appName: TEST_APP_ID,
169+
userId: TEST_USER_ID,
170+
});
171+
172+
const generator = runner.runStream({
173+
userId: TEST_USER_ID,
174+
sessionId: session.id,
175+
inputStream: inputStream as any,
176+
});
177+
178+
const events: Event[] = [];
179+
const consumeEvents = async () => {
180+
for await (const event of generator) {
181+
events.push(event);
182+
}
183+
};
184+
185+
// Start consuming events
186+
const consumePromise = consumeEvents();
187+
188+
// Push first message
189+
inputStream.push('Hello 1');
190+
191+
// Give it a moment to process first message
192+
await new Promise((r) => setTimeout(r, 50));
193+
194+
// Push second message
195+
inputStream.push('Hello 2');
196+
197+
// Close stream
198+
await new Promise((r) => setTimeout(r, 50));
199+
inputStream.close();
200+
201+
await consumePromise;
202+
203+
// We should see user events and model events mixed in the output
204+
expect(events.length).toBeGreaterThan(0);
205+
const userEvents = events.filter(
206+
(e) =>
207+
e.author === 'user' && e.content?.parts?.[0]?.text?.includes('Hello'),
208+
);
209+
expect(userEvents.length).toBe(2);
210+
211+
const modelEvents = events.filter((e) => e.author === rootAgent.name);
212+
expect(modelEvents.length).toBeGreaterThan(0);
213+
});
214+
});
215+
140216
describe('toStructuredEvents', () => {
141217
it('should convert error events', () => {
142218
const event = createEvent({

0 commit comments

Comments
 (0)