diff --git a/.changeset/gentle-dingos-fail.md b/.changeset/gentle-dingos-fail.md
new file mode 100644
index 00000000000..ef94ab86376
--- /dev/null
+++ b/.changeset/gentle-dingos-fail.md
@@ -0,0 +1,8 @@
+---
+"effect": patch
+"@effect/experimental": patch
+"@effect/rpc": patch
+"@effect/sql": patch
+---
+
+change: BatchedRequestResolver works with NonEmptyArray
diff --git a/packages/effect/src/RequestResolver.ts b/packages/effect/src/RequestResolver.ts
index 49dce15a04a..4e8f79ec7de 100644
--- a/packages/effect/src/RequestResolver.ts
+++ b/packages/effect/src/RequestResolver.ts
@@ -2,6 +2,7 @@
* @since 2.0.0
*/
+import type { NonEmptyArray } from "./Array.js"
import * as Context from "./Context.js"
import * as Effect from "./Effect.js"
import type * as Either from "./Either.js"
@@ -138,7 +139,7 @@ export const makeWithEntry: (
* @category constructors
*/
export const makeBatched: , R>(
- run: (requests: Array) => Effect.Effect
+ run: (requests: NonEmptyArray) => Effect.Effect
) => RequestResolver = internal.makeBatched
/**
@@ -269,7 +270,7 @@ export const fromFunction: >(
* @category constructors
*/
export const fromFunctionBatched: >(
- f: (chunk: Array) => Iterable>
+ f: (chunk: NonEmptyArray) => Iterable>
) => RequestResolver = internal.fromFunctionBatched
/**
diff --git a/packages/effect/src/internal/dataSource.ts b/packages/effect/src/internal/dataSource.ts
index fd67947bc73..5c2fdff599a 100644
--- a/packages/effect/src/internal/dataSource.ts
+++ b/packages/effect/src/internal/dataSource.ts
@@ -24,21 +24,21 @@ export const makeWithEntry = (
/** @internal */
export const makeBatched = , R>(
- run: (requests: Array) => Effect.Effect
+ run: (requests: RA.NonEmptyArray) => Effect.Effect
): RequestResolver.RequestResolver =>
new core.RequestResolverImpl(
(requests) => {
if (requests.length > 1) {
return core.forEachSequentialDiscard(requests, (block) => {
const filtered = block.filter((_) => !_.state.completed).map((_) => _.request)
- if (filtered.length === 0) {
+ if (!RA.isNonEmptyArray(filtered)) {
return core.void
}
return invokeWithInterrupt(run(filtered), block)
})
} else if (requests.length === 1) {
const filtered = requests[0].filter((_) => !_.state.completed).map((_) => _.request)
- if (filtered.length === 0) {
+ if (!RA.isNonEmptyArray(filtered)) {
return core.void
}
return run(filtered)
@@ -210,7 +210,7 @@ export const eitherWith = dual<
export const fromFunction = >(
f: (request: A) => Request.Request.Success
): RequestResolver.RequestResolver =>
- makeBatched((requests: Array) =>
+ makeBatched((requests: RA.NonEmptyArray) =>
core.forEachSequentialDiscard(
requests,
(request) => complete(request, core.exitSucceed(f(request)) as any)
@@ -219,9 +219,9 @@ export const fromFunction = >(
/** @internal */
export const fromFunctionBatched = >(
- f: (chunk: Array) => Iterable>
+ f: (chunk: RA.NonEmptyArray) => Iterable>
): RequestResolver.RequestResolver =>
- makeBatched((as: Array) =>
+ makeBatched((as: RA.NonEmptyArray) =>
Effect.forEach(
f(as),
(res, i) => complete(as[i], core.exitSucceed(res) as any),
@@ -233,7 +233,7 @@ export const fromFunctionBatched = >(
export const fromEffect = >(
f: (a: A) => Effect.Effect, Request.Request.Error, R>
): RequestResolver.RequestResolver =>
- makeBatched((requests: Array) =>
+ makeBatched((requests: RA.NonEmptyArray) =>
Effect.forEach(
requests,
(a) => Effect.flatMap(Effect.exit(f(a)), (e) => complete(a, e as any)),
@@ -261,7 +261,7 @@ export const fromEffectTagged = <
A,
ReturnType extends Effect.Effect ? R : never
> =>
- makeBatched((requests: Array) => {
+ makeBatched((requests: RA.NonEmptyArray) => {
const grouped: Record> = {}
const tags: Array = []
for (let i = 0, len = requests.length; i < len; i++) {
diff --git a/packages/experimental/src/RequestResolver.ts b/packages/experimental/src/RequestResolver.ts
index 00cd82f3208..6d7949f0b7a 100644
--- a/packages/experimental/src/RequestResolver.ts
+++ b/packages/experimental/src/RequestResolver.ts
@@ -178,7 +178,7 @@ export const persisted: {
result: Request.Request.Result
): Effect.Effect => Effect.ignoreLogged(storage.set(request as any, result))
- return RequestResolver.makeBatched((requests: Array) =>
+ return RequestResolver.makeBatched((requests: Arr.NonEmptyArray) =>
Effect.flatMap(partition(requests), ([remaining, results]) => {
const completeCached = Effect.forEach(
results,
diff --git a/packages/experimental/test/RequestResolver.test.ts b/packages/experimental/test/RequestResolver.test.ts
index 28fe2567a43..9bb7689a629 100644
--- a/packages/experimental/test/RequestResolver.test.ts
+++ b/packages/experimental/test/RequestResolver.test.ts
@@ -7,6 +7,7 @@ import { NodeContext } from "@effect/platform-node"
import { Schema } from "@effect/schema"
import * as it from "@effect/vitest"
import { Array, Effect, Exit, Layer, PrimaryKey, Request, RequestResolver, TestClock } from "effect"
+import type { NonEmptyArray } from "effect/Array"
import { assert, describe } from "vitest"
class User extends Schema.Class("User")({
@@ -42,7 +43,7 @@ describe("RequestResolver", () => {
it.effect(storeId, () =>
Effect.gen(function*(_) {
let count = 0
- const baseResolver = RequestResolver.makeBatched((reqs: Array) => {
+ const baseResolver = RequestResolver.makeBatched((reqs: NonEmptyArray) => {
count += reqs.length
return Effect.forEach(reqs, (req) => {
if (req.id === -1) return Request.fail(req, "not found")
diff --git a/packages/rpc/src/Resolver.ts b/packages/rpc/src/Resolver.ts
index 3185eac5e26..a58061a8a06 100644
--- a/packages/rpc/src/Resolver.ts
+++ b/packages/rpc/src/Resolver.ts
@@ -31,7 +31,7 @@ export const make =
(
const getDecode = withRequestTag((req) => Schema.decodeUnknown(Serializable.exitSchema(req)))
const getDecodeChunk = withRequestTag((req) => Schema.decodeUnknown(Schema.Chunk(Serializable.exitSchema(req))))
- return RequestResolver.makeBatched((requests: Array>) => {
+ return RequestResolver.makeBatched((requests: Arr.NonEmptyArray>) => {
const [effectRequests, streamRequests] = Arr.partition(
requests,
(_): _ is Rpc.Request => StreamRequestTypeId in _.request
diff --git a/packages/rpc/src/ResolverNoStream.ts b/packages/rpc/src/ResolverNoStream.ts
index 6719c2861ac..509506affac 100644
--- a/packages/rpc/src/ResolverNoStream.ts
+++ b/packages/rpc/src/ResolverNoStream.ts
@@ -3,6 +3,7 @@
*/
import * as Schema from "@effect/schema/Schema"
import * as Serializable from "@effect/schema/Serializable"
+import type { NonEmptyArray } from "effect/Array"
import * as Channel from "effect/Channel"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
@@ -29,7 +30,7 @@ export const make =
(
const getDecode = withRequestTag((req) => Schema.decodeUnknown(Serializable.exitSchema(req)))
const getDecodeChunk = withRequestTag((req) => Schema.decodeUnknown(Schema.Chunk(Serializable.exitSchema(req))))
- return RequestResolver.makeBatched((requests: Array>) =>
+ return RequestResolver.makeBatched((requests: NonEmptyArray>) =>
pipe(
Effect.forEach(requests, (_) =>
Effect.map(
diff --git a/packages/sql/src/Resolver.ts b/packages/sql/src/Resolver.ts
index 5ae65c0464b..68a8df89487 100644
--- a/packages/sql/src/Resolver.ts
+++ b/packages/sql/src/Resolver.ts
@@ -3,6 +3,7 @@
*/
import type { ParseError } from "@effect/schema/ParseResult"
import * as Schema from "@effect/schema/Schema"
+import type { NonEmptyArray } from "effect/Array"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Equal from "effect/Equal"
@@ -218,7 +219,7 @@ export const ordered = => {
const decodeResults = Schema.decodeUnknown(Schema.Array(options.Result))
const resolver = RequestResolver.makeBatched(
- (requests: Array>) => {
+ (requests: NonEmptyArray>) => {
const [inputs, spanLinks] = partitionRequests(requests)
return options.execute(inputs as any).pipe(
Effect.filterOrFail(
@@ -286,7 +287,7 @@ export const grouped = , E, RI>, never, RA | R> => {
const decodeResults = Schema.decodeUnknown(Schema.Array(options.Result))
const resolver = RequestResolver.makeBatched(
- (requests: Array, E>>) => {
+ (requests: NonEmptyArray, E>>) => {
const [inputs, spanLinks] = partitionRequests(requests)
const resultMap = new Map>()
return options.execute(inputs as any).pipe(
@@ -360,7 +361,7 @@ export const findById = , E, RI>, never, RA | R> => {
const decodeResults = Schema.decodeUnknown(Schema.Array(options.Result))
const resolver = RequestResolver.makeBatched(
- (requests: Array, E>>) => {
+ (requests: NonEmptyArray, E>>) => {
const [inputs, spanLinks, idMap] = partitionRequestsById()(requests)
return options.execute(inputs as any).pipe(
Effect.bindTo("rawResults"),
@@ -425,7 +426,7 @@ const void_ = (
}
): Effect.Effect, never, R> => {
const resolver = RequestResolver.makeBatched(
- (requests: Array>) => {
+ (requests: NonEmptyArray>) => {
const [inputs, spanLinks] = partitionRequests(requests)
return options.execute(inputs as any).pipe(
Effect.andThen(