From a796d21f06307419f352da8b9943f6745ff4084f Mon Sep 17 00:00:00 2001 From: Robert Craigie Date: Thu, 16 Jan 2025 16:33:38 +0000 Subject: [PATCH] feat(client): add Realtime API support (#1266) --- README.md | 87 ++++++++++++++++++++++++++ examples/package.json | 7 ++- examples/realtime/websocket.ts | 48 +++++++++++++++ examples/realtime/ws.ts | 55 +++++++++++++++++ package.json | 6 ++ src/beta/realtime/index.ts | 1 + src/beta/realtime/internal-base.ts | 83 +++++++++++++++++++++++++ src/beta/realtime/websocket.ts | 97 +++++++++++++++++++++++++++++ src/beta/realtime/ws.ts | 69 +++++++++++++++++++++ src/lib/EventEmitter.ts | 98 ++++++++++++++++++++++++++++++ yarn.lock | 12 ++++ 11 files changed, 560 insertions(+), 3 deletions(-) create mode 100644 examples/realtime/websocket.ts create mode 100644 examples/realtime/ws.ts create mode 100644 src/beta/realtime/index.ts create mode 100644 src/beta/realtime/internal-base.ts create mode 100644 src/beta/realtime/websocket.ts create mode 100644 src/beta/realtime/ws.ts create mode 100644 src/lib/EventEmitter.ts diff --git a/README.md b/README.md index 3039857a1..e7d69a669 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,93 @@ main(); If you need to cancel a stream, you can `break` from the loop or call `stream.controller.abort()`. +## Realtime API beta + +The Realtime API enables you to build low-latency, multi-modal conversational experiences. It currently supports text and audio as both input and output, as well as [function calling](https://platform.openai.com/docs/guides/function-calling) through a `WebSocket` connection. + +The Realtime API works through a combination of client-sent events and server-sent events. Clients can send events to do things like update session configuration or send text and audio inputs. Server events confirm when audio responses have completed, or when a text response from the model has been received. A full event reference can be found [here](https://platform.openai.com/docs/api-reference/realtime-client-events) and a guide can be found [here](https://platform.openai.com/docs/guides/realtime). + +This SDK supports accessing the Realtime API through the [WebSocket API](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) or with [ws](https://github.com/websockets/ws). + +Basic text based example with `ws`: + +```ts +// requires `yarn add ws @types/ws` +import { OpenAIRealtimeWS } from 'openai/beta/realtime/ws'; + +const rt = new OpenAIRealtimeWS({ model: 'gpt-4o-realtime-preview-2024-12-17' }); + +// access the underlying `ws.WebSocket` instance +rt.socket.on('open', () => { + console.log('Connection opened!'); + rt.send({ + type: 'session.update', + session: { + modalities: ['text'], + model: 'gpt-4o-realtime-preview', + }, + }); + + rt.send({ + type: 'conversation.item.create', + item: { + type: 'message', + role: 'user', + content: [{ type: 'input_text', text: 'Say a couple paragraphs!' }], + }, + }); + + rt.send({ type: 'response.create' }); +}); + +rt.on('error', (err) => { + // in a real world scenario this should be logged somewhere as you + // likely want to continue procesing events regardless of any errors + throw err; +}); + +rt.on('session.created', (event) => { + console.log('session created!', event.session); + console.log(); +}); + +rt.on('response.text.delta', (event) => process.stdout.write(event.delta)); +rt.on('response.text.done', () => console.log()); + +rt.on('response.done', () => rt.close()); + +rt.socket.on('close', () => console.log('\nConnection closed!')); +``` + +To use the web API `WebSocket` implementation, replace `OpenAIRealtimeWS` with `OpenAIRealtimeWebSocket` and adjust any `rt.socket` access: + +```ts +import { OpenAIRealtimeWebSocket } from 'openai/beta/realtime/websocket'; + +const rt = new OpenAIRealtimeWebSocket({ model: 'gpt-4o-realtime-preview-2024-12-17' }); +// ... +rt.socket.addEventListener('open', () => { + // ... +}); +``` + +A full example can be found [here](https://github.com/openai/openai-node/blob/master/examples/realtime/web.ts). + +### Realtime error handling + +When an error is encountered, either on the client side or returned from the server through the [`error` event](https://platform.openai.com/docs/guides/realtime/realtime-api-beta#handling-errors), the `error` event listener will be fired. However, if you haven't registered an `error` event listener then an `unhandled Promise rejection` error will be thrown. + +It is **highly recommended** that you register an `error` event listener and handle errors approriately as typically the underlying connection is still usable. + +```ts +const rt = new OpenAIRealtimeWS({ model: 'gpt-4o-realtime-preview-2024-12-17' }); +rt.on('error', (err) => { + // in a real world scenario this should be logged somewhere as you + // likely want to continue procesing events regardless of any errors + throw err; +}); +``` + ### Request & Response types This library includes TypeScript definitions for all request params and response fields. You may import and use them like so: diff --git a/examples/package.json b/examples/package.json index c8a5f7087..b8c34ac45 100644 --- a/examples/package.json +++ b/examples/package.json @@ -6,14 +6,15 @@ "license": "MIT", "private": true, "dependencies": { + "@azure/identity": "^4.2.0", "express": "^4.18.2", "next": "^14.1.1", "openai": "file:..", - "zod-to-json-schema": "^3.21.4", - "@azure/identity": "^4.2.0" + "zod-to-json-schema": "^3.21.4" }, "devDependencies": { "@types/body-parser": "^1.19.3", - "@types/express": "^4.17.19" + "@types/express": "^4.17.19", + "@types/web": "^0.0.194" } } diff --git a/examples/realtime/websocket.ts b/examples/realtime/websocket.ts new file mode 100644 index 000000000..0da131bc3 --- /dev/null +++ b/examples/realtime/websocket.ts @@ -0,0 +1,48 @@ +import { OpenAIRealtimeWebSocket } from 'openai/beta/realtime/websocket'; + +async function main() { + const rt = new OpenAIRealtimeWebSocket({ model: 'gpt-4o-realtime-preview-2024-12-17' }); + + // access the underlying `ws.WebSocket` instance + rt.socket.addEventListener('open', () => { + console.log('Connection opened!'); + rt.send({ + type: 'session.update', + session: { + modalities: ['text'], + model: 'gpt-4o-realtime-preview', + }, + }); + + rt.send({ + type: 'conversation.item.create', + item: { + type: 'message', + role: 'user', + content: [{ type: 'input_text', text: 'Say a couple paragraphs!' }], + }, + }); + + rt.send({ type: 'response.create' }); + }); + + rt.on('error', (err) => { + // in a real world scenario this should be logged somewhere as you + // likely want to continue procesing events regardless of any errors + throw err; + }); + + rt.on('session.created', (event) => { + console.log('session created!', event.session); + console.log(); + }); + + rt.on('response.text.delta', (event) => process.stdout.write(event.delta)); + rt.on('response.text.done', () => console.log()); + + rt.on('response.done', () => rt.close()); + + rt.socket.addEventListener('close', () => console.log('\nConnection closed!')); +} + +main(); diff --git a/examples/realtime/ws.ts b/examples/realtime/ws.ts new file mode 100644 index 000000000..4bbe85e5d --- /dev/null +++ b/examples/realtime/ws.ts @@ -0,0 +1,55 @@ +import { OpenAIRealtimeWS } from 'openai/beta/realtime/ws'; + +async function main() { + const rt = new OpenAIRealtimeWS({ model: 'gpt-4o-realtime-preview-2024-12-17' }); + + // access the underlying `ws.WebSocket` instance + rt.socket.on('open', () => { + console.log('Connection opened!'); + rt.send({ + type: 'session.update', + session: { + modalities: ['foo'] as any, + model: 'gpt-4o-realtime-preview', + }, + }); + rt.send({ + type: 'session.update', + session: { + modalities: ['text'], + model: 'gpt-4o-realtime-preview', + }, + }); + + rt.send({ + type: 'conversation.item.create', + item: { + type: 'message', + role: 'user', + content: [{ type: 'input_text', text: 'Say a couple paragraphs!' }], + }, + }); + + rt.send({ type: 'response.create' }); + }); + + rt.on('error', (err) => { + // in a real world scenario this should be logged somewhere as you + // likely want to continue procesing events regardless of any errors + throw err; + }); + + rt.on('session.created', (event) => { + console.log('session created!', event.session); + console.log(); + }); + + rt.on('response.text.delta', (event) => process.stdout.write(event.delta)); + rt.on('response.text.done', () => console.log()); + + rt.on('response.done', () => rt.close()); + + rt.socket.on('close', () => console.log('\nConnection closed!')); +} + +main(); diff --git a/package.json b/package.json index ff6ec16bc..77e2d609f 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "@swc/core": "^1.3.102", "@swc/jest": "^0.2.29", "@types/jest": "^29.4.0", + "@types/ws": "^8.5.13", "@typescript-eslint/eslint-plugin": "^6.7.0", "@typescript-eslint/parser": "^6.7.0", "eslint": "^8.49.0", @@ -52,6 +53,7 @@ "tsc-multi": "^1.1.0", "tsconfig-paths": "^4.0.0", "typescript": "^4.8.2", + "ws": "^8.18.0", "zod": "^3.23.8" }, "sideEffects": [ @@ -126,9 +128,13 @@ }, "bin": "./bin/cli", "peerDependencies": { + "ws": "^8.18.0", "zod": "^3.23.8" }, "peerDependenciesMeta": { + "ws": { + "optional": true + }, "zod": { "optional": true } diff --git a/src/beta/realtime/index.ts b/src/beta/realtime/index.ts new file mode 100644 index 000000000..75f0f3088 --- /dev/null +++ b/src/beta/realtime/index.ts @@ -0,0 +1 @@ +export { OpenAIRealtimeError } from './internal-base'; diff --git a/src/beta/realtime/internal-base.ts b/src/beta/realtime/internal-base.ts new file mode 100644 index 000000000..391d69911 --- /dev/null +++ b/src/beta/realtime/internal-base.ts @@ -0,0 +1,83 @@ +import { RealtimeClientEvent, RealtimeServerEvent, ErrorEvent } from '../../resources/beta/realtime/realtime'; +import { EventEmitter } from '../../lib/EventEmitter'; +import { OpenAIError } from '../../error'; + +export class OpenAIRealtimeError extends OpenAIError { + /** + * The error data that the API sent back in an `error` event. + */ + error?: ErrorEvent.Error | undefined; + + /** + * The unique ID of the server event. + */ + event_id?: string | undefined; + + constructor(message: string, event: ErrorEvent | null) { + super(message); + + this.error = event?.error; + this.event_id = event?.event_id; + } +} + +type Simplify = { [KeyType in keyof T]: T[KeyType] } & {}; + +type RealtimeEvents = Simplify< + { + event: (event: RealtimeServerEvent) => void; + error: (error: OpenAIRealtimeError) => void; + } & { + [EventType in Exclude]: ( + event: Extract, + ) => unknown; + } +>; + +export abstract class OpenAIRealtimeEmitter extends EventEmitter { + /** + * Send an event to the API. + */ + abstract send(event: RealtimeClientEvent): void; + + /** + * Close the websocket connection. + */ + abstract close(props?: { code: number; reason: string }): void; + + protected _onError(event: null, message: string, cause: any): void; + protected _onError(event: ErrorEvent, message?: string | undefined): void; + protected _onError(event: ErrorEvent | null, message?: string | undefined, cause?: any): void { + message = + event?.error ? + `${event.error.message} code=${event.error.code} param=${event.error.param} type=${event.error.type} event_id=${event.error.event_id}` + : message ?? 'unknown error'; + + if (!this._hasListener('error')) { + const error = new OpenAIRealtimeError( + message + + `\n\nTo resolve these unhandled rejection errors you should bind an \`error\` callback, e.g. \`rt.on('error', (error) => ...)\` `, + event, + ); + // @ts-ignore + error.cause = cause; + Promise.reject(error); + return; + } + + const error = new OpenAIRealtimeError(message, event); + // @ts-ignore + error.cause = cause; + + this._emit('error', error); + } +} + +export function buildRealtimeURL(props: { baseURL: string; model: string }): URL { + const path = '/realtime'; + + const url = new URL(props.baseURL + (props.baseURL.endsWith('/') ? path.slice(1) : path)); + url.protocol = 'wss'; + url.searchParams.set('model', props.model); + return url; +} diff --git a/src/beta/realtime/websocket.ts b/src/beta/realtime/websocket.ts new file mode 100644 index 000000000..e0853779d --- /dev/null +++ b/src/beta/realtime/websocket.ts @@ -0,0 +1,97 @@ +import { OpenAI } from '../../index'; +import { OpenAIError } from '../../error'; +import * as Core from '../../core'; +import type { RealtimeClientEvent, RealtimeServerEvent } from '../../resources/beta/realtime/realtime'; +import { OpenAIRealtimeEmitter, buildRealtimeURL } from './internal-base'; + +interface MessageEvent { + data: string; +} + +type _WebSocket = + typeof globalThis extends ( + { + WebSocket: infer ws; + } + ) ? + // @ts-ignore + InstanceType + : any; + +export class OpenAIRealtimeWebSocket extends OpenAIRealtimeEmitter { + url: URL; + socket: _WebSocket; + + constructor( + props: { + model: string; + dangerouslyAllowBrowser?: boolean; + }, + client?: Pick, + ) { + super(); + + const dangerouslyAllowBrowser = + props.dangerouslyAllowBrowser ?? + (client as any)?._options?.dangerouslyAllowBrowser ?? + (client?.apiKey.startsWith('ek_') ? true : null); + + if (!dangerouslyAllowBrowser && Core.isRunningInBrowser()) { + throw new OpenAIError( + "It looks like you're running in a browser-like environment.\n\nThis is disabled by default, as it risks exposing your secret API credentials to attackers.\n\nYou can avoid this error by creating an ephemeral session token:\nhttps://platform.openai.com/docs/api-reference/realtime-sessions\n", + ); + } + + client ??= new OpenAI({ dangerouslyAllowBrowser }); + + this.url = buildRealtimeURL({ baseURL: client.baseURL, model: props.model }); + // @ts-ignore + this.socket = new WebSocket(this.url, [ + 'realtime', + `openai-insecure-api-key.${client.apiKey}`, + 'openai-beta.realtime-v1', + ]); + + this.socket.addEventListener('message', (websocketEvent: MessageEvent) => { + const event = (() => { + try { + return JSON.parse(websocketEvent.data.toString()) as RealtimeServerEvent; + } catch (err) { + this._onError(null, 'could not parse websocket event', err); + return null; + } + })(); + + if (event) { + this._emit('event', event); + + if (event.type === 'error') { + this._onError(event); + } else { + // @ts-expect-error TS isn't smart enough to get the relationship right here + this._emit(event.type, event); + } + } + }); + + this.socket.addEventListener('error', (event: any) => { + this._onError(null, event.message, null); + }); + } + + send(event: RealtimeClientEvent) { + try { + this.socket.send(JSON.stringify(event)); + } catch (err) { + this._onError(null, 'could not send data', err); + } + } + + close(props?: { code: number; reason: string }) { + try { + this.socket.close(props?.code ?? 1000, props?.reason ?? 'OK'); + } catch (err) { + this._onError(null, 'could not close the connection', err); + } + } +} diff --git a/src/beta/realtime/ws.ts b/src/beta/realtime/ws.ts new file mode 100644 index 000000000..33bb11ad9 --- /dev/null +++ b/src/beta/realtime/ws.ts @@ -0,0 +1,69 @@ +import WS from 'ws'; +import { OpenAI } from '../../index'; +import type { RealtimeClientEvent, RealtimeServerEvent } from '../../resources/beta/realtime/realtime'; +import { OpenAIRealtimeEmitter, buildRealtimeURL } from './internal-base'; + +export class OpenAIRealtimeWS extends OpenAIRealtimeEmitter { + url: URL; + socket: WS.WebSocket; + + constructor( + props: { model: string; options?: WS.ClientOptions | undefined }, + client?: Pick, + ) { + super(); + client ??= new OpenAI(); + + this.url = buildRealtimeURL({ baseURL: client.baseURL, model: props.model }); + this.socket = new WS.WebSocket(this.url, { + ...props.options, + headers: { + ...props.options?.headers, + Authorization: `Bearer ${client.apiKey}`, + 'OpenAI-Beta': 'realtime=v1', + }, + }); + + this.socket.on('message', (wsEvent) => { + const event = (() => { + try { + return JSON.parse(wsEvent.toString()) as RealtimeServerEvent; + } catch (err) { + this._onError(null, 'could not parse websocket event', err); + return null; + } + })(); + + if (event) { + this._emit('event', event); + + if (event.type === 'error') { + this._onError(event); + } else { + // @ts-expect-error TS isn't smart enough to get the relationship right here + this._emit(event.type, event); + } + } + }); + + this.socket.on('error', (err) => { + this._onError(null, err.message, err); + }); + } + + send(event: RealtimeClientEvent) { + try { + this.socket.send(JSON.stringify(event)); + } catch (err) { + this._onError(null, 'could not send data', err); + } + } + + close(props?: { code: number; reason: string }) { + try { + this.socket.close(props?.code ?? 1000, props?.reason ?? 'OK'); + } catch (err) { + this._onError(null, 'could not close the connection', err); + } + } +} diff --git a/src/lib/EventEmitter.ts b/src/lib/EventEmitter.ts new file mode 100644 index 000000000..9adeebdc3 --- /dev/null +++ b/src/lib/EventEmitter.ts @@ -0,0 +1,98 @@ +type EventListener = Events[EventType]; + +type EventListeners = Array<{ + listener: EventListener; + once?: boolean; +}>; + +export type EventParameters = { + [Event in EventType]: EventListener extends (...args: infer P) => any ? P : never; +}[EventType]; + +export class EventEmitter any>> { + #listeners: { + [Event in keyof EventTypes]?: EventListeners; + } = {}; + + /** + * Adds the listener function to the end of the listeners array for the event. + * No checks are made to see if the listener has already been added. Multiple calls passing + * the same combination of event and listener will result in the listener being added, and + * called, multiple times. + * @returns this, so that calls can be chained + */ + on(event: Event, listener: EventListener): this { + const listeners: EventListeners = + this.#listeners[event] || (this.#listeners[event] = []); + listeners.push({ listener }); + return this; + } + + /** + * Removes the specified listener from the listener array for the event. + * off() will remove, at most, one instance of a listener from the listener array. If any single + * listener has been added multiple times to the listener array for the specified event, then + * off() must be called multiple times to remove each instance. + * @returns this, so that calls can be chained + */ + off(event: Event, listener: EventListener): this { + const listeners = this.#listeners[event]; + if (!listeners) return this; + const index = listeners.findIndex((l) => l.listener === listener); + if (index >= 0) listeners.splice(index, 1); + return this; + } + + /** + * Adds a one-time listener function for the event. The next time the event is triggered, + * this listener is removed and then invoked. + * @returns this, so that calls can be chained + */ + once(event: Event, listener: EventListener): this { + const listeners: EventListeners = + this.#listeners[event] || (this.#listeners[event] = []); + listeners.push({ listener, once: true }); + return this; + } + + /** + * This is similar to `.once()`, but returns a Promise that resolves the next time + * the event is triggered, instead of calling a listener callback. + * @returns a Promise that resolves the next time given event is triggered, + * or rejects if an error is emitted. (If you request the 'error' event, + * returns a promise that resolves with the error). + * + * Example: + * + * const message = await stream.emitted('message') // rejects if the stream errors + */ + emitted( + event: Event, + ): Promise< + EventParameters extends [infer Param] ? Param + : EventParameters extends [] ? void + : EventParameters + > { + return new Promise((resolve, reject) => { + // TODO: handle errors + this.once(event, resolve as any); + }); + } + + protected _emit( + this: EventEmitter, + event: Event, + ...args: EventParameters + ) { + const listeners: EventListeners | undefined = this.#listeners[event]; + if (listeners) { + this.#listeners[event] = listeners.filter((l) => !l.once) as any; + listeners.forEach(({ listener }: any) => listener(...(args as any))); + } + } + + protected _hasListener(event: keyof EventTypes): boolean { + const listeners = this.#listeners[event]; + return listeners && listeners.length > 0; + } +} diff --git a/yarn.lock b/yarn.lock index c0220f984..0a4307f70 100644 --- a/yarn.lock +++ b/yarn.lock @@ -881,6 +881,13 @@ resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-2.0.3.tgz#6209321eb2c1712a7e7466422b8cb1fc0d9dd5d8" integrity sha512-9aEbYZ3TbYMznPdcdr3SmIrLXwC/AKZXQeCf9Pgao5CKb8CyHuEX5jzWPTkvregvhRJHcpRO6BFoGW9ycaOkYw== +"@types/ws@^8.5.13": + version "8.5.13" + resolved "https://registry.yarnpkg.com/@types/ws/-/ws-8.5.13.tgz#6414c280875e2691d0d1e080b05addbf5cb91e20" + integrity sha512-osM/gWBTPKgHV8XkTunnegTRIsvF6owmf5w+JtAfOw472dptdm0dlGv4xCt6GwQRcC2XVOvvRE/0bAoQcL2QkA== + dependencies: + "@types/node" "*" + "@types/yargs-parser@*": version "21.0.3" resolved "https://registry.yarnpkg.com/@types/yargs-parser/-/yargs-parser-21.0.3.tgz#815e30b786d2e8f0dcd85fd5bcf5e1a04d008f15" @@ -3472,6 +3479,11 @@ write-file-atomic@^4.0.2: imurmurhash "^0.1.4" signal-exit "^3.0.7" +ws@^8.18.0: + version "8.18.0" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.18.0.tgz#0d7505a6eafe2b0e712d232b42279f53bc289bbc" + integrity sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw== + y18n@^5.0.5: version "5.0.8" resolved "https://registry.yarnpkg.com/y18n/-/y18n-5.0.8.tgz#7f4934d0f7ca8c56f95314939ddcd2dd91ce1d55"