Skip to content

Commit

Permalink
switch to state union
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Apr 16, 2024
1 parent 06370ab commit 2e17e93
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 71 deletions.
57 changes: 32 additions & 25 deletions packages/effect/src/FiberHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ export interface FiberHandle<out A = unknown, out E = unknown> extends Pipeable,
readonly [TypeId]: TypeId
readonly deferred: Deferred.Deferred<void, unknown>
/** @internal */
backing: Fiber.RuntimeFiber<A, E> | undefined
/** @internal */
closed: boolean
state: {
readonly _tag: "Open"
fiber: Fiber.RuntimeFiber<A, E> | undefined
} | {
readonly _tag: "Closed"
}
}

/**
Expand All @@ -57,7 +60,7 @@ const Proto = {
toJSON(this: FiberHandle) {
return {
_id: "FiberHandle",
backing: this.backing
state: this.state
}
},
[Inspectable.NodeInspectSymbol](this: FiberHandle) {
Expand All @@ -72,9 +75,8 @@ const unsafeMake = <A = unknown, E = unknown>(
deferred: Deferred.Deferred<void, E>
): FiberHandle<A, E> => {
const self = Object.create(Proto)
self.backing = undefined
self.state = { _tag: "Open", fiber: undefined }
self.deferred = deferred
self.closed = false
return self
}

Expand Down Expand Up @@ -108,10 +110,13 @@ export const make = <A = unknown, E = unknown>(): Effect.Effect<FiberHandle<A, E
Effect.acquireRelease(
Effect.map(Deferred.make<void, E>(), (deferred) => unsafeMake<A, E>(deferred)),
(handle) =>
Effect.suspend(() => {
handle.closed = true
return Effect.zipRight(clear(handle), Deferred.done(handle.deferred, Exit.void))
})
Effect.zipRight(
clear(handle),
Effect.suspend(() => {
handle.state = { _tag: "Closed" }
return Deferred.done(handle.deferred, Exit.void)
})
)
)

/**
Expand Down Expand Up @@ -168,25 +173,25 @@ export const unsafeSet: {
readonly onlyIfMissing?: boolean | undefined
}
): void => {
if (self.closed) {
if (self.state._tag === "Closed") {
fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none)
return
} else if (self.backing !== undefined) {
} else if (self.state.fiber !== undefined) {
if (options?.onlyIfMissing === true) {
fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none)
return
} else if (self.backing === fiber) {
} else if (self.state.fiber === fiber) {
return
}
self.backing.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none)
self.backing === undefined
self.state.fiber.unsafeInterruptAsFork(options?.interruptAs ?? FiberId.none)
self.state.fiber === undefined
}

;(fiber as FiberRuntime<unknown, unknown>).setFiberRef(FiberRef.unhandledErrorLogLevel, Option.none())
self.backing = fiber
self.state.fiber = fiber
fiber.addObserver((exit) => {
if (fiber === self.backing) {
self.backing = undefined
if (self.state._tag === "Open" && fiber === self.state.fiber) {
self.state.fiber = undefined
}
if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) {
Deferred.unsafeDone(self.deferred, exit as any)
Expand Down Expand Up @@ -239,7 +244,7 @@ export const set: {
* @categories combinators
*/
export const unsafeGet = <A, E>(self: FiberHandle<A, E>): Option.Option<Fiber.RuntimeFiber<A, E>> =>
Option.fromNullable(self.backing)
self.state._tag === "Closed" ? Option.none() : Option.fromNullable(self.state.fiber)

/**
* Retrieve the fiber from the FiberHandle.
Expand All @@ -248,7 +253,7 @@ export const unsafeGet = <A, E>(self: FiberHandle<A, E>): Option.Option<Fiber.Ru
* @categories combinators
*/
export const get = <A, E>(self: FiberHandle<A, E>): Effect.Effect<Fiber.RuntimeFiber<A, E>, NoSuchElementException> =>
Effect.suspend(() => Option.fromNullable(self.backing))
Effect.suspend(() => unsafeGet(self))

