Skip to content

Commit

Permalink
ensure fibers are interrupted in Stream.mergeWith (#3175)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Jul 4, 2024
1 parent 359ff8a commit a9c4fb3
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .changeset/lazy-donkeys-do.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

ensure fibers are interrupted in Stream.mergeWith
13 changes: 7 additions & 6 deletions packages/effect/src/internal/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1607,11 +1607,12 @@ export const mergeWith = dual<
(input) => {
const queueReader = fromInput(input)
return Effect.map(
Effect.zip(
Effect.all([
toPull(core.pipeTo(queueReader, self)),
toPull(core.pipeTo(queueReader, options.other))
),
([pullL, pullR]) => {
toPull(core.pipeTo(queueReader, options.other)),
Effect.scope
]),
([pullL, pullR, scope]) => {
type State = MergeState.MergeState<
Env | Env1,
OutErr,
Expand Down Expand Up @@ -1806,8 +1807,8 @@ export const mergeWith = dual<
return pipe(
core.fromEffect(
Effect.zipWith(
Effect.forkDaemon(pullL),
Effect.forkDaemon(pullR),
Effect.forkIn(pullL, scope),
Effect.forkIn(pullR, scope),
(left, right): State =>
mergeState.BothRunning<
Env | Env1,
Expand Down
24 changes: 24 additions & 0 deletions packages/effect/test/Stream/interrupting.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Schedule } from "effect"
import * as Chunk from "effect/Chunk"
import * as Deferred from "effect/Deferred"
import * as Duration from "effect/Duration"
Expand Down Expand Up @@ -164,4 +165,27 @@ describe("Stream", () => {
const result = yield* $(Fiber.join(fiber))
assert.deepStrictEqual(Array.from(result), [])
}))

it.effect("interruptWhen - interrupts the effect", () =>
Effect.gen(function*() {
let interrupted = false
const effect = Effect.never.pipe(
Effect.onInterrupt(() =>
Effect.sync(() => {
interrupted = true
})
)
)

const fiber = yield* Stream.fromSchedule(Schedule.spaced("1 second")).pipe(
Stream.interruptWhen(effect),
Stream.take(1),
Stream.runDrain,
Effect.fork
)
yield* TestClock.adjust("1 seconds")
yield* fiber.await

assert.strictEqual(interrupted, true)
}))
})

0 comments on commit a9c4fb3

Please sign in to comment.