diff --git a/.changeset/cold-cougars-pretend.md b/.changeset/cold-cougars-pretend.md
new file mode 100644
index 00000000000..df705b7505a
--- /dev/null
+++ b/.changeset/cold-cougars-pretend.md
@@ -0,0 +1,5 @@
+---
+"effect": minor
+---
+
+add TSubscriptionRef
diff --git a/.changeset/shiny-squids-sell.md b/.changeset/shiny-squids-sell.md
new file mode 100644
index 00000000000..0e68f8a95fd
--- /dev/null
+++ b/.changeset/shiny-squids-sell.md
@@ -0,0 +1,5 @@
+---
+"effect": minor
+---
+
+add Stream.fromTQueue & Stream.fromTPubSub
diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts
index 0441ef80125..75bbfb5e709 100644
--- a/packages/effect/src/Stream.ts
+++ b/packages/effect/src/Stream.ts
@@ -29,6 +29,8 @@ import type * as Sink from "./Sink.js"
import type * as Emit from "./StreamEmit.js"
import type * as HaltStrategy from "./StreamHaltStrategy.js"
import type * as Take from "./Take.js"
+import type { TPubSub } from "./TPubSub.js"
+import type { TDequeue } from "./TQueue.js"
import type * as Tracer from "./Tracer.js"
import type { Covariant, NoInfer, TupleOf } from "./Types.js"
import type * as Unify from "./Unify.js"
@@ -2013,6 +2015,14 @@ export const fromPubSub: {
): Stream
} = internal.fromPubSub
+/**
+ * Creates a stream from a subscription to a `TPubSub`.
+ *
+ * @since 3.10.0
+ * @category constructors
+ */
+export const fromTPubSub: (pubsub: TPubSub) => Stream = internal.fromTPubSub
+
/**
* Creates a new `Stream` from an iterable collection of values.
*
@@ -2094,6 +2104,14 @@ export const fromQueue: (
}
) => Stream = internal.fromQueue
+/**
+ * Creates a stream from a TQueue of values
+ *
+ * @since 3.10.0
+ * @category constructors
+ */
+export const fromTQueue: (queue: TDequeue) => Stream = internal.fromTQueue
+
/**
* Creates a stream from a `ReadableStream`.
*
diff --git a/packages/effect/src/TPubSub.ts b/packages/effect/src/TPubSub.ts
index 1a15cfa1b6a..f6b7b045ce6 100644
--- a/packages/effect/src/TPubSub.ts
+++ b/packages/effect/src/TPubSub.ts
@@ -107,6 +107,15 @@ export const isEmpty: (self: TPubSub) => STM.STM = internal.isEmp
*/
export const isFull: (self: TPubSub) => STM.STM = internal.isFull
+/**
+ * Interrupts any fibers that are suspended on `offer` or `take`. Future calls
+ * to `offer*` and `take*` will be interrupted immediately.
+ *
+ * @since 2.0.0
+ * @category utils
+ */
+export const shutdown: (self: TPubSub) => STM.STM = internal.shutdown
+
/**
* Returns `true` if `shutdown` has been called, otherwise returns `false`.
*
diff --git a/packages/effect/src/TQueue.ts b/packages/effect/src/TQueue.ts
index 800555b2c8a..e8b9b465faf 100644
--- a/packages/effect/src/TQueue.ts
+++ b/packages/effect/src/TQueue.ts
@@ -206,7 +206,7 @@ export const isTEnqueue: (u: unknown) => u is TEnqueue = internal.isTEn
* @since 2.0.0
* @category mutations
*/
-export const awaitShutdown: (self: TQueue) => STM.STM = internal.awaitShutdown
+export const awaitShutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.awaitShutdown
/**
* Creates a bounded queue with the back pressure strategy. The queue will
@@ -226,7 +226,7 @@ export const bounded: (requestedCapacity: number) => STM.STM> = int
* @since 2.0.0
* @category getters
*/
-export const capacity: (self: TQueue) => number = internal.capacity
+export const capacity: (self: TDequeue | TEnqueue) => number = internal.capacity
/**
* Creates a bounded queue with the dropping strategy. The queue will drop new
@@ -245,7 +245,7 @@ export const dropping: (requestedCapacity: number) => STM.STM> = in
* @since 2.0.0
* @category getters
*/
-export const isEmpty: (self: TQueue) => STM.STM = internal.isEmpty
+export const isEmpty: (self: TDequeue | TEnqueue) => STM.STM = internal.isEmpty
/**
* Returns `true` if the `TQueue` contains at least one element, `false`
@@ -254,7 +254,7 @@ export const isEmpty: (self: TQueue) => STM.STM = internal.isEmpt
* @since 2.0.0
* @category getters
*/
-export const isFull: (self: TQueue) => STM.STM = internal.isFull
+export const isFull: (self: TDequeue | TEnqueue) => STM.STM = internal.isFull
/**
* Returns `true` if `shutdown` has been called, otherwise returns `false`.
@@ -262,7 +262,7 @@ export const isFull: (self: TQueue) => STM.STM = internal.isFull
* @since 2.0.0
* @category getters
*/
-export const isShutdown: (self: TQueue) => STM.STM = internal.isShutdown
+export const isShutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.isShutdown
/**
* Places one value in the queue.
@@ -345,7 +345,7 @@ export const seek: {
* @since 2.0.0
* @category mutations
*/
-export const shutdown: (self: TQueue) => STM.STM = internal.shutdown
+export const shutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.shutdown
/**
* Retrieves the size of the queue, which is equal to the number of elements
@@ -355,7 +355,7 @@ export const shutdown: (self: TQueue) => STM.STM = internal.shutdown
* @since 2.0.0
* @category getters
*/
-export const size: (self: TQueue) => STM.STM = internal.size
+export const size: (self: TDequeue | TEnqueue) => STM.STM = internal.size
/**
* Creates a bounded queue with the sliding strategy. The queue will add new
diff --git a/packages/effect/src/TRef.ts b/packages/effect/src/TRef.ts
index 5b98a7c6536..1dd83e9c4ed 100644
--- a/packages/effect/src/TRef.ts
+++ b/packages/effect/src/TRef.ts
@@ -7,6 +7,7 @@ import type * as TxnId from "./internal/stm/stm/txnId.js"
import type * as Versioned from "./internal/stm/stm/versioned.js"
import * as internal from "./internal/stm/tRef.js"
import type * as Option from "./Option.js"
+import type { Pipeable } from "./Pipeable.js"
import type * as STM from "./STM.js"
import type * as Types from "./Types.js"
@@ -34,7 +35,7 @@ export type TRefTypeId = typeof TRefTypeId
* @since 2.0.0
* @category models
*/
-export interface TRef extends TRef.Variance {
+export interface TRef extends TRef.Variance, Pipeable {
/**
* Note: the method is unbound, exposed only for potential extensions.
*/
diff --git a/packages/effect/src/TSubscriptionRef.ts b/packages/effect/src/TSubscriptionRef.ts
new file mode 100644
index 00000000000..dfc6ccb5ffe
--- /dev/null
+++ b/packages/effect/src/TSubscriptionRef.ts
@@ -0,0 +1,192 @@
+/**
+ * @since 3.10.0
+ */
+import type * as Effect from "./Effect.js"
+import * as internal from "./internal/stm/tSubscriptionRef.js"
+import type * as Option from "./Option.js"
+import type * as Scope from "./Scope.js"
+import type * as STM from "./STM.js"
+import type * as Stream from "./Stream.js"
+import type * as TPubSub from "./TPubSub.js"
+import type * as TQueue from "./TQueue.js"
+import type * as TRef from "./TRef.js"
+import type * as Types from "./Types.js"
+
+/**
+ * @since 3.10.0
+ * @category symbols
+ */
+export const TSubscriptionRefTypeId: unique symbol = internal.TSubscriptionRefTypeId
+
+/**
+ * @since 3.10.0
+ * @category symbols
+ */
+export type TSubscriptionRefTypeId = typeof TSubscriptionRefTypeId
+
+/**
+ * A `TSubscriptionRef` is a `TRef` that can be subscribed to in order to
+ * receive a `TDequeue` of the current value and all committed changes to the value.
+ *
+ * @since 3.10.0
+ * @category models
+ */
+export interface TSubscriptionRef extends TSubscriptionRef.Variance, TRef.TRef {
+ /** @internal */
+ readonly ref: TRef.TRef
+ /** @internal */
+ readonly pubsub: TPubSub.TPubSub
+ /** @internal */
+ modify(f: (a: A) => readonly [B, A]): STM.STM
+
+ /**
+ * A TDequeue containing the current value of the `Ref` as well as all changes
+ * to that value.
+ */
+ readonly changes: STM.STM>
+}
+
+/**
+ * @since 3.10.0
+ */
+export declare namespace TSubscriptionRef {
+ /**
+ * @since 3.10.0
+ * @category models
+ */
+ export interface Variance {
+ readonly [TSubscriptionRefTypeId]: {
+ readonly _A: Types.Invariant
+ }
+ }
+}
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const get: (self: TSubscriptionRef) => STM.STM = internal.get
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const getAndSet: {
+ (value: A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, value: A): STM.STM
+} = internal.getAndSet
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const getAndUpdate: {
+ (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => A): STM.STM
+} = internal.getAndUpdate
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const getAndUpdateSome: {
+ (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM
+} = internal.getAndUpdateSome
+
+/**
+ * @since 3.10.0
+ * @category constructors
+ */
+export const make: (value: A) => STM.STM> = internal.make
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const modify: {
+ (f: (a: A) => readonly [B, A]): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => readonly [B, A]): STM.STM
+} = internal.modify
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const modifySome: {
+ (fallback: B, f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, fallback: B, f: (a: A) => Option.Option): STM.STM
+} = internal.modifySome
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const set: {
+ (value: A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, value: A): STM.STM
+} = internal.set
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const setAndGet: {
+ (value: A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, value: A): STM.STM
+} = internal.setAndGet
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const update: {
+ (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => A): STM.STM
+} = internal.update
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const updateAndGet: {
+ (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => A): STM.STM
+} = internal.updateAndGet
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const updateSome: {
+ (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM
+} = internal.updateSome
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const updateSomeAndGet: {
+ (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM
+ (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM
+} = internal.updateSomeAndGet
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const changesScoped: (self: TSubscriptionRef) => Effect.Effect, never, Scope.Scope> =
+ internal.changesScoped
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const changesStream: (self: TSubscriptionRef) => Stream.Stream = internal.changesStream
+
+/**
+ * @since 3.10.0
+ * @category mutations
+ */
+export const changes: (self: TSubscriptionRef) => STM.STM> = (self) => self.changes
diff --git a/packages/effect/src/index.ts b/packages/effect/src/index.ts
index cb90a4ac728..a57bd513ed4 100644
--- a/packages/effect/src/index.ts
+++ b/packages/effect/src/index.ts
@@ -895,6 +895,11 @@ export * as TSemaphore from "./TSemaphore.js"
*/
export * as TSet from "./TSet.js"
+/**
+ * @since 3.10.0
+ */
+export * as TSubscriptionRef from "./TSubscriptionRef.js"
+
/**
* @since 2.0.0
*/
diff --git a/packages/effect/src/internal/stm/core.ts b/packages/effect/src/internal/stm/core.ts
index 7c0f5587152..12030907524 100644
--- a/packages/effect/src/internal/stm/core.ts
+++ b/packages/effect/src/internal/stm/core.ts
@@ -15,11 +15,10 @@ import { pipeArguments } from "../../Pipeable.js"
import { hasProperty } from "../../Predicate.js"
import type * as Scheduler from "../../Scheduler.js"
import type * as STM from "../../STM.js"
-import { StreamTypeId } from "../../Stream.js"
import { YieldWrap } from "../../Utils.js"
import { ChannelTypeId } from "../core-stream.js"
import { withFiberRuntime } from "../core.js"
-import { effectVariance } from "../effectable.js"
+import { effectVariance, StreamTypeId } from "../effectable.js"
import { OP_COMMIT } from "../opCodes/effect.js"
import { SingleShotGen } from "../singleShotGen.js"
import { SinkTypeId } from "../sink.js"
diff --git a/packages/effect/src/internal/stm/tPubSub.ts b/packages/effect/src/internal/stm/tPubSub.ts
index 089be12ce55..b838ddb90a2 100644
--- a/packages/effect/src/internal/stm/tPubSub.ts
+++ b/packages/effect/src/internal/stm/tPubSub.ts
@@ -201,6 +201,7 @@ class TPubSubSubscriptionImpl implements TQueue.TDequeue {
capacity(): number {
return this.requestedCapacity
}
+
size: STM.STM = core.withSTMRuntime((runtime) => {
let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal)
if (currentSubscriberHead === undefined) {
diff --git a/packages/effect/src/internal/stm/tQueue.ts b/packages/effect/src/internal/stm/tQueue.ts
index 90039374f32..8ca27b9865c 100644
--- a/packages/effect/src/internal/stm/tQueue.ts
+++ b/packages/effect/src/internal/stm/tQueue.ts
@@ -3,7 +3,7 @@ import * as Chunk from "../../Chunk.js"
import { dual, pipe } from "../../Function.js"
import * as Option from "../../Option.js"
import { hasProperty, type Predicate } from "../../Predicate.js"
-import * as STM from "../../STM.js"
+import type * as STM from "../../STM.js"
import type * as TQueue from "../../TQueue.js"
import type * as TRef from "../../TRef.js"
import * as core from "./core.js"
@@ -99,7 +99,7 @@ class TQueueImpl implements TQueue.TQueue {
size: STM.STM = core.withSTMRuntime((runtime) => {
const queue = tRef.unsafeGet(this.ref, runtime.journal)
if (queue === undefined) {
- return STM.interruptAs(runtime.fiberId)
+ return core.interruptAs(runtime.fiberId)
}
return core.succeed(queue.length)
})
diff --git a/packages/effect/src/internal/stm/tRef.ts b/packages/effect/src/internal/stm/tRef.ts
index c7805093604..3162fc252b0 100644
--- a/packages/effect/src/internal/stm/tRef.ts
+++ b/packages/effect/src/internal/stm/tRef.ts
@@ -1,5 +1,7 @@
import { dual } from "../../Function.js"
import * as Option from "../../Option.js"
+import type { Pipeable } from "../../Pipeable.js"
+import { pipeArguments } from "../../Pipeable.js"
import type * as STM from "../../STM.js"
import type * as TRef from "../../TRef.js"
import * as core from "./core.js"
@@ -16,13 +18,13 @@ export const TRefTypeId: TRef.TRefTypeId = Symbol.for(
TRefSymbolKey
) as TRef.TRefTypeId
-const tRefVariance = {
+export const tRefVariance = {
/* c8 ignore next */
_A: (_: any) => _
}
/** @internal */
-export class TRefImpl implements TRef.TRef {
+export class TRefImpl implements TRef.TRef, Pipeable {
readonly [TRefTypeId] = tRefVariance
/** @internal */
todos: Map
@@ -40,6 +42,9 @@ export class TRefImpl implements TRef.TRef {
return retValue
})
}
+ pipe() {
+ return pipeArguments(this, arguments)
+ }
}
/** @internal */
diff --git a/packages/effect/src/internal/stm/tSubscriptionRef.ts b/packages/effect/src/internal/stm/tSubscriptionRef.ts
new file mode 100644
index 00000000000..94a49240553
--- /dev/null
+++ b/packages/effect/src/internal/stm/tSubscriptionRef.ts
@@ -0,0 +1,286 @@
+import * as Effect from "../../Effect.js"
+import { dual, pipe } from "../../Function.js"
+import * as Option from "../../Option.js"
+import { pipeArguments } from "../../Pipeable.js"
+import * as STM from "../../STM.js"
+import * as TPubSub from "../../TPubSub.js"
+import * as TQueue from "../../TQueue.js"
+import * as TRef from "../../TRef.js"
+import type * as TSubscriptionRef from "../../TSubscriptionRef.js"
+import * as stream from "../stream.js"
+import { tDequeueVariance } from "./tQueue.js"
+import { tRefVariance } from "./tRef.js"
+
+/** @internal */
+const TSubscriptionRefSymbolKey = "effect/TSubscriptionRef"
+
+/** @internal */
+export const TSubscriptionRefTypeId: TSubscriptionRef.TSubscriptionRefTypeId = Symbol.for(
+ TSubscriptionRefSymbolKey
+) as TSubscriptionRef.TSubscriptionRefTypeId
+
+const TSubscriptionRefVariance = {
+ /* c8 ignore next */
+ _A: (_: any) => _
+}
+
+class TDequeueMerge implements TQueue.TDequeue {
+ [TQueue.TDequeueTypeId] = tDequeueVariance
+
+ constructor(
+ readonly first: TQueue.TDequeue,
+ readonly second: TQueue.TDequeue
+ ) {}
+
+ peek: STM.STM = STM.gen(this, function*() {
+ const first = yield* this.peekOption
+ if (first._tag === "Some") {
+ return first.value
+ }
+ return yield* STM.retry
+ })
+
+ peekOption: STM.STM> = STM.gen(this, function*() {
+ const first = yield* this.first.peekOption
+ if (first._tag === "Some") {
+ return first
+ }
+ const second = yield* this.second.peekOption
+ if (second._tag === "Some") {
+ return second
+ }
+ return Option.none()
+ })
+
+ take: STM.STM = STM.gen(this, function*() {
+ if (!(yield* this.first.isEmpty)) {
+ return yield* this.first.take
+ }
+ if (!(yield* this.second.isEmpty)) {
+ return yield* this.second.take
+ }
+ return yield* STM.retry
+ })
+
+ takeAll: STM.STM> = STM.gen(this, function*() {
+ return [...yield* this.first.takeAll, ...yield* this.second.takeAll]
+ })
+
+ takeUpTo(max: number): STM.STM> {
+ return STM.gen(this, function*() {
+ const first = yield* this.first.takeUpTo(max)
+ if (first.length >= max) {
+ return first
+ }
+ return [...first, ...yield* this.second.takeUpTo(max - first.length)]
+ })
+ }
+
+ capacity(): number {
+ return this.first.capacity() + this.second.capacity()
+ }
+
+ size: STM.STM = STM.gen(this, function*() {
+ return (yield* this.first.size) + (yield* this.second.size)
+ })
+
+ isFull: STM.STM = STM.gen(this, function*() {
+ return (yield* this.first.isFull) && (yield* this.second.isFull)
+ })
+
+ isEmpty: STM.STM = STM.gen(this, function*() {
+ return (yield* this.first.isEmpty) && (yield* this.second.isEmpty)
+ })
+
+ shutdown: STM.STM = STM.gen(this, function*() {
+ yield* this.first.shutdown
+ yield* this.second.shutdown
+ })
+
+ isShutdown: STM.STM = STM.gen(this, function*() {
+ return (yield* this.first.isShutdown) && (yield* this.second.isShutdown)
+ })
+
+ awaitShutdown: STM.STM = STM.gen(this, function*() {
+ yield* this.first.awaitShutdown
+ yield* this.second.awaitShutdown
+ })
+}
+
+/** @internal */
+class TSubscriptionRefImpl implements TSubscriptionRef.TSubscriptionRef {
+ readonly [TSubscriptionRefTypeId] = TSubscriptionRefVariance
+ readonly [TRef.TRefTypeId] = tRefVariance
+
+ constructor(
+ readonly ref: TRef.TRef,
+ readonly pubsub: TPubSub.TPubSub
+ ) {}
+
+ get todos() {
+ return this.ref.todos
+ }
+
+ get versioned() {
+ return this.ref.versioned
+ }
+
+ pipe() {
+ return pipeArguments(this, arguments)
+ }
+
+ get changes(): STM.STM> {
+ return STM.gen(this, function*() {
+ const first = yield* TQueue.unbounded()
+ yield* TQueue.offer(first, yield* TRef.get(this.ref))
+ return new TDequeueMerge(first, yield* TPubSub.subscribe(this.pubsub))
+ })
+ }
+
+ modify(f: (a: A) => readonly [B, A]): STM.STM {
+ return pipe(
+ TRef.get(this.ref),
+ STM.map(f),
+ STM.flatMap(([b, a]) =>
+ pipe(
+ TRef.set(this.ref, a),
+ STM.as(b),
+ STM.zipLeft(TPubSub.publish(this.pubsub, a))
+ )
+ )
+ )
+ }
+}
+
+/** @internal */
+export const make = (value: A): STM.STM> =>
+ pipe(
+ STM.all([
+ TPubSub.unbounded(),
+ TRef.make(value)
+ ]),
+ STM.map(([pubsub, ref]) => new TSubscriptionRefImpl(ref, pubsub))
+ )
+
+/** @internal */
+export const get = (self: TSubscriptionRef.TSubscriptionRef) => TRef.get(self.ref)
+
+/** @internal */
+export const set = dual<
+ (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM
+>(
+ 2,
+ (self: TSubscriptionRef.TSubscriptionRef, value: A): STM.STM =>
+ self.modify((): [void, A] => [void 0, value])
+)
+
+/** @internal */
+export const getAndSet = dual<
+ (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM
+>(2, (self, value) => self.modify((a) => [a, value]))
+
+/** @internal */
+export const getAndUpdate = dual<
+ (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM
+>(2, (self, f) => self.modify((a) => [a, f(a)]))
+
+/** @internal */
+export const getAndUpdateSome = dual<
+ (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM
+>(2, (self, f) =>
+ self.modify((a) =>
+ Option.match(f(a), {
+ onNone: () => [a, a],
+ onSome: (b) => [a, b]
+ })
+ ))
+
+/** @internal */
+export const setAndGet = dual<
+ (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM
+>(2, (self, value) => self.modify(() => [value, value]))
+
+/** @internal */
+export const modify = dual<
+ (f: (a: A) => readonly [B, A]) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => readonly [B, A]) => STM.STM
+>(2, (self, f) => self.modify(f))
+
+/** @internal */
+export const modifySome = dual<
+ (
+ fallback: B,
+ f: (a: A) => Option.Option
+ ) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (
+ self: TSubscriptionRef.TSubscriptionRef,
+ fallback: B,
+ f: (a: A) => Option.Option
+ ) => STM.STM
+>(3, (self, fallback, f) =>
+ self.modify((a) =>
+ Option.match(f(a), {
+ onNone: () => [fallback, a],
+ onSome: (b) => b
+ })
+ ))
+
+/** @internal */
+export const update = dual<
+ (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM
+>(2, (self, f) => self.modify((a) => [void 0, f(a)]))
+
+/** @internal */
+export const updateAndGet = dual<
+ (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM
+>(2, (self, f) =>
+ self.modify((a) => {
+ const b = f(a)
+ return [b, b]
+ }))
+
+/** @internal */
+export const updateSome = dual<
+ (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM
+>(
+ 2,
+ (self, f) =>
+ self.modify((a) => [
+ void 0,
+ Option.match(f(a), {
+ onNone: () => a,
+ onSome: (b) => b
+ })
+ ])
+)
+
+/** @internal */
+export const updateSomeAndGet = dual<
+ (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM,
+ (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM
+>(
+ 2,
+ (self, f) =>
+ self.modify((a) =>
+ Option.match(f(a), {
+ onNone: () => [a, a],
+ onSome: (b) => [b, b]
+ })
+ )
+)
+
+/** @internal */
+export const changesScoped = (self: TSubscriptionRef.TSubscriptionRef) =>
+ Effect.acquireRelease(self.changes, TQueue.shutdown)
+
+/** @internal */
+export const changesStream = (self: TSubscriptionRef.TSubscriptionRef) =>
+ stream.unwrap(Effect.map(self.changes, stream.fromTQueue))
diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts
index c5abc3e0987..3a3ca4402d0 100644
--- a/packages/effect/src/internal/stream.ts
+++ b/packages/effect/src/internal/stream.ts
@@ -31,6 +31,8 @@ import type * as Stream from "../Stream.js"
import type * as Emit from "../StreamEmit.js"
import * as HaltStrategy from "../StreamHaltStrategy.js"
import type * as Take from "../Take.js"
+import * as TPubSub from "../TPubSub.js"
+import * as TQueue from "../TQueue.js"
import type * as Tracer from "../Tracer.js"
import * as Tuple from "../Tuple.js"
import type { NoInfer, TupleOf } from "../Types.js"
@@ -3133,6 +3135,14 @@ export const fromPubSub: {
return options?.shutdown ? ensuring(stream, PubSub.shutdown(pubsub)) : stream
}
+/** @internal */
+export const fromTPubSub = (pubsub: TPubSub.TPubSub): Stream.Stream => {
+ return unwrapScoped(Effect.map(
+ TPubSub.subscribeScoped(pubsub),
+ (queue) => fromTQueue(queue)
+ ))
+}
+
/** @internal */
export const fromIterable = (iterable: Iterable): Stream.Stream =>
suspend(() =>
@@ -3224,6 +3234,24 @@ export const fromQueue = (
options?.shutdown ? ensuring(Queue.shutdown(queue)) : identity
)
+/** @internal */
+export const fromTQueue = (queue: TQueue.TDequeue): Stream.Stream =>
+ pipe(
+ TQueue.take(queue),
+ Effect.map(Chunk.of),
+ Effect.catchAllCause((cause) =>
+ pipe(
+ TQueue.isShutdown(queue),
+ Effect.flatMap((isShutdown) =>
+ isShutdown && Cause.isInterrupted(cause) ?
+ pull.end() :
+ pull.failCause(cause)
+ )
+ )
+ ),
+ repeatEffectChunkOption
+ )
+
/** @internal */
export const fromSchedule = (schedule: Schedule.Schedule): Stream.Stream =>
pipe(
diff --git a/packages/effect/test/TSubscriptionRef.test.ts b/packages/effect/test/TSubscriptionRef.test.ts
new file mode 100644
index 00000000000..2cc54e89b92
--- /dev/null
+++ b/packages/effect/test/TSubscriptionRef.test.ts
@@ -0,0 +1,148 @@
+import { Chunk, Deferred, Effect, Equal, Exit, Fiber, pipe, Random, STM, Stream } from "effect"
+import * as Number from "effect/Number"
+import * as it from "effect/test/utils/extend"
+import * as TSubscriptionRef from "effect/TSubscriptionRef"
+import { assert, describe } from "vitest"
+
+describe.concurrent("TSubscriptionRef", () => {
+ it.effect("only emits comitted values", () =>
+ Effect.gen(function*($) {
+ const subscriptionRef = yield* $(TSubscriptionRef.make(0))
+
+ const transaction = pipe(
+ TSubscriptionRef.update(subscriptionRef, (n) => n + 1),
+ STM.tap(() => TSubscriptionRef.update(subscriptionRef, (n) => n + 1))
+ )
+
+ const subscriber = yield* $(pipe(
+ TSubscriptionRef.changesStream(subscriptionRef),
+ Stream.take(1),
+ Stream.runCollect,
+ Effect.fork
+ ))
+ // stream doesn't work properly without a yield, it will drop values
+ yield* $(Effect.yieldNow())
+ yield* $(STM.commit(transaction))
+ yield* $(Effect.yieldNow())
+ const result = yield* $(Fiber.join(subscriber))
+
+ assert.deepStrictEqual(Array.from(result), [2])
+ }))
+
+ it.effect("emits every comitted value", () =>
+ Effect.gen(function*($) {
+ const subscriptionRef = yield* $(TSubscriptionRef.make(0))
+
+ const transaction = pipe(
+ TSubscriptionRef.update(subscriptionRef, (n) => n + 1),
+ STM.commit,
+ // stream doesn't work properly without a yield, it will drop the first value without this
+ Effect.tap(() => Effect.yieldNow()),
+ Effect.flatMap(() => TSubscriptionRef.update(subscriptionRef, (n) => n + 1))
+ )
+
+ const subscriber = yield* $(pipe(
+ TSubscriptionRef.changesStream(subscriptionRef),
+ Stream.take(2),
+ Stream.runCollect,
+ Effect.fork
+ ))
+ // stream doesn't work properly without a yield, it will drop the first value without this
+ yield* $(Effect.yieldNow())
+ yield* $(transaction)
+ const result = yield* $(Fiber.join(subscriber))
+
+ assert.deepStrictEqual(Array.from(result), [1, 2])
+ }))
+
+ it.effect("multiple subscribers can receive committed values", () =>
+ Effect.gen(function*($) {
+ const subscriptionRef = yield* $(TSubscriptionRef.make(0))
+ const deferred1 = yield* $(Deferred.make())
+ const deferred2 = yield* $(Deferred.make())
+ const subscriber1 = yield* $(pipe(
+ TSubscriptionRef.changesStream(subscriptionRef),
+ Stream.tap(() => Deferred.succeed(deferred1, void 0)),
+ Stream.take(3),
+ Stream.runCollect,
+ Effect.fork
+ ))
+ yield* $(Deferred.await(deferred1))
+ yield* $(TSubscriptionRef.update(subscriptionRef, (n) => n + 1))
+ const subscriber2 = yield* $(pipe(
+ TSubscriptionRef.changesStream(subscriptionRef),
+ Stream.tap(() => Deferred.succeed(deferred2, void 0)),
+ Stream.take(2),
+ Stream.runCollect,
+ Effect.fork
+ ))
+ yield* $(Deferred.await(deferred2))
+ yield* $(TSubscriptionRef.update(subscriptionRef, (n) => n + 1))
+ const result1 = yield* $(Fiber.join(subscriber1))
+ const result2 = yield* $(Fiber.join(subscriber2))
+ assert.deepStrictEqual(Array.from(result1), [0, 1, 2])
+ assert.deepStrictEqual(Array.from(result2), [1, 2])
+ }))
+
+ it.effect("subscriptions are interruptible", () =>
+ Effect.gen(function*($) {
+ const ref = yield* $(TSubscriptionRef.make(0))
+ const deferred1 = yield* $(Deferred.make())
+ const deferred2 = yield* $(Deferred.make())
+ const subscriber1 = yield* $(
+ TSubscriptionRef.changesStream(ref),
+ Stream.tap(() => Deferred.succeed(deferred1, void 0)),
+ Stream.take(5),
+ Stream.runCollect,
+ Effect.fork
+ )
+ yield* $(Deferred.await(deferred1))
+ yield* $(TSubscriptionRef.update(ref, (n) => n + 1))
+ const subscriber2 = yield* $(
+ TSubscriptionRef.changesStream(ref),
+ Stream.tap(() => Deferred.succeed(deferred2, void 0)),
+ Stream.take(2),
+ Stream.runCollect,
+ Effect.fork
+ )
+ yield* $(Deferred.await(deferred2))
+ yield* $(TSubscriptionRef.update(ref, (n) => n + 1))
+ const result1 = yield* $(Fiber.interrupt(subscriber1))
+ const result2 = yield* $(Fiber.join(subscriber2))
+ assert.isTrue(Exit.isInterrupted(result1))
+ assert.deepStrictEqual(Array.from(result2), [1, 2])
+ }))
+
+ it.effect("concurrent subscribes and unsubscribes are handled correctly", () =>
+ Effect.gen(function*($) {
+ const subscriber = (subscriptionRef: TSubscriptionRef.TSubscriptionRef) =>
+ pipe(
+ Random.nextIntBetween(0, 200),
+ Effect.flatMap((n) =>
+ pipe(
+ TSubscriptionRef.changesStream(subscriptionRef),
+ Stream.take(n),
+ Stream.runCollect
+ )
+ )
+ )
+ const ref = yield* $(TSubscriptionRef.make(0))
+ const fiber = yield* $(
+ TSubscriptionRef.update(ref, (n) => n + 1),
+ Effect.forever,
+ Effect.fork
+ )
+ const result = yield* $(
+ Effect.map(
+ Effect.all(
+ Array.from({ length: 2 }, () => subscriber(ref)),
+ { concurrency: 2 }
+ ),
+ Chunk.unsafeFromArray
+ )
+ )
+ yield* $(Fiber.interrupt(fiber))
+ const isSorted = Chunk.every(result, (chunk) => Equal.equals(chunk, Chunk.sort(chunk, Number.Order)))
+ assert.isTrue(isSorted)
+ }))
+})