/**
* @since 2.0.0
Expand All @@ -257,13 +262,15 @@ export const get = <A, E>(self: FiberHandle<A, E>): Effect.Effect<Fiber.RuntimeF
export const clear = <A, E>(self: FiberHandle<A, E>): Effect.Effect<void> =>
Effect.uninterruptibleMask((restore) =>
Effect.suspend(() => {
if (self.backing === undefined) {
if (self.state._tag === "Closed" || self.state.fiber === undefined) {
return Effect.void
}
return Effect.zipRight(
restore(Fiber.interrupt(self.backing!)),
restore(Fiber.interrupt(self.state.fiber)),
Effect.sync(() => {
self.backing = undefined
if (self.state._tag === "Open") {
self.state.fiber = undefined
}
})
)
})
Expand Down Expand Up @@ -298,7 +305,7 @@ export const run: {
const options = arguments[1] as { readonly onlyIfMissing?: boolean } | undefined
return (effect: Effect.Effect<unknown, unknown, any>) =>
Effect.suspend(() => {
if (self.closed) {
if (self.state._tag === "Closed") {
return Effect.interrupt
}
return Effect.uninterruptibleMask((restore) =>
Expand All @@ -312,7 +319,7 @@ export const run: {
const effect = arguments[1] as Effect.Effect<any, any, any>
const options = arguments[2] as { readonly onlyIfMissing?: boolean } | undefined
return Effect.suspend(() => {
if (self.closed) {
if (self.state._tag === "Closed") {
return Effect.interrupt
}
return Effect.uninterruptibleMask((restore) =>
Expand Down
71 changes: 46 additions & 25 deletions packages/effect/src/FiberMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import * as FiberRef from "./FiberRef.js"
import { dual } from "./Function.js"
import * as Inspectable from "./Inspectable.js"
import type { FiberRuntime } from "./internal/fiberRuntime.js"
import * as Iterable from "./Iterable.js"
import * as MutableHashMap from "./MutableHashMap.js"
import * as Option from "./Option.js"
import { type Pipeable, pipeArguments } from "./Pipeable.js"
Expand Down Expand Up @@ -40,11 +41,14 @@ export interface FiberMap<in out K, out A = unknown, out E = unknown>
{
readonly [TypeId]: TypeId
/** @internal */
readonly backing: MutableHashMap.MutableHashMap<K, Fiber.RuntimeFiber<A, E>>
/** @internal */
readonly deferred: Deferred.Deferred<void, unknown>
/** @internal */
closed: boolean
state: {
readonly _tag: "Open"
readonly backing: MutableHashMap.MutableHashMap<K, Fiber.RuntimeFiber<A, E>>
} | {
readonly _tag: "Closed"
}
}

