Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/sdk-core/realtime/connection-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ async function main() {
case "connected":
console.log("Connected! Streaming active.");
break;
case "generating":
console.log("Generation started! Frames incoming.");
break;
case "reconnecting":
console.log("Connection lost, reconnecting...");
break;
case "disconnected":
console.log("Disconnected from server.");
break;
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export type {
RealTimeClientInitialState,
} from "./realtime/client";
export type { SetInput } from "./realtime/methods";
export type { ConnectionState } from "./realtime/webrtc-connection";
export type { ConnectionState } from "./realtime/types";
export {
type ImageModelDefinition,
type ImageModels,
Expand Down
34 changes: 30 additions & 4 deletions packages/sdk/src/realtime/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { modelStateSchema } from "../shared/types";
import { createWebrtcError, type DecartSDKError } from "../utils/errors";
import { AudioStreamManager } from "./audio-stream-manager";
import { realtimeMethods, type SetInput } from "./methods";
import type { ConnectionState } from "./types";
import { WebRTCManager } from "./webrtc-manager";

async function blobToBase64(blob: Blob): Promise<string> {
Expand Down Expand Up @@ -90,15 +91,15 @@ const realTimeClientConnectOptionsSchema = z.object({
export type RealTimeClientConnectOptions = z.infer<typeof realTimeClientConnectOptionsSchema>;

export type Events = {
connectionChange: "connected" | "connecting" | "disconnected" | "reconnecting";
connectionChange: ConnectionState;
error: DecartSDKError;
};

export type RealTimeClient = {
set: (input: SetInput) => Promise<void>;
setPrompt: (prompt: string, { enhance }?: { enhance?: boolean }) => Promise<void>;
isConnected: () => boolean;
getConnectionState: () => "connected" | "connecting" | "disconnected" | "reconnecting";
getConnectionState: () => ConnectionState;
disconnect: () => void;
on: <K extends keyof Events>(event: K, listener: (data: Events[K]) => void) => void;
off: <K extends keyof Events>(event: K, listener: (data: Events[K]) => void) => void;
Expand Down Expand Up @@ -167,16 +168,38 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
: undefined;

const url = `${baseUrl}${options.model.urlPath}`;

const eventBuffer: Array<{ event: keyof Events; data: Events[keyof Events] }> = [];
let buffering = true;

const emitOrBuffer = <K extends keyof Events>(event: K, data: Events[K]) => {
if (buffering) {
eventBuffer.push({ event, data: data as Events[keyof Events] });
} else {
eventEmitter.emit(event, data);
}
};

const flushBufferedEvents = () => {
setTimeout(() => {
buffering = false;
for (const { event, data } of eventBuffer) {
(eventEmitter.emit as (type: keyof Events, data: Events[keyof Events]) => void)(event, data);
}
eventBuffer.length = 0;
}, 0);
};

webrtcManager = new WebRTCManager({
webrtcUrl: `${url}?api_key=${encodeURIComponent(apiKey)}&model=${encodeURIComponent(options.model.name)}`,
integration,
onRemoteStream,
onConnectionStateChange: (state) => {
eventEmitter.emit("connectionChange", state);
emitOrBuffer("connectionChange", state);
},
onError: (error) => {
console.error("WebRTC error:", error);
eventEmitter.emit("error", createWebrtcError(error));
emitOrBuffer("error", createWebrtcError(error));
},
customizeOffer: options.customizeOffer as ((offer: RTCSessionDescriptionInit) => Promise<void>) | undefined,
vp8MinBitrate: 300,
Expand All @@ -203,6 +226,8 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
isConnected: () => manager.isConnected(),
getConnectionState: () => manager.getConnectionState(),
disconnect: () => {
buffering = false;
eventBuffer.length = 0;
manager.cleanup();
audioStreamManager?.cleanup();
},
Expand All @@ -227,6 +252,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
client.playAudio = (audio: Blob | File | ArrayBuffer) => manager.playAudio(audio);
}

flushBufferedEvents();
return client;
} catch (error) {
webrtcManager?.cleanup();
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/realtime/methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export const realtimeMethods = (
) => {
const assertConnected = () => {
const state = webrtcManager.getConnectionState();
if (state !== "connected") {
if (state !== "connected" && state !== "generating") {
throw new Error(`Cannot send message: connection is ${state}`);
}
};
Expand Down
9 changes: 8 additions & 1 deletion packages/sdk/src/realtime/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ export type SetImageAckMessage = {
error: null | string;
};

export type GenerationStartedMessage = {
type: "generation_started";
};

export type ConnectionState = "connecting" | "connected" | "generating" | "disconnected" | "reconnecting";

// Incoming message types (from server)
export type IncomingWebRTCMessage =
| ReadyMessage
Expand All @@ -69,7 +75,8 @@ export type IncomingWebRTCMessage =
| IceRestartMessage
| PromptAckMessage
| ErrorMessage
| SetImageAckMessage;
| SetImageAckMessage
| GenerationStartedMessage;

// Outgoing message types (to server)
export type OutgoingWebRTCMessage =
Expand Down
18 changes: 12 additions & 6 deletions packages/sdk/src/realtime/webrtc-connection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import mitt from "mitt";
import { buildUserAgent } from "../utils/user-agent";
import type {
ConnectionState,
IncomingWebRTCMessage,
OutgoingWebRTCMessage,
PromptAckMessage,
Expand All @@ -23,8 +24,6 @@ interface ConnectionCallbacks {
initialPrompt?: { text: string; enhance?: boolean };
}

export type ConnectionState = "connecting" | "connected" | "disconnected" | "reconnecting";

type WsMessageEvents = {
promptAck: PromptAckMessage;
setImageAck: SetImageAckMessage;
Expand Down Expand Up @@ -107,7 +106,7 @@ export class WebRTCConnection {
await Promise.race([
new Promise<void>((resolve, reject) => {
const checkConnection = setInterval(() => {
if (this.state === "connected") {
if (this.state === "connected" || this.state === "generating") {
clearInterval(checkConnection);
resolve();
} else if (this.state === "disconnected") {
Expand Down Expand Up @@ -151,6 +150,11 @@ export class WebRTCConnection {
return;
}

if (msg.type === "generation_started") {
this.setState("generating");
return;
}

// All other messages require peer connection
if (!this.pc) return;

Expand Down Expand Up @@ -340,9 +344,11 @@ export class WebRTCConnection {
this.pc.onconnectionstatechange = () => {
if (!this.pc) return;
const s = this.pc.connectionState;
this.setState(
s === "connected" ? "connected" : ["connecting", "new"].includes(s) ? "connecting" : "disconnected",
);
const nextState =
s === "connected" ? "connected" : ["connecting", "new"].includes(s) ? "connecting" : "disconnected";
// Keep "generating" sticky unless the connection is actually lost.
if (this.state === "generating" && nextState !== "disconnected") return;
this.setState(nextState);
Comment on lines +347 to +351
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe some better if else for more readbility

};

this.pc.oniceconnectionstatechange = () => {
Expand Down
14 changes: 6 additions & 8 deletions packages/sdk/src/realtime/webrtc-manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pRetry, { AbortError } from "p-retry";
import type { OutgoingMessage } from "./types";
import { type ConnectionState, WebRTCConnection } from "./webrtc-connection";
import type { ConnectionState, OutgoingMessage } from "./types";
import { WebRTCConnection } from "./webrtc-connection";

export interface WebRTCConfig {
webrtcUrl: string;
Expand Down Expand Up @@ -62,7 +62,7 @@ export class WebRTCManager {
private emitState(state: ConnectionState): void {
if (this.managerState !== state) {
this.managerState = state;
if (state === "connected") this.hasConnected = true;
if (state === "connected" || state === "generating") this.hasConnected = true;
this.config.onConnectionStateChange?.(state);
}
}
Expand All @@ -75,12 +75,10 @@ export class WebRTCManager {

// During reconnection, intercept state changes from the connection layer
if (this.isReconnecting) {
if (state === "connected") {
// Reconnection succeeded
if (state === "connected" || state === "generating") {
this.isReconnecting = false;
this.emitState("connected");
this.emitState(state);
}
// Swallow other states during reconnection (connecting, disconnected)
return;
}

Expand Down Expand Up @@ -197,7 +195,7 @@ export class WebRTCManager {
}

isConnected(): boolean {
return this.managerState === "connected";
return this.managerState === "connected" || this.managerState === "generating";
}

getConnectionState(): ConnectionState {
Expand Down
Loading
Loading