Skip to content

Commit

Permalink
chore(internal): move LineDecoder to a separate file (#1120)
Browse files Browse the repository at this point in the history
  • Loading branch information
stainless-app[bot] authored Oct 4, 2024
1 parent f7dc520 commit 67a4b65
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 111 deletions.
114 changes: 114 additions & 0 deletions src/internal/decoders/line.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { OpenAIError } from '../../error';

type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined;

/**
* A re-implementation of httpx's `LineDecoder` in Python that handles incrementally
* reading lines from text.
*
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258
*/
export class LineDecoder {
// prettier-ignore
static NEWLINE_CHARS = new Set(['\n', '\r']);
static NEWLINE_REGEXP = /\r\n|[\n\r]/g;

buffer: string[];
trailingCR: boolean;
textDecoder: any; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types.

constructor() {
this.buffer = [];
this.trailingCR = false;
}

decode(chunk: Bytes): string[] {
let text = this.decodeText(chunk);

if (this.trailingCR) {
text = '\r' + text;
this.trailingCR = false;
}
if (text.endsWith('\r')) {
this.trailingCR = true;
text = text.slice(0, -1);
}

if (!text) {
return [];
}

const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
let lines = text.split(LineDecoder.NEWLINE_REGEXP);

// if there is a trailing new line then the last entry will be an empty
// string which we don't care about
if (trailingNewline) {
lines.pop();
}

if (lines.length === 1 && !trailingNewline) {
this.buffer.push(lines[0]!);
return [];
}

if (this.buffer.length > 0) {
lines = [this.buffer.join('') + lines[0], ...lines.slice(1)];
this.buffer = [];
}

if (!trailingNewline) {
this.buffer = [lines.pop() || ''];
}

return lines;
}

decodeText(bytes: Bytes): string {
if (bytes == null) return '';
if (typeof bytes === 'string') return bytes;

// Node:
if (typeof Buffer !== 'undefined') {
if (bytes instanceof Buffer) {
return bytes.toString();
}
if (bytes instanceof Uint8Array) {
return Buffer.from(bytes).toString();
}

throw new OpenAIError(
`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`,
);
}

// Browser
if (typeof TextDecoder !== 'undefined') {
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) {
this.textDecoder ??= new TextDecoder('utf8');
return this.textDecoder.decode(bytes);
}

throw new OpenAIError(
`Unexpected: received non-Uint8Array/ArrayBuffer (${
(bytes as any).constructor.name
}) in a web platform. Please report this error.`,
);
}

throw new OpenAIError(
`Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.`,
);
}

flush(): string[] {
if (!this.buffer.length && !this.trailingCR) {
return [];
}

const lines = [this.buffer.join('')];
this.buffer = [];
this.trailingCR = false;
return lines;
}
}
112 changes: 1 addition & 111 deletions src/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ReadableStream, type Response } from './_shims/index';
import { OpenAIError } from './error';
import { LineDecoder } from './internal/decoders/line';

import { APIError } from 'openai/error';

Expand Down Expand Up @@ -329,117 +330,6 @@ class SSEDecoder {
}
}

/**
* A re-implementation of httpx's `LineDecoder` in Python that handles incrementally
* reading lines from text.
*
* https://github.com/encode/httpx/blob/920333ea98118e9cf617f246905d7b202510941c/httpx/_decoders.py#L258
*/
class LineDecoder {
// prettier-ignore
static NEWLINE_CHARS = new Set(['\n', '\r']);
static NEWLINE_REGEXP = /\r\n|[\n\r]/g;

buffer: string[];
trailingCR: boolean;
textDecoder: any; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types.

constructor() {
this.buffer = [];
this.trailingCR = false;
}

decode(chunk: Bytes): string[] {
let text = this.decodeText(chunk);

if (this.trailingCR) {
text = '\r' + text;
this.trailingCR = false;
}
if (text.endsWith('\r')) {
this.trailingCR = true;
text = text.slice(0, -1);
}

if (!text) {
return [];
}

const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
let lines = text.split(LineDecoder.NEWLINE_REGEXP);

// if there is a trailing new line then the last entry will be an empty
// string which we don't care about
if (trailingNewline) {
lines.pop();
}

if (lines.length === 1 && !trailingNewline) {
this.buffer.push(lines[0]!);
return [];
}

if (this.buffer.length > 0) {
lines = [this.buffer.join('') + lines[0], ...lines.slice(1)];
this.buffer = [];
}

if (!trailingNewline) {
this.buffer = [lines.pop() || ''];
}

return lines;
}

decodeText(bytes: Bytes): string {
if (bytes == null) return '';
if (typeof bytes === 'string') return bytes;

// Node:
if (typeof Buffer !== 'undefined') {
if (bytes instanceof Buffer) {
return bytes.toString();
}
if (bytes instanceof Uint8Array) {
return Buffer.from(bytes).toString();
}

throw new OpenAIError(
`Unexpected: received non-Uint8Array (${bytes.constructor.name}) stream chunk in an environment with a global "Buffer" defined, which this library assumes to be Node. Please report this error.`,
);
}

// Browser
if (typeof TextDecoder !== 'undefined') {
if (bytes instanceof Uint8Array || bytes instanceof ArrayBuffer) {
this.textDecoder ??= new TextDecoder('utf8');
return this.textDecoder.decode(bytes);
}

throw new OpenAIError(
`Unexpected: received non-Uint8Array/ArrayBuffer (${
(bytes as any).constructor.name
}) in a web platform. Please report this error.`,
);
}

throw new OpenAIError(
`Unexpected: neither Buffer nor TextDecoder are available as globals. Please report this error.`,
);
}

flush(): string[] {
if (!this.buffer.length && !this.trailingCR) {
return [];
}

const lines = [this.buffer.join('')];
this.buffer = [];
this.trailingCR = false;
return lines;
}
}

/** This is an internal helper function that's just used for testing */
export function _decodeChunks(chunks: string[]): string[] {
const decoder = new LineDecoder();
Expand Down

0 comments on commit 67a4b65

Please sign in to comment.