Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions demo/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ChatMode, ConnectionState } from '@sdk/types';
import { useEffect, useRef, useState } from 'preact/hooks';

import './app.css';
import { agentId, clientKey, didApiUrl, didSocketApiUrl } from './environment';
import { agentId, clientKey, debug, didApiUrl, didSocketApiUrl } from './environment';
import { useAgentManager } from './hooks/useAgentManager';

export function App() {
Expand All @@ -19,6 +19,7 @@ export function App() {

const { srcObject, connectionState, messages, isSpeaking, connect, disconnect, speak, chat, interrupt } =
useAgentManager({
debug,
agentId,
baseURL: didApiUrl,
wsURL: didSocketApiUrl,
Expand Down Expand Up @@ -64,10 +65,10 @@ export function App() {
{connectionState === ConnectionState.Connected
? 'Send'
: connectionState === ConnectionState.Connecting
? 'Connecting...'
: connectionState === ConnectionState.Fail
? 'Failed, Try Again'
: 'Connect'}
? 'Connecting...'
: connectionState === ConnectionState.Fail
? 'Failed, Try Again'
: 'Connect'}
</button>

<button
Expand Down
1 change: 1 addition & 0 deletions demo/environment.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export const nodeEnv = import.meta.env.VITE_NODE_ENV as string;
export const debug = (import.meta.env.VITE_DEBUG as boolean) || false;
export const didApiUrl = import.meta.env.VITE_DID_API_URL as string;
export const didSocketApiUrl = import.meta.env.VITE_WS_ENDPOINT as string;
export const mixpanelKey = import.meta.env.VITE_MIXPANEL_KEY as string;
Expand Down
3 changes: 3 additions & 0 deletions demo/hooks/useAgentManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ interface UseAgentManagerOptions {
externalId?: string;
mixpanelKey?: string;
mixpanelAdditionalProperties?: Record<string, any>;
debug?: boolean;
}

export function useAgentManager(props: UseAgentManagerOptions) {
Expand All @@ -41,6 +42,7 @@ export function useAgentManager(props: UseAgentManagerOptions) {
streamOptions,
mixpanelKey,
mixpanelAdditionalProperties,
debug,
} = props;

const [isSpeaking, setIsSpeaking] = useState(false);
Expand Down Expand Up @@ -104,6 +106,7 @@ export function useAgentManager(props: UseAgentManagerOptions) {
mixpanelKey,
mixpanelAdditionalProperties,
streamOptions,
debug,
});

await newManager.connect();
Expand Down
17 changes: 16 additions & 1 deletion src/services/agent-manager/connect-to-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
AgentsAPI,
Chat,
ChatMode,
ChatProgressCallback,
ConnectionState,
CreateStreamOptions,
CreateStreamV2Options,
Expand Down Expand Up @@ -158,6 +159,8 @@ function trackLegacyVideoAnalytics(
type ConnectToManagerOptions = AgentManagerOptions & {
callbacks: AgentManagerOptions['callbacks'] & {
onVideoIdChange?: (videoId: string | null) => void;
/** Internal callback for livekit-manager data channel events */
onMessage?: ChatProgressCallback;
};
chatId?: string;
};
Expand All @@ -172,6 +175,8 @@ function connectToManager(
return new Promise(async (resolve, reject) => {
try {
let streamingManager: StreamingManager<CreateStreamOptions | CreateStreamV2Options>;
let shouldResolveOnComplete = false;

streamingManager = await createStreamingManager(agent, getAgentStreamOptions(agent, options), {
...options,
analytics,
Expand All @@ -181,7 +186,13 @@ function connectToManager(
options.callbacks.onConnectionStateChange?.(state);

if (state === ConnectionState.Connected) {
resolve(streamingManager);
// If manager is ready, resolve immediately
// Otherwise, mark to resolve after manager is created
if (streamingManager) {
resolve(streamingManager);
} else {
shouldResolveOnComplete = true;
}
}
},
onVideoStateChange: (state: StreamingState, statsReport?: any) => {
Expand Down Expand Up @@ -213,6 +224,10 @@ function connectToManager(
},
},
});

if (shouldResolveOnComplete) {
resolve(streamingManager);
}
} catch (error) {
reject(error);
}
Expand Down
82 changes: 46 additions & 36 deletions src/services/agent-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
AgentManagerOptions,
Chat,
ChatMode,
ChatResponse,
ConnectionState,
CreateStreamOptions,
Interrupt,
Expand All @@ -15,6 +16,7 @@ import { CONNECTION_RETRY_TIMEOUT_MS } from '@sdk/config/consts';
import { didApiUrl, didSocketApiUrl, mixpanelKey } from '@sdk/config/environment';
import { ChatCreationFailed, ValidationError } from '@sdk/errors';
import { getRandom } from '@sdk/utils';
import { isStreamsV2Agent } from '@sdk/utils/agent';
import { isChatModeWithoutChat, isTextualChat } from '@sdk/utils/chat';
import { createAgentsApi } from '../../api/agents';
import { getAgentInfo, getAnalyticsInfo } from '../../utils/analytics';
Expand Down Expand Up @@ -72,6 +74,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
const agentsApi = createAgentsApi(options.auth, baseURL, options.callbacks.onError, options.externalId);

const agentEntity = await agentsApi.getById(agent);
const isStreamsV2 = isStreamsV2Agent(agentEntity.presenter.type);
analytics.enrich(getAgentInfo(agentEntity));

const { onMessage, clearQueue } = createMessageEventQueue(analytics, items, options, agentEntity, () =>
Expand Down Expand Up @@ -100,7 +103,7 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
}

const websocketPromise =
options.mode === ChatMode.DirectPlayback
options.mode === ChatMode.DirectPlayback || isStreamsV2
? Promise.resolve(undefined)
: createSocketManager(
options.auth,
Expand All @@ -113,7 +116,10 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
() => {
return initializeStreamAndChat(
agentEntity,
{ ...options, callbacks: { ...options.callbacks, onVideoIdChange: updateVideoId } },
{
...options,
callbacks: { ...options.callbacks, onVideoIdChange: updateVideoId, onMessage },
},
agentsApi,
analytics,
items.chat
Expand Down Expand Up @@ -253,41 +259,45 @@ export async function createAgentManager(agent: string, options: AgentManagerOpt
};

const sendChatRequest = async (messages: Message[], chatId: string) => {
return retryOperation(
() => {
return agentsApi.chat(
agentEntity.id,
chatId,
{
chatMode: items.chatMode,
streamId: items.streamingManager?.streamId,
sessionId: items.streamingManager?.sessionId,
messages: messages.map(({ matches, ...message }) => message),
},
{
...getRequestHeaders(items.chatMode),
skipErrorHandler: true,
}
);
const chatRequestFn = isStreamsV2
? async () => {
await items.streamingManager?.sendTextMessage?.(userMessage);
return Promise.resolve({} as ChatResponse);
}
: async () => {
return agentsApi.chat(
agentEntity.id,
chatId,
{
chatMode: items.chatMode,
streamId: items.streamingManager?.streamId,
sessionId: items.streamingManager?.sessionId,
messages: messages.map(({ matches, ...message }) => message),
},
{
...getRequestHeaders(items.chatMode),
skipErrorHandler: true,
}
);
};

return retryOperation(chatRequestFn, {
limit: 2,
shouldRetryFn: error => {
const isInvalidSessionId = error?.message?.includes('missing or invalid session_id');
const isStreamError = error?.message?.includes('Stream Error');

if (!isStreamError && !isInvalidSessionId) {
options.callbacks.onError?.(error);
return false;
}
return true;
},
{
limit: 2,
shouldRetryFn: error => {
const isInvalidSessionId = error?.message?.includes('missing or invalid session_id');
const isStreamError = error?.message?.includes('Stream Error');

if (!isStreamError && !isInvalidSessionId) {
options.callbacks.onError?.(error);
return false;
}
return true;
},
onRetry: async () => {
await disconnect();
await connect(false);
},
}
);
onRetry: async () => {
await disconnect();
await connect(false);
},
});
};

try {
Expand Down
6 changes: 2 additions & 4 deletions src/services/streaming-manager/advanced.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,7 @@ describe('Streaming Manager Advanced', () => {
expect.anything(),
expect.anything(),
expect.anything(),
expect.any(Function),
false
expect.any(Function)
);

const connectivityCallback = (pollStats as jest.Mock).mock.calls[0][4];
Expand All @@ -422,8 +421,7 @@ describe('Streaming Manager Advanced', () => {
expect.anything(),
expect.anything(),
expect.anything(),
expect.anything(),
false
expect.anything()
);

expect(manager.streamId).toBe('streamId');
Expand Down
46 changes: 0 additions & 46 deletions src/services/streaming-manager/business-flows.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import { StreamApiFactory, StreamingAgentFactory, StreamingManagerOptionsFactory } from '../../test-utils/factories';
import { AgentActivityState, CreateStreamOptions, StreamType, StreamingManagerOptions } from '../../types/index';
import { pollStats } from './stats/poll';
import { createWebRTCStreamingManager as createStreamingManager } from './webrtc-manager';

// Mock createStreamApi
Expand Down Expand Up @@ -146,51 +145,6 @@ describe('Streaming Manager Business Flows', () => {
});
});

describe('Warmup Mode Flows', () => {
it('should handle warmup=true with legacy stream', async () => {
const warmupAgent = { ...agent, stream_warmup: true };
const manager = await createStreamingManager(agentId, warmupAgent, options);

// Warmup should be enabled for legacy streams
expect(pollStats).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
expect.anything(),
expect.anything(),
expect.anything(),
true // warmup = true
);

expect(manager.streamId).toBe('streamId');
});

it('should handle warmup=true with fluent stream (should disable warmup)', async () => {
mockApi.createStream.mockResolvedValueOnce({
id: 'streamId',
offer: { type: 'offer', sdp: 'sdp' },
ice_servers: [],
session_id: 'sessionId',
fluent: true,
interrupt_enabled: false,
});

const warmupAgent = { ...agent, stream_warmup: true };
const manager = await createStreamingManager(agentId, warmupAgent, options);

// Warmup should be disabled for fluent streams
expect(pollStats).toHaveBeenCalledWith(
expect.anything(),
expect.anything(),
expect.anything(),
expect.anything(),
expect.anything(),
false // warmup = false for fluent
);

expect(manager.streamType).toBe(StreamType.Fluent);
});
});

describe('Error Handling Flows', () => {
it('should handle connection failure', async () => {
mockApi.createStream.mockRejectedValueOnce(new Error('Connection failed'));
Expand Down
7 changes: 7 additions & 0 deletions src/services/streaming-manager/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ export type StreamingManager<T extends CreateStreamOptions | CreateStreamV2Optio
*/
sendDataChannelMessage(payload: string): void;

/**
* Method to send text messages to the server
* @param payload The message payload to send
* supported only for livekit manager
*/
sendTextMessage?(payload: string): Promise<void>;

/**
* Session identifier information, should be returned in the body of all streaming requests
*/
Expand Down
Loading