Skip to content

Commit

Permalink
Improve robustness of TransformStreamDefaultController
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Feb 19, 2023
1 parent 905965c commit 7e3c9fb
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 64 deletions.
134 changes: 83 additions & 51 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,8 @@ ReadableStreamDefaultController::ReadableStreamDefaultController(
UnderlyingSource underlyingSource,
StreamQueuingStrategy queuingStrategy)
: ioContext(tryGetIoContext()),
impl(kj::mv(underlyingSource), kj::mv(queuingStrategy)) {}
impl(kj::mv(underlyingSource), kj::mv(queuingStrategy)),
weakRef(kj::refcounted<WeakRef<ReadableStreamDefaultController>>(*this)) {}

void ReadableStreamDefaultController::start(jsg::Lock& js) {
impl.start(js, JSG_THIS);
Expand Down Expand Up @@ -2748,13 +2749,16 @@ jsg::Promise<void> WritableStreamDefaultController::write(

// ======================================================================================
WritableStreamJsController::WritableStreamJsController()
: ioContext(tryGetIoContext()) {}
: ioContext(tryGetIoContext()),
weakRef(kj::refcounted<WeakRef<WritableStreamJsController>>(*this)) {}

WritableStreamJsController::WritableStreamJsController(StreamStates::Closed closed)
: ioContext(tryGetIoContext()), state(closed) {}
: ioContext(tryGetIoContext()), state(closed),
weakRef(kj::refcounted<WeakRef<WritableStreamJsController>>(*this)) {}

WritableStreamJsController::WritableStreamJsController(StreamStates::Errored errored)
: ioContext(tryGetIoContext()), state(kj::mv(errored)) {}
: ioContext(tryGetIoContext()), state(kj::mv(errored)),
weakRef(kj::refcounted<WeakRef<WritableStreamJsController>>(*this)) {}

jsg::Promise<void> WritableStreamJsController::abort(
jsg::Lock& js,
Expand Down Expand Up @@ -3145,13 +3149,18 @@ TransformStreamDefaultController::TransformStreamDefaultController(jsg::Lock& js
startPromise(js.newPromiseAndResolver<void>()) {}

kj::Maybe<int> TransformStreamDefaultController::getDesiredSize() {
return getReadableController().getDesiredSize();
KJ_IF_MAYBE(readableController, tryGetReadableController()) {
return readableController->getDesiredSize();
}
return nullptr;
}

void TransformStreamDefaultController::enqueue(
jsg::Lock& js,
v8::Local<v8::Value> chunk) {
auto& readableController = getReadableController();
auto& readableController = JSG_REQUIRE_NONNULL(
tryGetReadableController(), TypeError,
"The readable side of this TransformStream is no longer readable.");
JSG_REQUIRE(readableController.canCloseOrEnqueue(), TypeError,
"The readable side of this TransformStream is no longer readable.");
js.tryCatch([&] {
Expand All @@ -3168,51 +3177,49 @@ void TransformStreamDefaultController::enqueue(
}

void TransformStreamDefaultController::error(jsg::Lock& js, v8::Local<v8::Value> reason) {
errorNoIoContextCheck(js, reason);
}

void TransformStreamDefaultController::errorNoIoContextCheck(
jsg::Lock& js,
v8::Local<v8::Value> reason) {
getReadableController().error(js, reason);
KJ_IF_MAYBE(readableController, tryGetReadableController()) {
readableController->error(js, reason);
maybeReadableController = nullptr;
}
errorWritableAndUnblockWrite(js, reason);
}

void TransformStreamDefaultController::terminate(jsg::Lock& js) {
getReadableController().close(js);
KJ_IF_MAYBE(readableController, tryGetReadableController()) {
readableController->close(js);
maybeReadableController = nullptr;
}
errorWritableAndUnblockWrite(js, js.v8TypeError("The transform stream has been terminated"_kj));
}

jsg::Promise<void> TransformStreamDefaultController::write(
jsg::Lock& js,
v8::Local<v8::Value> chunk) {
auto& writableController = getWritableController();

KJ_IF_MAYBE(error, writableController.isErroredOrErroring(js)) {
return js.rejectedPromise<void>(*error);
}
KJ_IF_MAYBE(writableController, tryGetWritableController()) {
KJ_IF_MAYBE(error, writableController->isErroredOrErroring(js)) {
return js.rejectedPromise<void>(*error);
}

KJ_ASSERT(writableController.isWritable());
KJ_ASSERT(writableController->isWritable());

if (backpressure) {
auto chunkRef = js.v8Ref(chunk);
return KJ_ASSERT_NONNULL(maybeBackpressureChange).promise.whenResolved().then(js,
JSG_VISITABLE_LAMBDA((chunkRef = kj::mv(chunkRef), ref = JSG_THIS),
(chunkRef, ref), (jsg::Lock& js) mutable -> jsg::Promise<void> {
KJ_IF_MAYBE(writableController, ref->maybeWritableController) {
KJ_IF_MAYBE(error, writableController->isErroring(js)) {
return js.rejectedPromise<void>(*error);
if (backpressure) {
auto chunkRef = js.v8Ref(chunk);
return KJ_ASSERT_NONNULL(maybeBackpressureChange).promise.whenResolved().then(js,
JSG_VISITABLE_LAMBDA((chunkRef = kj::mv(chunkRef), ref=JSG_THIS),
(chunkRef, ref), (jsg::Lock& js) mutable -> jsg::Promise<void> {
KJ_IF_MAYBE(writableController, ref->tryGetWritableController()) {
KJ_IF_MAYBE(error, writableController->isErroring(js)) {
return js.rejectedPromise<void>(*error);
}
}
KJ_ASSERT(writableController->isWritable());
return ref->performTransform(js, chunkRef.getHandle(js));
} else {
// This case should not happen in general, this is purely defensive.
return js.rejectedPromise<void>(KJ_EXCEPTION(FAILED,
"jsg.TypeError: Writing to the TransformStream failed."));
}
}));
}));
}
return performTransform(js, chunk);
} else {
return js.rejectedPromise<void>(KJ_EXCEPTION(FAILED,
"jsg.TypeError: Writing to the TransformStream failed."));
}
return performTransform(js, chunk);
}

jsg::Promise<void> TransformStreamDefaultController::abort(
Expand All @@ -3223,13 +3230,14 @@ jsg::Promise<void> TransformStreamDefaultController::abort(
}

jsg::Promise<void> TransformStreamDefaultController::close(jsg::Lock& js) {
auto onSuccess = JSG_VISITALBE((ref=JSG_THIS), (ref), (jsg::Lock& js) -> jsg::Promise<void> {
auto& readableController = ref->getReadableController();

// Allows for a graceful close of the readable side. Close will
// complete once all of the queued data is read or the stream
// errors.
readableController.close(js);
auto onSuccess = JSG_VISITABLE_LAMBDA(
(ref=JSG_THIS), (ref), (jsg::Lock& js) -> jsg::Promise<void> {
KJ_IF_MAYBE(readableController, ref->tryGetReadableController()) {
// Allows for a graceful close of the readable side. Close will
// complete once all of the queued data is read or the stream
// errors.
readableController->close(js);
}
return js.resolvedPromise();
});

Expand All @@ -3252,6 +3260,7 @@ jsg::Promise<void> TransformStreamDefaultController::pull(jsg::Lock& js) {
jsg::Promise<void> TransformStreamDefaultController::cancel(
jsg::Lock& js,
v8::Local<v8::Value> reason) {
maybeReadableController = nullptr;
errorWritableAndUnblockWrite(js, reason);
return js.resolvedPromise();
}
Expand All @@ -3276,8 +3285,12 @@ jsg::Promise<void> TransformStreamDefaultController::performTransform(
}
// If we got here, there is no transform algorithm. Per the spec, the default
// behavior then is to just pass along the value untransformed.
enqueue(js, chunk);
return js.resolvedPromise();
return js.tryCatch([&] {
enqueue(js, chunk);
return js.resolvedPromise();
}, [&](jsg::Value exception) {
return js.rejectedPromise<void>(kj::mv(exception));
});
}

void TransformStreamDefaultController::setBackpressure(jsg::Lock& js, bool newBackpressure) {
Expand All @@ -3294,9 +3307,11 @@ void TransformStreamDefaultController::errorWritableAndUnblockWrite(
jsg::Lock& js,
v8::Local<v8::Value> reason) {
algorithms.clear();
auto& writableController = getWritableController();
if (writableController.isWritable()) {
writableController.doError(js, reason);
KJ_IF_MAYBE(writableController, tryGetWritableController()) {
if (writableController->isWritable()) {
writableController->doError(js, reason);
}
maybeWritableController = nullptr;
}
if (backpressure) {
setBackpressure(js, false);
Expand All @@ -3317,16 +3332,17 @@ void TransformStreamDefaultController::init(
jsg::Optional<Transformer> maybeTransformer) {
KJ_ASSERT(maybeReadableController == nullptr);
KJ_ASSERT(maybeWritableController == nullptr);
maybeWritableController = static_cast<WritableStreamJsController&>(writable->getController());
maybeWritableController =
static_cast<WritableStreamJsController&>(writable->getController()).getWeakRef();

// The TransformStreamDefaultController needs to have a reference to the underlying controller
// and not just the readable because if the readable is teed, or passed off to source, etc,
// the TransformStream has to make sure that it can continue to interface with the controller
// to push data into it.
auto& readableController = static_cast<ReadableStreamJsController&>(readable->getController());
auto readableRef = KJ_ASSERT_NONNULL(readableController.getController());
maybeReadableController = kj::mv(KJ_ASSERT_NONNULL(
readableRef.tryGet<jsg::Ref<ReadableStreamDefaultController>>()));
maybeReadableController = KJ_ASSERT_NONNULL(
readableRef.tryGet<jsg::Ref<ReadableStreamDefaultController>>())->getWeakRef();

auto transformer = kj::mv(maybeTransformer).orDefault({});

Expand Down Expand Up @@ -3362,4 +3378,20 @@ void TransformStreamDefaultController::init(
JSG_THIS);
}

kj::Maybe<ReadableStreamDefaultController&>
TransformStreamDefaultController::tryGetReadableController() {
KJ_IF_MAYBE(controller, maybeReadableController) {
return (*controller)->tryGet();
}
return nullptr;
}

kj::Maybe<WritableStreamJsController&>
TransformStreamDefaultController::tryGetWritableController() {
KJ_IF_MAYBE(controller, maybeWritableController) {
return (*controller)->tryGet();
}
return nullptr;
}

} // namespace workerd::api
61 changes: 48 additions & 13 deletions src/workerd/api/streams/standard.h
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,20 @@ class WritableImpl {

} // namespace jscontroller

template <typename T>
class WeakRef: public kj::Refcounted {
// Used to allow the TransformStreamDefaultController to hold safe
// weak refs to the ReadableStreamDefaultController and WritableStreamJsController.
public:
WeakRef(T& ref) : ref(ref) {}
KJ_DISALLOW_COPY_AND_MOVE(WeakRef);
kj::Maybe<T&> tryGet() { return ref; }
private:
void reset() { ref = nullptr; }
kj::Maybe<T&> ref;
friend T;
};

// =======================================================================================

class ReadableStreamDefaultController: public jsg::Object {
Expand All @@ -673,6 +687,7 @@ class ReadableStreamDefaultController: public jsg::Object {

ReadableStreamDefaultController(UnderlyingSource underlyingSource,
StreamQueuingStrategy queuingStrategy);
~ReadableStreamDefaultController() noexcept(false) { weakRef->reset(); }

void start(jsg::Lock& js);

Expand Down Expand Up @@ -706,9 +721,14 @@ class ReadableStreamDefaultController: public jsg::Object {
});
}

kj::Own<WeakRef<ReadableStreamDefaultController>> getWeakRef() {
return kj::addRef(*weakRef);
}

private:
kj::Maybe<IoContext&> ioContext;
ReadableImpl impl;
kj::Own<WeakRef<ReadableStreamDefaultController>> weakRef;

void visitForGc(jsg::GcVisitor& visitor);
};
Expand Down Expand Up @@ -1043,7 +1063,7 @@ class WritableStreamJsController: public WritableStreamController,

KJ_DISALLOW_COPY_AND_MOVE(WritableStreamJsController);

~WritableStreamJsController() noexcept(false) override {}
~WritableStreamJsController() noexcept(false) override { weakRef->reset(); }

jsg::Promise<void> abort(jsg::Lock& js,
jsg::Optional<v8::Local<v8::Value>> reason) override;
Expand Down Expand Up @@ -1099,6 +1119,10 @@ class WritableStreamJsController: public WritableStreamController,

void visitForGc(jsg::GcVisitor& visitor) override;

kj::Own<WeakRef<WritableStreamJsController>> getWeakRef() {
return kj::addRef(*weakRef);
}

private:
jsg::Promise<void> pipeLoop(jsg::Lock& js);

Expand All @@ -1107,13 +1131,30 @@ class WritableStreamJsController: public WritableStreamController,
kj::OneOf<StreamStates::Closed, StreamStates::Errored, Controller> state = StreamStates::Closed();
WritableLockImpl lock;
kj::Maybe<jsg::Promise<void>> maybeAbortPromise;
kj::Own<WeakRef<WritableStreamJsController>> weakRef;

friend WritableLockImpl;
};

// =======================================================================================

class TransformStreamDefaultController: public jsg::Object {
// The relationship between the TransformStreamDefaultController and the
// readable/writable streams associated with it can be complicated.
// Strong references to the TransformStreamDefaultController are held by
// the *algorithms* passed into the readable and writable streams using
// JSG_VISITABLE_LAMBDAs. When those algorithms are cleared, the strong
// references holding the TransformStreamDefaultController are freed.
// However, user code can do silly things like hold the Transform controller
// long after both the readable and writable sides have been gc'd.
//
// We do not want to create a strong reference cycle between the various
// controllers so we use weak refs within the transform controller to
// safely reference the readable and writable sides. If either side goes
// away cleanly (using the algorithms) the weak references are cleared.
// If either side goes away due to garbage collection while the transform
// controller is still alive, the weak references are cleared. The transform
// controller then safely handles the disappearance of either side.
public:
TransformStreamDefaultController(jsg::Lock& js);

Expand Down Expand Up @@ -1182,20 +1223,14 @@ class TransformStreamDefaultController: public jsg::Object {
v8::Local<v8::Value> chunk);
void setBackpressure(jsg::Lock& js, bool newBackpressure);

inline ReadableStreamDefaultController& getReadableController() {
return *KJ_ASSERT_NONNULL(maybeReadableController);
}

inline WritableStreamJsController& getWritableController() {
return KJ_ASSERT_NONNULL(maybeWritableController);
}

void errorNoIoContextCheck(jsg::Lock& js, v8::Local<v8::Value> reason);

kj::Maybe<IoContext&> ioContext;
jsg::PromiseResolverPair<void> startPromise;
kj::Maybe<jsg::Ref<ReadableStreamDefaultController>> maybeReadableController;
kj::Maybe<WritableStreamJsController&> maybeWritableController;

kj::Maybe<ReadableStreamDefaultController&> tryGetReadableController();
kj::Maybe<WritableStreamJsController&> tryGetWritableController();

kj::Maybe<kj::Own<WeakRef<ReadableStreamDefaultController>>> maybeReadableController;
kj::Maybe<kj::Own<WeakRef<WritableStreamJsController>>> maybeWritableController;
Algorithms algorithms;
bool backpressure = false;
kj::Maybe<jsg::PromiseResolverPair<void>> maybeBackpressureChange;
Expand Down

0 comments on commit 7e3c9fb

Please sign in to comment.