Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Abort async implementation (promises) early if abortEarly is set #956

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 197 additions & 1 deletion library/src/schemas/object/objectAsync.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { describe, expect, test } from 'vitest';
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
import { checkAsync } from '../../actions/index.ts';
import { pipeAsync } from '../../index.ts';
import type { FailureDataset, InferIssue } from '../../types/index.ts';
import {
expectNoSchemaIssueAsync,
Expand Down Expand Up @@ -289,4 +291,198 @@ describe('objectAsync', () => {
} satisfies FailureDataset<InferIssue<typeof schema>>);
});
});

describe('should abort async validation early', () => {
function timeout<T>(ms: number, result: T): Promise<T> {
return new Promise((resolve) => setTimeout(() => resolve(result), ms));
}

beforeEach(() => {
vi.useFakeTimers();
});

afterEach(() => {
vi.useRealTimers();
});

test('with sync validation failed', async () => {
const schema = objectAsync({
key1: string(),
key2: pipeAsync(
string(),
checkAsync(() => timeout(1000, true))
),
});

const resultPromise = schema['~run'](
{ value: { key1: 42, key2: 'string' } },
{ abortEarly: true }
);

const result = Promise.race([
resultPromise,
timeout(1, 'validation was not aborted early'),
]);

// advance `timeout(1)` promise to resolve
await vi.advanceTimersToNextTimerAsync();

// assert that `result` is resolved to validation result
expect(await result).toStrictEqual({
issues: [
{
abortEarly: true,
abortPipeEarly: undefined,
expected: 'string',
input: 42,
issues: undefined,
kind: 'schema',
lang: undefined,
message: 'Invalid type: Expected string but received 42',
path: [
{
input: {
key1: 42,
key2: 'string',
},
key: 'key1',
origin: 'value',
type: 'object',
value: 42,
},
],
received: '42',
requirement: undefined,
type: 'string',
},
],
typed: false,
value: {},
});
});

test('with fast async validation failed', async () => {
const schema = objectAsync({
key1: pipeAsync(
string(),
checkAsync(() => timeout(1000, false))
),
key2: pipeAsync(
string(),
checkAsync(() => timeout(5000, true))
),
});

const resultPromise = schema['~run'](
{ value: { key1: 'string', key2: 'string' } },
{ abortEarly: true }
);

const result = Promise.race([
resultPromise,
timeout(2000, 'validation was not aborted early'),
]);

// advance `timeout(1000)` validation promise and `timeout(2000)` limit promiseto resolve
await vi.advanceTimersByTimeAsync(3000);

// assert that `result` is resolved to validation result
expect(await result).toStrictEqual({
issues: [
{
abortEarly: true,
abortPipeEarly: undefined,
expected: null,
input: 'string',
issues: undefined,
kind: 'validation',
lang: undefined,
message: 'Invalid input: Received "string"',
path: [
{
input: {
key1: 'string',
key2: 'string',
},
key: 'key1',
origin: 'value',
type: 'object',
value: 'string',
},
],
received: '"string"',
requirement: expect.any(Function),
type: 'check',
},
],
typed: false,
value: {},
});
});

test('should not execute async validation at all in case of sync validation failure', async () => {
const watch = vi.fn();

const schema = objectAsync({
key1: string(),
key2: pipeAsync(
string(),
checkAsync(() => {
watch();
return timeout(1000, true);
})
),
});

const resultPromise = schema['~run'](
{ value: { key1: 42, key2: 'string' } },
{ abortEarly: true }
);

const result = Promise.race([
resultPromise,
timeout(1, 'validation was not aborted early'),
]);

// advance `timeout(1)` promise to resolve
await vi.advanceTimersToNextTimerAsync();

// assert that `result` is resolved to validation result
expect(await result).not.toEqual('validation was not aborted early');

// assert that `watch` was not called
// meaning that async validation was not executed
expect(watch).toHaveBeenCalledTimes(0);
});

test('async validation should be executed simultaneously', async () => {
const schema = objectAsync({
key1: pipeAsync(
string(),
checkAsync(() => timeout(1000, true))
),
key2: pipeAsync(
string(),
checkAsync(() => timeout(1000, true))
),
});

const resultPromise = schema['~run'](
{ value: { key1: 'string', key2: 'string' } },
{ abortEarly: true }
);

const result = Promise.race([
resultPromise,
timeout(1500, 'validation too long'), // ensure that async validation is not sequential
]);

await vi.advanceTimersByTimeAsync(2000);

// assert that `result` is resolved to validation result
// meaning that async validation was executed simultaneously,
// both promises were resolved in 1000ms
expect(await result).not.toBe('validation too long');
});
});
});
76 changes: 66 additions & 10 deletions library/src/schemas/object/objectAsync.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type {
BaseIssue,
BaseSchemaAsync,
ErrorMessage,
InferObjectInput,
Expand Down Expand Up @@ -111,16 +112,71 @@ export function objectAsync(
// Hint: We do not distinguish between missing and `undefined` entries.
// The reason for this decision is that it reduces the bundle size, and
// we also expect that most users will expect this behavior.
const valueDatasets = await Promise.all(
Object.entries(this.entries).map(async ([key, schema]) => {
const value = input[key as keyof typeof input];
return [
key,
value,
await schema['~run']({ value }, config),
] as const;
})
);

// Type of value dataset
type ValueDataset = [
/* key */ string,
/* value */ unknown,
/* result */ OutputDataset<unknown, BaseIssue<unknown>>,
];

const isPromise = (value: unknown): value is PromiseLike<unknown> =>
typeof value === 'object' && value !== null && 'then' in value;

// Flag to abort early synchronously
let shouldAbortEarly = false;

// Array of value datasets
const valueDatasets: ValueDataset[] = [];

// Array of value datasets promises
const valueDatasetsAsync: Array<Promise<ValueDataset>> = [];

// Add each value dataset promise to array, but do not await them yet
for (const [key, schema] of Object.entries(this.entries)) {
const value = input[key as keyof typeof input];
const result = schema['~run']({ value }, config);

// If validation result is a promise - add promise to array, to await later
if (isPromise(result)) {
valueDatasetsAsync.push(
result.then((resolved) => [key, value, resolved])
);
}

// If got validation result synchronously - add it to result array right away
else {
// Add sync dataset to result array
valueDatasets.push([key, value, result]);

// If there are issues, abort early
if (config.abortEarly && result.issues) {
shouldAbortEarly = true;
break;
}
}
}

// Await for async datasets
if (valueDatasetsAsync.length > 0 && !shouldAbortEarly) {
await new Promise<void>((resolve) => {
let awaited = 0;
for (const promise of valueDatasetsAsync) {
promise.then((dataset) => {
if (awaited > -1) {
valueDatasets.push(dataset);
if (
++awaited === valueDatasetsAsync.length ||
(dataset[2].issues && config.abortEarly)
) {
awaited = -1;
resolve();
}
}
});
}
});
}

// Process each value dataset
for (const [key, value, valueDataset] of valueDatasets) {
Expand Down