/**
Expand All @@ -56,15 +60,18 @@ export const isFiberMap = (u: unknown): u is FiberMap<unknown> => Predicate.hasP
const Proto = {
[TypeId]: TypeId,
[Symbol.iterator](this: FiberMap<unknown>) {
return this.backing[Symbol.iterator]()
if (this.state._tag === "Closed") {
return Iterable.empty()
}
return this.state.backing[Symbol.iterator]()
},
toString(this: FiberMap<unknown>) {
return Inspectable.format(this.toJSON())
},
toJSON(this: FiberMap<unknown>) {
return {
_id: "FiberMap",
backing: this.backing.toJSON()
state: this.state
}
},
[Inspectable.NodeInspectSymbol](this: FiberMap<unknown>) {
Expand All @@ -80,9 +87,8 @@ const unsafeMake = <K, A = unknown, E = unknown>(
deferred: Deferred.Deferred<void, E>
): FiberMap<K, A, E> => {
const self = Object.create(Proto)
self.backing = backing
self.state = { _tag: "Open", backing }
self.deferred = deferred
self.closed = false
return self
}

Expand Down Expand Up @@ -119,10 +125,13 @@ export const make = <K, A = unknown, E = unknown>(): Effect.Effect<FiberMap<K, A
deferred
)),
(map) =>
Effect.suspend(() => {
map.closed = true
return Effect.zipRight(clear(map), Deferred.done(map.deferred, Exit.void))
})
Effect.zipRight(
clear(map),
Effect.suspend(() => {
map.state = { _tag: "Closed" }
return Deferred.done(map.deferred, Exit.void)
})
)
)

/**
Expand Down Expand Up @@ -177,12 +186,12 @@ export const unsafeSet: {
interruptAs?: FiberId.FiberId
) => void
>((args) => isFiberMap(args[0]), (self, key, fiber, interruptAs) => {
if (self.closed) {
if (self.state._tag === "Closed") {
fiber.unsafeInterruptAsFork(interruptAs ?? FiberId.none)
return
}

const previous = MutableHashMap.get(self.backing, key)
const previous = MutableHashMap.get(self.state.backing, key)
if (previous._tag === "Some") {
if (previous.value === fiber) {
return
Expand All @@ -191,11 +200,14 @@ export const unsafeSet: {
}

;(fiber as FiberRuntime<unknown, unknown>).setFiberRef(FiberRef.unhandledErrorLogLevel, Option.none())
MutableHashMap.set(self.backing, key, fiber)
MutableHashMap.set(self.state.backing, key, fiber)
fiber.addObserver((exit) => {
const current = MutableHashMap.get(self.backing, key)
if (self.state._tag === "Closed") {
return
}
const current = MutableHashMap.get(self.state.backing, key)
if (Option.isSome(current) && fiber === current.value) {
MutableHashMap.remove(self.backing, key)
MutableHashMap.remove(self.state.backing, key)
}
if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) {
Deferred.unsafeDone(self.deferred, exit as any)
Expand Down Expand Up @@ -252,7 +264,7 @@ export const unsafeGet: {
self: FiberMap<K, A, E>,
key: K
) => Option.Option<Fiber.RuntimeFiber<A, E>>
>(2, (self, key) => MutableHashMap.get(self.backing, key))
>(2, (self, key) => self.state._tag === "Closed" ? Option.none() : MutableHashMap.get(self.state.backing, key))

/**
* Retrieve a fiber from the FiberMap.
Expand All @@ -271,7 +283,7 @@ export const get: {
self: FiberMap<K, A, E>,
key: K
) => Effect.Effect<Fiber.RuntimeFiber<A, E>, NoSuchElementException>
>(2, (self, key) => Effect.suspend(() => MutableHashMap.get(self.backing, key)))
>(2, (self, key) => Effect.suspend(() => unsafeGet(self, key)))

/**
* Remove a fiber from the FiberMap, interrupting it if it exists.
Expand All @@ -292,7 +304,10 @@ export const remove: {
) => Effect.Effect<void>
>(2, (self, key) =>
Effect.suspend(() => {
const fiber = MutableHashMap.get(self.backing, key)
if (self.state._tag === "Closed") {
return Effect.void
}
const fiber = MutableHashMap.get(self.state.backing, key)
if (fiber._tag === "None") {
return Effect.void
}
Expand All @@ -305,9 +320,15 @@ export const remove: {
* @categories combinators
*/
export const clear = <K, A, E>(self: FiberMap<K, A, E>): Effect.Effect<void> =>
Effect.forEach(self.backing, ([, fiber]) =>
// will be removed by the observer
Fiber.interrupt(fiber))
Effect.suspend(() => {
if (self.state._tag === "Closed") {
return Effect.void
}

return Effect.forEach(self.state.backing, ([, fiber]) =>
// will be removed by the observer
Fiber.interrupt(fiber))
})

/**
* Run an Effect and add the forked fiber to the FiberMap.
Expand All @@ -334,7 +355,7 @@ export const run: {
const key = arguments[1]
return (effect: Effect.Effect<any, any, any>) =>
Effect.suspend(() => {
if (self.closed) {
if (self.state._tag === "Closed") {
return Effect.interrupt
}
return Effect.uninterruptibleMask((restore) =>
Expand All @@ -349,7 +370,7 @@ export const run: {
const key = arguments[1]
const effect = arguments[2] as Effect.Effect<any, any, any>
return Effect.suspend(() => {
if (self.closed) {
if (self.state._tag === "Closed") {
return Effect.interrupt
}
return Effect.uninterruptibleMask((restore) =>
Expand Down Expand Up @@ -420,7 +441,7 @@ export const runtime: <K, A, E>(
* @categories combinators
*/
export const size = <K, A, E>(self: FiberMap<K, A, E>): Effect.Effect<number> =>
Effect.sync(() => MutableHashMap.size(self.backing))
Effect.sync(() => self.state._tag === "Closed" ? 0 : MutableHashMap.size(self.state.backing))

/**
* Join all fibers in the FiberMap. If any of the Fiber's in the map terminate with a failure,
Expand Down
Loading

0 comments on commit 2e17e93

Please sign in to comment.