diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index 3e5fd69d4d6a03..586458eb5d52c5 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -428,13 +428,21 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj let controller; - function onData(chunk) { - // Copy the Buffer to detach it from the pool. - if (Buffer.isBuffer(chunk) && !objectMode) - chunk = new Uint8Array(chunk); - controller.enqueue(chunk); - if (controller.desiredSize <= 0) - streamReadable.pause(); + function fillData() { + if (controller.desiredSize <= 0) { + return; + } + + let chunk; + while ((chunk = streamReadable.read(controller.desiredSize)) !== null) { + // Copy the Buffer to detach it from the pool. + if (Buffer.isBuffer(chunk) && !objectMode) + chunk = new Uint8Array(chunk); + controller.enqueue(chunk); + + if (controller.desiredSize <= 0) + break; + } } streamReadable.pause(); @@ -454,12 +462,14 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj controller.close(); }); - streamReadable.on('data', onData); + // Using 'readable' and not 'data' event as we want to know when the data + // is available but not consume it as maybe the ReadableStream internal queue is full + streamReadable.on('readable', fillData); return new ReadableStream({ start(c) { controller = c; }, - pull() { streamReadable.resume(); }, + pull() { fillData() }, cancel(reason) { destroy(streamReadable, reason);