diff --git a/.changeset/tough-lobsters-guess.md b/.changeset/tough-lobsters-guess.md
new file mode 100644
index 00000000000..50a130bd188
--- /dev/null
+++ b/.changeset/tough-lobsters-guess.md
@@ -0,0 +1,5 @@
+---
+"effect": minor
+---
+
+add option to .releaseLock a ReadableStream on finalization
diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts
index bbcf0bd7026..b06fd6ca747 100644
--- a/packages/effect/src/Stream.ts
+++ b/packages/effect/src/Stream.ts
@@ -2120,10 +2120,16 @@ export const fromTQueue: (queue: TDequeue) => Stream = internal.fromTQu
* @since 2.0.0
* @category constructors
*/
-export const fromReadableStream: (
- evaluate: LazyArg>,
- onError: (error: unknown) => E
-) => Stream = internal.fromReadableStream
+export const fromReadableStream: {
+ (
+ options: {
+ readonly evaluate: LazyArg>
+ readonly onError: (error: unknown) => E
+ readonly releaseLockOnEnd?: boolean | undefined
+ }
+ ): Stream
+ (evaluate: LazyArg>, onError: (error: unknown) => E): Stream
+} = internal.fromReadableStream
/**
* Creates a stream from a `ReadableStreamBYOBReader`.
@@ -2134,11 +2140,21 @@ export const fromReadableStream: (
* @since 2.0.0
* @category constructors
*/
-export const fromReadableStreamByob: (
- evaluate: LazyArg>,
- onError: (error: unknown) => E,
- allocSize?: number
-) => Stream = internal.fromReadableStreamByob
+export const fromReadableStreamByob: {
+ (
+ options: {
+ readonly evaluate: LazyArg>
+ readonly onError: (error: unknown) => E
+ readonly bufferSize?: number | undefined
+ readonly releaseLockOnEnd?: boolean | undefined
+ }
+ ): Stream
+ (
+ evaluate: LazyArg>,
+ onError: (error: unknown) => E,
+ allocSize?: number
+ ): Stream
+} = internal.fromReadableStreamByob
/**
* Creates a stream from a `Schedule` that does not require any further
diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts
index 76805839833..76bc6bbbf48 100644
--- a/packages/effect/src/internal/stream.ts
+++ b/packages/effect/src/internal/stream.ts
@@ -18,7 +18,7 @@ import * as MergeDecision from "../MergeDecision.js"
import * as Option from "../Option.js"
import type * as Order from "../Order.js"
import { pipeArguments } from "../Pipeable.js"
-import { hasProperty, isTagged, type Predicate, type Refinement } from "../Predicate.js"
+import { hasProperty, type Predicate, type Refinement } from "../Predicate.js"
import * as PubSub from "../PubSub.js"
import * as Queue from "../Queue.js"
import * as RcRef from "../RcRef.js"
@@ -3261,14 +3261,38 @@ export const fromSchedule = (schedule: Schedule.Schedule):
)
/** @internal */
-export const fromReadableStream = (
- evaluate: LazyArg>,
- onError: (error: unknown) => E
-): Stream.Stream =>
- unwrapScoped(Effect.map(
+export const fromReadableStream: {
+ (
+ options: {
+ readonly evaluate: LazyArg>
+ readonly onError: (error: unknown) => E
+ readonly releaseLockOnEnd?: boolean | undefined
+ }
+ ): Stream.Stream
+ (
+ evaluate: LazyArg>,
+ onError: (error: unknown) => E
+ ): Stream.Stream
+} = (
+ ...args: [options: {
+ readonly evaluate: LazyArg>
+ readonly onError: (error: unknown) => E
+ readonly releaseLockOnEnd?: boolean | undefined
+ }] | [
+ evaluate: LazyArg>,
+ onError: (error: unknown) => E
+ ]
+): Stream.Stream => {
+ const evaluate = args.length === 1 ? args[0].evaluate : args[0]
+ const onError = args.length === 1 ? args[0].onError : args[1]
+ const releaseLockOnEnd = args.length === 1 ? args[0].releaseLockOnEnd === true : false
+ return unwrapScoped(Effect.map(
Effect.acquireRelease(
Effect.sync(() => evaluate().getReader()),
- (reader) => Effect.promise(() => reader.cancel())
+ (reader) =>
+ releaseLockOnEnd
+ ? Effect.sync(() => reader.releaseLock())
+ : Effect.promise(() => reader.cancel())
),
(reader) =>
repeatEffectOption(
@@ -3281,34 +3305,59 @@ export const fromReadableStream = (
)
)
))
+}
/** @internal */
-export const fromReadableStreamByob = (
- evaluate: LazyArg>,
- onError: (error: unknown) => E,
- allocSize = 4096
-): Stream.Stream =>
- unwrapScoped(Effect.map(
+export const fromReadableStreamByob: {
+ (
+ options: {
+ readonly evaluate: LazyArg>
+ readonly onError: (error: unknown) => E
+ readonly bufferSize?: number | undefined
+ readonly releaseLockOnEnd?: boolean | undefined
+ }
+ ): Stream.Stream
+ (
+ evaluate: LazyArg>,
+ onError: (error: unknown) => E,
+ allocSize?: number
+ ): Stream.Stream
+} = (
+ ...args: [options: {
+ readonly evaluate: LazyArg>
+ readonly onError: (error: unknown) => E
+ readonly bufferSize?: number | undefined
+ readonly releaseLockOnEnd?: boolean | undefined
+ }] | [
+ evaluate: LazyArg>,
+ onError: (error: unknown) => E,
+ allocSize?: number | undefined
+ ]
+): Stream.Stream => {
+ const evaluate = args.length === 1 ? args[0].evaluate : args[0]
+ const onError = args.length === 1 ? args[0].onError : args[1]
+ const allocSize = (args.length === 1 ? args[0].bufferSize : args[2]) ?? 4096
+ const releaseLockOnEnd = args.length === 1 ? args[0].releaseLockOnEnd === true : false
+ return unwrapScoped(Effect.map(
Effect.acquireRelease(
Effect.sync(() => evaluate().getReader({ mode: "byob" })),
- (reader) => Effect.promise(() => reader.cancel())
+ (reader) => releaseLockOnEnd ? Effect.sync(() => reader.releaseLock()) : Effect.promise(() => reader.cancel())
),
(reader) =>
catchAll(
forever(readChunkStreamByobReader(reader, onError, allocSize)),
- (error) => isTagged(error, "EOF") ? empty : fail(error as E)
+ (error) => error === EOF ? empty : fail(error)
)
))
-
-interface EOF {
- readonly _tag: "EOF"
}
+const EOF = Symbol.for("effect/Stream/EOF")
+
const readChunkStreamByobReader = (
reader: ReadableStreamBYOBReader,
onError: (error: unknown) => E,
size: number
-): Stream.Stream => {
+): Stream.Stream => {
const buffer = new ArrayBuffer(size)
return paginateEffect(0, (offset) =>
Effect.flatMap(
@@ -3318,7 +3367,7 @@ const readChunkStreamByobReader = (
}),
({ done, value }) => {
if (done) {
- return Effect.fail({ _tag: "EOF" })
+ return Effect.fail(EOF)
}
const newOffset = offset + value.byteLength
return Effect.succeed([
diff --git a/packages/effect/test/Stream/constructors.test.ts b/packages/effect/test/Stream/constructors.test.ts
index 15926537f87..f5aea08fec4 100644
--- a/packages/effect/test/Stream/constructors.test.ts
+++ b/packages/effect/test/Stream/constructors.test.ts
@@ -229,10 +229,10 @@ describe("Stream", () => {
}
const result = yield* $(
- Stream.fromReadableStream(
- () => new ReadableStream(new NumberSource()),
- (error) => new FromReadableStreamError(error)
- ),
+ Stream.fromReadableStream({
+ evaluate: () => new ReadableStream(new NumberSource()),
+ onError: (error) => new FromReadableStreamError(error)
+ }),
Stream.take(10),
Stream.runCollect
)