diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++ index 8f0f442fda5..8edaf600427 100644 --- a/src/workerd/api/streams/standard.c++ +++ b/src/workerd/api/streams/standard.c++ @@ -1439,7 +1439,8 @@ ReadableStreamDefaultController::ReadableStreamDefaultController( UnderlyingSource underlyingSource, StreamQueuingStrategy queuingStrategy) : ioContext(tryGetIoContext()), - impl(kj::mv(underlyingSource), kj::mv(queuingStrategy)) {} + impl(kj::mv(underlyingSource), kj::mv(queuingStrategy)), + weakRef(kj::refcounted>(*this)) {} void ReadableStreamDefaultController::start(jsg::Lock& js) { impl.start(js, JSG_THIS); @@ -2748,13 +2749,16 @@ jsg::Promise WritableStreamDefaultController::write( // ====================================================================================== WritableStreamJsController::WritableStreamJsController() - : ioContext(tryGetIoContext()) {} + : ioContext(tryGetIoContext()), + weakRef(kj::refcounted>(*this)) {} WritableStreamJsController::WritableStreamJsController(StreamStates::Closed closed) - : ioContext(tryGetIoContext()), state(closed) {} + : ioContext(tryGetIoContext()), state(closed), + weakRef(kj::refcounted>(*this)) {} WritableStreamJsController::WritableStreamJsController(StreamStates::Errored errored) - : ioContext(tryGetIoContext()), state(kj::mv(errored)) {} + : ioContext(tryGetIoContext()), state(kj::mv(errored)), + weakRef(kj::refcounted>(*this)) {} jsg::Promise WritableStreamJsController::abort( jsg::Lock& js, @@ -3145,13 +3149,18 @@ TransformStreamDefaultController::TransformStreamDefaultController(jsg::Lock& js startPromise(js.newPromiseAndResolver()) {} kj::Maybe TransformStreamDefaultController::getDesiredSize() { - return getReadableController().getDesiredSize(); + KJ_IF_MAYBE(readableController, tryGetReadableController()) { + return readableController->getDesiredSize(); + } + return nullptr; } void TransformStreamDefaultController::enqueue( jsg::Lock& js, v8::Local chunk) { - auto& readableController = getReadableController(); + auto& readableController = JSG_REQUIRE_NONNULL( + tryGetReadableController(), TypeError, + "The readable side of this TransformStream is no longer readable."); JSG_REQUIRE(readableController.canCloseOrEnqueue(), TypeError, "The readable side of this TransformStream is no longer readable."); js.tryCatch([&] { @@ -3168,51 +3177,49 @@ void TransformStreamDefaultController::enqueue( } void TransformStreamDefaultController::error(jsg::Lock& js, v8::Local reason) { - errorNoIoContextCheck(js, reason); -} - -void TransformStreamDefaultController::errorNoIoContextCheck( - jsg::Lock& js, - v8::Local reason) { - getReadableController().error(js, reason); + KJ_IF_MAYBE(readableController, tryGetReadableController()) { + readableController->error(js, reason); + maybeReadableController = nullptr; + } errorWritableAndUnblockWrite(js, reason); } void TransformStreamDefaultController::terminate(jsg::Lock& js) { - getReadableController().close(js); + KJ_IF_MAYBE(readableController, tryGetReadableController()) { + readableController->close(js); + maybeReadableController = nullptr; + } errorWritableAndUnblockWrite(js, js.v8TypeError("The transform stream has been terminated"_kj)); } jsg::Promise TransformStreamDefaultController::write( jsg::Lock& js, v8::Local chunk) { - auto& writableController = getWritableController(); - - KJ_IF_MAYBE(error, writableController.isErroredOrErroring(js)) { - return js.rejectedPromise(*error); - } + KJ_IF_MAYBE(writableController, tryGetWritableController()) { + KJ_IF_MAYBE(error, writableController->isErroredOrErroring(js)) { + return js.rejectedPromise(*error); + } - KJ_ASSERT(writableController.isWritable()); + KJ_ASSERT(writableController->isWritable()); - if (backpressure) { - auto chunkRef = js.v8Ref(chunk); - return KJ_ASSERT_NONNULL(maybeBackpressureChange).promise.whenResolved().then(js, - JSG_VISITABLE_LAMBDA((chunkRef = kj::mv(chunkRef), ref = JSG_THIS), - (chunkRef, ref), (jsg::Lock& js) mutable -> jsg::Promise { - KJ_IF_MAYBE(writableController, ref->maybeWritableController) { - KJ_IF_MAYBE(error, writableController->isErroring(js)) { - return js.rejectedPromise(*error); + if (backpressure) { + auto chunkRef = js.v8Ref(chunk); + return KJ_ASSERT_NONNULL(maybeBackpressureChange).promise.whenResolved().then(js, + JSG_VISITABLE_LAMBDA((chunkRef = kj::mv(chunkRef), ref=JSG_THIS), + (chunkRef, ref), (jsg::Lock& js) mutable -> jsg::Promise { + KJ_IF_MAYBE(writableController, ref->tryGetWritableController()) { + KJ_IF_MAYBE(error, writableController->isErroring(js)) { + return js.rejectedPromise(*error); + } } - KJ_ASSERT(writableController->isWritable()); return ref->performTransform(js, chunkRef.getHandle(js)); - } else { - // This case should not happen in general, this is purely defensive. - return js.rejectedPromise(KJ_EXCEPTION(FAILED, - "jsg.TypeError: Writing to the TransformStream failed.")); - } - })); + })); + } + return performTransform(js, chunk); + } else { + return js.rejectedPromise(KJ_EXCEPTION(FAILED, + "jsg.TypeError: Writing to the TransformStream failed.")); } - return performTransform(js, chunk); } jsg::Promise TransformStreamDefaultController::abort( @@ -3223,13 +3230,14 @@ jsg::Promise TransformStreamDefaultController::abort( } jsg::Promise TransformStreamDefaultController::close(jsg::Lock& js) { - auto onSuccess = JSG_VISITALBE((ref=JSG_THIS), (ref), (jsg::Lock& js) -> jsg::Promise { - auto& readableController = ref->getReadableController(); - - // Allows for a graceful close of the readable side. Close will - // complete once all of the queued data is read or the stream - // errors. - readableController.close(js); + auto onSuccess = JSG_VISITABLE_LAMBDA( + (ref=JSG_THIS), (ref), (jsg::Lock& js) -> jsg::Promise { + KJ_IF_MAYBE(readableController, ref->tryGetReadableController()) { + // Allows for a graceful close of the readable side. Close will + // complete once all of the queued data is read or the stream + // errors. + readableController->close(js); + } return js.resolvedPromise(); }); @@ -3252,6 +3260,7 @@ jsg::Promise TransformStreamDefaultController::pull(jsg::Lock& js) { jsg::Promise TransformStreamDefaultController::cancel( jsg::Lock& js, v8::Local reason) { + maybeReadableController = nullptr; errorWritableAndUnblockWrite(js, reason); return js.resolvedPromise(); } @@ -3276,8 +3285,12 @@ jsg::Promise TransformStreamDefaultController::performTransform( } // If we got here, there is no transform algorithm. Per the spec, the default // behavior then is to just pass along the value untransformed. - enqueue(js, chunk); - return js.resolvedPromise(); + return js.tryCatch([&] { + enqueue(js, chunk); + return js.resolvedPromise(); + }, [&](jsg::Value exception) { + return js.rejectedPromise(kj::mv(exception)); + }); } void TransformStreamDefaultController::setBackpressure(jsg::Lock& js, bool newBackpressure) { @@ -3294,9 +3307,11 @@ void TransformStreamDefaultController::errorWritableAndUnblockWrite( jsg::Lock& js, v8::Local reason) { algorithms.clear(); - auto& writableController = getWritableController(); - if (writableController.isWritable()) { - writableController.doError(js, reason); + KJ_IF_MAYBE(writableController, tryGetWritableController()) { + if (writableController->isWritable()) { + writableController->doError(js, reason); + } + maybeWritableController = nullptr; } if (backpressure) { setBackpressure(js, false); @@ -3317,7 +3332,8 @@ void TransformStreamDefaultController::init( jsg::Optional maybeTransformer) { KJ_ASSERT(maybeReadableController == nullptr); KJ_ASSERT(maybeWritableController == nullptr); - maybeWritableController = static_cast(writable->getController()); + maybeWritableController = + static_cast(writable->getController()).getWeakRef(); // The TransformStreamDefaultController needs to have a reference to the underlying controller // and not just the readable because if the readable is teed, or passed off to source, etc, @@ -3325,8 +3341,8 @@ void TransformStreamDefaultController::init( // to push data into it. auto& readableController = static_cast(readable->getController()); auto readableRef = KJ_ASSERT_NONNULL(readableController.getController()); - maybeReadableController = kj::mv(KJ_ASSERT_NONNULL( - readableRef.tryGet>())); + maybeReadableController = KJ_ASSERT_NONNULL( + readableRef.tryGet>())->getWeakRef(); auto transformer = kj::mv(maybeTransformer).orDefault({}); @@ -3362,4 +3378,20 @@ void TransformStreamDefaultController::init( JSG_THIS); } +kj::Maybe +TransformStreamDefaultController::tryGetReadableController() { + KJ_IF_MAYBE(controller, maybeReadableController) { + return (*controller)->tryGet(); + } + return nullptr; +} + +kj::Maybe +TransformStreamDefaultController::tryGetWritableController() { + KJ_IF_MAYBE(controller, maybeWritableController) { + return (*controller)->tryGet(); + } + return nullptr; +} + } // namespace workerd::api diff --git a/src/workerd/api/streams/standard.h b/src/workerd/api/streams/standard.h index d9597e55b93..b2dc0b15bad 100644 --- a/src/workerd/api/streams/standard.h +++ b/src/workerd/api/streams/standard.h @@ -661,6 +661,20 @@ class WritableImpl { } // namespace jscontroller +template +class WeakRef: public kj::Refcounted { + // Used to allow the TransformStreamDefaultController to hold safe + // weak refs to the ReadableStreamDefaultController and WritableStreamJsController. +public: + WeakRef(T& ref) : ref(ref) {} + KJ_DISALLOW_COPY_AND_MOVE(WeakRef); + kj::Maybe tryGet() { return ref; } +private: + void reset() { ref = nullptr; } + kj::Maybe ref; + friend T; +}; + // ======================================================================================= class ReadableStreamDefaultController: public jsg::Object { @@ -673,6 +687,7 @@ class ReadableStreamDefaultController: public jsg::Object { ReadableStreamDefaultController(UnderlyingSource underlyingSource, StreamQueuingStrategy queuingStrategy); + ~ReadableStreamDefaultController() noexcept(false) { weakRef->reset(); } void start(jsg::Lock& js); @@ -706,9 +721,14 @@ class ReadableStreamDefaultController: public jsg::Object { }); } + kj::Own> getWeakRef() { + return kj::addRef(*weakRef); + } + private: kj::Maybe ioContext; ReadableImpl impl; + kj::Own> weakRef; void visitForGc(jsg::GcVisitor& visitor); }; @@ -1043,7 +1063,7 @@ class WritableStreamJsController: public WritableStreamController, KJ_DISALLOW_COPY_AND_MOVE(WritableStreamJsController); - ~WritableStreamJsController() noexcept(false) override {} + ~WritableStreamJsController() noexcept(false) override { weakRef->reset(); } jsg::Promise abort(jsg::Lock& js, jsg::Optional> reason) override; @@ -1099,6 +1119,10 @@ class WritableStreamJsController: public WritableStreamController, void visitForGc(jsg::GcVisitor& visitor) override; + kj::Own> getWeakRef() { + return kj::addRef(*weakRef); + } + private: jsg::Promise pipeLoop(jsg::Lock& js); @@ -1107,6 +1131,7 @@ class WritableStreamJsController: public WritableStreamController, kj::OneOf state = StreamStates::Closed(); WritableLockImpl lock; kj::Maybe> maybeAbortPromise; + kj::Own> weakRef; friend WritableLockImpl; }; @@ -1114,6 +1139,22 @@ class WritableStreamJsController: public WritableStreamController, // ======================================================================================= class TransformStreamDefaultController: public jsg::Object { + // The relationship between the TransformStreamDefaultController and the + // readable/writable streams associated with it can be complicated. + // Strong references to the TransformStreamDefaultController are held by + // the *algorithms* passed into the readable and writable streams using + // JSG_VISITABLE_LAMBDAs. When those algorithms are cleared, the strong + // references holding the TransformStreamDefaultController are freed. + // However, user code can do silly things like hold the Transform controller + // long after both the readable and writable sides have been gc'd. + // + // We do not want to create a strong reference cycle between the various + // controllers so we use weak refs within the transform controller to + // safely reference the readable and writable sides. If either side goes + // away cleanly (using the algorithms) the weak references are cleared. + // If either side goes away due to garbage collection while the transform + // controller is still alive, the weak references are cleared. The transform + // controller then safely handles the disappearance of either side. public: TransformStreamDefaultController(jsg::Lock& js); @@ -1182,20 +1223,14 @@ class TransformStreamDefaultController: public jsg::Object { v8::Local chunk); void setBackpressure(jsg::Lock& js, bool newBackpressure); - inline ReadableStreamDefaultController& getReadableController() { - return *KJ_ASSERT_NONNULL(maybeReadableController); - } - - inline WritableStreamJsController& getWritableController() { - return KJ_ASSERT_NONNULL(maybeWritableController); - } - - void errorNoIoContextCheck(jsg::Lock& js, v8::Local reason); - kj::Maybe ioContext; jsg::PromiseResolverPair startPromise; - kj::Maybe> maybeReadableController; - kj::Maybe maybeWritableController; + + kj::Maybe tryGetReadableController(); + kj::Maybe tryGetWritableController(); + + kj::Maybe>> maybeReadableController; + kj::Maybe>> maybeWritableController; Algorithms algorithms; bool backpressure = false; kj::Maybe> maybeBackpressureChange;