Skip to content

Commit

Permalink
fix(response-stream): improve chunk identification (fixes #260)
Browse files Browse the repository at this point in the history
Adds logic to ensure that the actual chunk is properly identified and passed into the streaming response.
  • Loading branch information
florian-g2 committed Sep 9, 2024
1 parent 5022260 commit 2aa474e
Showing 1 changed file with 41 additions and 23 deletions.
64 changes: 41 additions & 23 deletions src/network/response-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import type { BothValueHeaders } from '../@types';
import { type ILogger, NO_OP, parseHeaders } from '../core';
import { getString } from './utils';

// header or data crlf
const crlfBuffer = Buffer.from('\r\n');

const endChunked = '0\r\n\r\n';
const headerEnd = '\r\n\r\n';
const endStatusSeparator = '\r\n';
Expand Down Expand Up @@ -55,11 +58,8 @@ export class ServerlessStreamResponse extends ServerResponse {
this.chunkedEncoding = true;

let internalWritable: Writable | null = null;
let isFirstCall = true;
// this ignore is used because I need to ignore these write calls:
// https://github.com/nodejs/node/blob/main/lib/_http_outgoing.js#L934-L935
// https://github.com/nodejs/node/blob/main/lib/_http_outgoing.js#L937
let writesToIgnore = 0;
let firstCrlfBufferEncountered = false;
let chunkEncountered = false;

const socket: Partial<Socket> & { _writableState: any } = {
_writableState: {},
Expand All @@ -86,23 +86,7 @@ export class ServerlessStreamResponse extends ServerResponse {
encoding,
}));

if (!isFirstCall && internalWritable) {
if (data === endChunked) {
internalWritable.end(cb);

return true;
}

if (writesToIgnore > 0) {
writesToIgnore--;
return true;
}

internalWritable.write(data, cb);
writesToIgnore = 3;
} else if (isFirstCall) {
isFirstCall = false;

if (!internalWritable) {
const stringData = getString(data);
const endStatusIndex = stringData.indexOf(endStatusSeparator);
const status = +stringData.slice(0, endStatusIndex).split(' ')[1];
Expand All @@ -120,14 +104,48 @@ export class ServerlessStreamResponse extends ServerResponse {
}),
);

writesToIgnore = 1;
internalWritable = onReceiveHeaders(status, headers);

// If we get an endChunked right after header which means the response body is empty, we need to immediately end the writable
if (stringData.substring(endHeaderIndex + 4) === endChunked)
internalWritable.end();

return true;
}

if (data === endChunked) {
internalWritable.end(cb);
return true;
}

// if header or data crlf
if (Buffer.isBuffer(data) && crlfBuffer.equals(data)) {
const isHeaderCrlf = !firstCrlfBufferEncountered;
if (isHeaderCrlf) {
firstCrlfBufferEncountered = true;
return true;
}

const isDataCrlf = firstCrlfBufferEncountered && chunkEncountered;
if (isDataCrlf) {
// done with chunk
firstCrlfBufferEncountered = false;
chunkEncountered = false;
return true;
}

// the crlf *is* the chunk
}

const isContentLength = !firstCrlfBufferEncountered;
if (isContentLength) {
// discard content length
return true;
}

// write chunk
chunkEncountered = true;
internalWritable.write(data, cb);
return true;
},
};
Expand Down

0 comments on commit 2aa474e

Please sign in to comment.