From 88f84e3058d92392f0ace797b7084a899e97cd97 Mon Sep 17 00:00:00 2001 From: Stainless Bot <dev@stainlessapi.com> Date: Mon, 30 Oct 2023 23:34:40 +0000 Subject: [PATCH] feat: streaming improvements --- src/lib/ChatCompletionRunner.ts | 11 ----------- src/lib/ChatCompletionStream.ts | 24 ++++++++++++++++++++---- src/lib/ChatCompletionStreamingRunner.ts | 11 ----------- src/resources/beta/chat/completions.ts | 15 ++++----------- 4 files changed, 24 insertions(+), 37 deletions(-) diff --git a/src/lib/ChatCompletionRunner.ts b/src/lib/ChatCompletionRunner.ts index cb9bd4867..e2caf32eb 100644 --- a/src/lib/ChatCompletionRunner.ts +++ b/src/lib/ChatCompletionRunner.ts @@ -3,7 +3,6 @@ import { type Completions, type ChatCompletionMessage, type ChatCompletionMessageParam, - type ChatCompletionCreateParams, type ChatCompletionCreateParamsNonStreaming, } from 'openai/resources/chat/completions'; import { type RunnableFunctions, type BaseFunctionsArgs } from './RunnableFunction'; @@ -34,16 +33,6 @@ export class ChatCompletionRunner extends AbstractChatCompletionRunner<ChatCompletionRunnerEvents> { runner._run(() => runner._runFunctions(completions, params, options)); return runner; } - - static createChatCompletion( - completions: Completions, - params: ChatCompletionCreateParams, - options?: Core.RequestOptions, - ): ChatCompletionRunner { - const runner = new ChatCompletionRunner(); - runner._run(() => runner._runChatCompletion(completions, params, options)); - return runner; - } override _addMessage(message: ChatCompletionMessage | ChatCompletionMessageParam) { super._addMessage(message); if (message.role === 'assistant' && message.content) { diff --git a/src/lib/ChatCompletionStream.ts b/src/lib/ChatCompletionStream.ts index 9b3e2a419..4e68f660f 100644 --- a/src/lib/ChatCompletionStream.ts +++ b/src/lib/ChatCompletionStream.ts @@ -5,7 +5,7 @@ import { type ChatCompletion, type ChatCompletionChunk, type ChatCompletionCreateParams, - type ChatCompletionCreateParamsStreaming, + ChatCompletionCreateParamsBase, } from 'openai/resources/chat/completions'; import { AbstractChatCompletionRunner, @@ -19,7 +19,9 @@ export interface ChatCompletionStreamEvents extends AbstractChatCompletionRunner chunk: (chunk: ChatCompletionChunk, snapshot: ChatCompletionSnapshot) => void; } -export type ChatCompletionStreamParams = 
ChatCompletionCreateParamsStreaming; +export type ChatCompletionStreamParams = Omit<ChatCompletionCreateParamsBase, 'stream'> & { + stream?: true; +}; export class ChatCompletionStream extends AbstractChatCompletionRunner<ChatCompletionStreamEvents> @@ -31,6 +33,13 @@ export class ChatCompletionStream return this.#currentChatCompletionSnapshot; } + /** + * Intended for use on the frontend, consuming a stream produced with + * `.toReadableStream()` on the backend. + * + * Note that messages sent to the model do not appear in `.on('message')` + * in this context. + */ static fromReadableStream(stream: ReadableStream): ChatCompletionStream { const runner = new ChatCompletionStream(); runner._run(() => runner._fromReadableStream(stream)); @@ -39,11 +48,11 @@ export class ChatCompletionStream static createChatCompletion( completions: Completions, - params: ChatCompletionCreateParams, + params: ChatCompletionStreamParams, options?: Core.RequestOptions, ): ChatCompletionStream { const runner = new ChatCompletionStream(); - runner._run(() => runner._runChatCompletion(completions, params, options)); + runner._run(() => runner._runChatCompletion(completions, { ...params, stream: true }, options)); return runner; } @@ -110,8 +119,15 @@ export class ChatCompletionStream this.#beginRequest(); this._connected(); const stream = Stream.fromReadableStream<ChatCompletionChunk>(readableStream, this.controller); + let chatId; for await (const chunk of stream) { + if (chatId && chatId !== chunk.id) { + // A new request has been made. 
+ this._addChatCompletion(this.#endRequest()); + } + this.#addChunk(chunk); + chatId = chunk.id; } if (stream.controller.signal?.aborted) { throw new APIUserAbortError(); diff --git a/src/lib/ChatCompletionStreamingRunner.ts b/src/lib/ChatCompletionStreamingRunner.ts index 0057c7623..1e5e09de6 100644 --- a/src/lib/ChatCompletionStreamingRunner.ts +++ b/src/lib/ChatCompletionStreamingRunner.ts @@ -2,7 +2,6 @@ import * as Core from 'openai/core'; import { Completions, type ChatCompletionChunk, - type ChatCompletionCreateParams, type ChatCompletionCreateParamsStreaming, } from 'openai/resources/chat/completions'; import { type AbstractChatCompletionRunnerEvents } from './AbstractChatCompletionRunner'; @@ -41,14 +40,4 @@ export class ChatCompletionStreamingRunner runner._run(() => runner._runFunctions(completions, params, options)); return runner; } - - static override createChatCompletion( - completions: Completions, - params: ChatCompletionCreateParams, - options?: Core.RequestOptions, - ): ChatCompletionStreamingRunner { - const runner = new ChatCompletionStreamingRunner(); - runner._run(() => runner._runChatCompletion(completions, params, options)); - return runner; - } } diff --git a/src/resources/beta/chat/completions.ts b/src/resources/beta/chat/completions.ts index 8bb23a789..24fe90a0a 100644 --- a/src/resources/beta/chat/completions.ts +++ b/src/resources/beta/chat/completions.ts @@ -20,8 +20,8 @@ export { RunnableFunctionWithoutParse, ParsingFunction, } from 'openai/lib/RunnableFunction'; -import { ChatCompletionStream } from 'openai/lib/ChatCompletionStream'; -import { ChatCompletionCreateParamsStreaming } from 'openai/resources/chat/completions'; +import { ChatCompletionStream, type ChatCompletionStreamParams } from 'openai/lib/ChatCompletionStream'; +export { ChatCompletionStream, type ChatCompletionStreamParams } from 'openai/lib/ChatCompletionStream'; export class Completions extends APIResource { /** @@ -64,14 +64,7 @@ export class Completions extends 
APIResource { /** * Creates a chat completion stream */ - stream( - body: Omit<ChatCompletionCreateParamsStreaming, 'stream'> & { stream?: true }, - options?: Core.RequestOptions, - ): ChatCompletionStream { - return ChatCompletionStream.createChatCompletion( - this.client.chat.completions, - { ...body, stream: true }, - options, - ); - } + stream(body: ChatCompletionStreamParams, options?: Core.RequestOptions): ChatCompletionStream { + return ChatCompletionStream.createChatCompletion(this.client.chat.completions, body, options); + } }