Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node:fs: allow WriteStream._write to be overwritten #8237

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 118 additions & 4 deletions src/js/node/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ const promises = require("node:fs/promises");
const Stream = require("node:stream");

var _writeStreamPathFastPathSymbol = Symbol.for("Bun.NodeWriteStreamFastPath");
const kFs = Symbol("kFs");
const kIoDone = Symbol("kIoDone");
const kIsPerformingIO = Symbol("kIsPerformingIO");
var _fs = Symbol.for("#fs");

const constants = $processBindingConstants.fs;
Expand Down Expand Up @@ -473,7 +476,6 @@ const readStreamPathOrFdSymbol = Symbol.for("Bun.NodeReadStreamPathOrFd");
const writeStreamSymbol = Symbol.for("Bun.NodeWriteStream");
var writeStreamPathFastPathSymbol = Symbol.for("Bun.NodeWriteStreamFastPath");
var writeStreamPathFastPathCallSymbol = Symbol.for("Bun.NodeWriteStreamFastPathCall");
var kIoDone = Symbol.for("kIoDone");

var defaultReadStreamOptions = {
file: undefined,
Expand Down Expand Up @@ -901,6 +903,7 @@ var WriteStreamClass = (WriteStream = function WriteStream(path, options = defau

tempThis.path = path;
tempThis.fd = null;
this[kFs] = options.fs || fs;
tempThis[_writeStreamPathFastPathSymbol] =
autoClose &&
(start === undefined || start === 0) &&
Expand Down Expand Up @@ -1117,16 +1120,127 @@ WriteStreamPrototype.write = function write(chunk, encoding, cb) {
};

// Do not inherit
WriteStreamPrototype._write = undefined;
WriteStreamPrototype._writev = undefined;
WriteStream.prototype._write = function (data, encoding, cb) {
if (data == null) {
const e = new Error("write: data must not be null");
e.code = "ERR_STREAM_NULL_VALUES";
throw e;
}
this[kIsPerformingIO] = true;
writeAll.$call(this, data, data.length, this.pos, er => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}

cb(er);
});

if (this.pos !== undefined) this.pos += data.length;
};
function writeAll(data, size, pos, cb, retries = 0) {
this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => {
// No data currently available and operation should be retried later.
if (er?.code === "EAGAIN") {
er = null;
bytesWritten = 0;
}

if (this.destroyed || er) {
return cb(er || new Error("ERR_STREAM_DESTROYED: Cannot call write after a stream was destroyed"));
}

this.bytesWritten += bytesWritten;

retries = bytesWritten ? 0 : retries + 1;
size -= bytesWritten;
pos += bytesWritten;

// Try writing non-zero number of bytes up to 5 times.
if (retries > 5) {
cb(new ERR_SYSTEM_ERROR("write failed"));
} else if (size) {
writeAll.$call(this, buffer.slice(bytesWritten), size, pos, cb, retries);
} else {
cb();
}
});
}

WriteStream.prototype._writev = function (data, cb) {
const len = data.length;
const chunks = new Array(len);
let size = 0;

for (let i = 0; i < len; i++) {
const chunk = data[i].chunk;

chunks[i] = chunk;
size += chunk.length;
}

this[kIsPerformingIO] = true;
writevAll.$call(this, chunks, size, this.pos, er => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}

cb(er);
});

if (this.pos !== undefined) this.pos += size;
};
function writevAll(chunks, size, pos, cb, retries = 0) {
this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => {
// No data currently available and operation should be retried later.
if (er?.code === "EAGAIN") {
er = null;
bytesWritten = 0;
}

if (this.destroyed || er) {
return cb(er || new ERR_STREAM_DESTROYED("writev"));
}

this.bytesWritten += bytesWritten;

retries = bytesWritten ? 0 : retries + 1;
size -= bytesWritten;
pos += bytesWritten;

// Try writing non-zero number of bytes up to 5 times.
if (retries > 5) {
cb(new ERR_SYSTEM_ERROR("writev failed"));
} else if (size) {
writevAll.$call(this, [Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries);
} else {
cb();
}
});
}

WriteStreamPrototype.end = function end(chunk, encoding, cb) {
var native = this.pos === undefined;
return NativeWritable.prototype.end.$call(this, chunk, encoding, cb, native);
};

WriteStreamPrototype._destroy = function _destroy(err, cb) {
this.close(err, cb);
// Usually for async IO it is safe to close a file descriptor
// even when there are pending operations. However, due to platform
// differences file IO is implemented using synchronous operations
// running in a thread pool. Therefore, file descriptors are not safe
// to close while used in a pending read or write operation. Wait for
// any pending IO (kIsPerformingIO) to complete (kIoDone).
if (this[kIsPerformingIO]) {
this.once(kIoDone, er => close(this, err || er, cb));
} else {
close(this, err, cb);
}
};

function WriteStream_errorOrDestroy(err) {
Expand Down
Loading