-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
4ab3e81
commit 84b2614
Showing
14 changed files
with
376 additions
and
118 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,14 @@ | ||
import { ReplaySubject, share, timer } from 'rxjs'; | ||
import { ReplaySubject, share, tap, timer } from 'rxjs'; | ||
|
||
export const cache = ttl => { | ||
return source => | ||
source.pipe( | ||
share({ | ||
// TODO: check if a buffer size is neccessary | ||
connector: () => new ReplaySubject(), | ||
resetOnComplete: () => timer(ttl) | ||
// resetOnError: false, | ||
resetOnComplete: () => timer(ttl), | ||
resetOnRefCountZero: false | ||
}) | ||
); | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
31 changes: 0 additions & 31 deletions
31
packages/operators/src/request/concurrentRequest.sample.js
This file was deleted.
Oops, something went wrong.
106 changes: 74 additions & 32 deletions
106
packages/operators/src/request/concurrentRequest.test.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,54 +1,96 @@ | ||
import { concatMap, delay, of, tap, toArray } from 'rxjs'; | ||
import { concatAll, concatMap, delay, from, map, of, tap, toArray } from 'rxjs'; | ||
import { TestScheduler } from 'rxjs/testing'; | ||
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; | ||
import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'; | ||
|
||
import { concurrentRequest } from './concurrentRequest'; | ||
import { log } from '../log'; | ||
import { resolveJSON } from './response'; | ||
|
||
describe('multi fetch33', function () { | ||
describe('concurrent request - mocked', function () { | ||
const testScheduler = new TestScheduler((actual, expected) => { | ||
expect(actual).to.eql(expected); | ||
}); | ||
|
||
const getTriggerValues = () => ({ | ||
a: { t: 2, v: 'a' }, | ||
b: { t: 5, v: 'b' }, | ||
c: { t: 1, v: 'c' }, | ||
d: { t: 3, v: 'd' }, | ||
e: { t: 4, v: 'e' } | ||
}); | ||
|
||
beforeEach(function () { | ||
vi.mock('./request.js', async importOriginal => { | ||
return { | ||
request: () => source => source.pipe(concatMap(({ v, t }) => of(v).pipe(delay(t)))) | ||
}; | ||
}); | ||
vi.doMock('./request', importOriginal => ({ | ||
request: () => source => source.pipe(concatMap(({ v, t }) => of(v).pipe(delay(t)))) | ||
})); | ||
}); | ||
|
||
afterEach(() => { | ||
vi.resetAllMocks(); | ||
vi.doUnmock('./request'); | ||
}); | ||
|
||
test('test', async () => { | ||
const values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].map(v => ({ v, t: Math.random() * 1000 })); | ||
const sortedResult = [...values].sort((a, b) => a.t - b.t).map(({ v }) => v); | ||
test('classic testing', async () => { | ||
const { concurrentRequest } = await import('./concurrentRequest'); | ||
|
||
await new Promise(done => { | ||
of(...values) | ||
.pipe(concurrentRequest(values.length), toArray()) | ||
.subscribe({ next: e => expect(e).to.eql(sortedResult), complete: () => done() }); | ||
const triggerVal = Object.values(getTriggerValues()); | ||
const sortedVal = [...triggerVal].sort((a, b) => a.t - b.t).map(({ v }) => v); | ||
|
||
await new Promise((done, error) => { | ||
from(triggerVal) | ||
.pipe(concurrentRequest(triggerVal.length), toArray()) | ||
.subscribe({ | ||
next: e => expect(e).toStrictEqual(sortedVal), | ||
complete: () => done(), | ||
error: e => error(e) | ||
}); | ||
}); | ||
}); | ||
|
||
test('test2', async () => { | ||
const triggerValues = { | ||
a: { t: 2, v: 'a' }, | ||
b: { t: 5, v: 'b' }, | ||
c: { t: 0, v: 'c' }, | ||
d: { t: 1, v: 'd' }, | ||
e: { t: 1, v: 'd' } | ||
}; | ||
const expectedValues = Object.fromEntries( | ||
Array.from(Object.entries(triggerValues)).map(([k, { v }]) => [k, v]) | ||
); | ||
test('marble testing', async () => { | ||
const { concurrentRequest } = await import('./concurrentRequest'); | ||
|
||
const triggerVal = getTriggerValues(); | ||
const expectedVal = Object.fromEntries(Object.entries(triggerVal).map(([k, { v }]) => [k, v])); | ||
|
||
testScheduler.run(({ cold, expectObservable }) => { | ||
expectObservable(cold('-a-b-c-(de)', triggerValues).pipe(concurrentRequest(3), tap())).toBe( | ||
'---a-c--(bde)', | ||
expectedValues | ||
); | ||
expectObservable( | ||
cold('-a-b-(cd)-e----', triggerVal).pipe(concurrentRequest(Object.keys(triggerVal).length)) | ||
).toBe('---a--c-(bd)--e', expectedVal); | ||
}); | ||
}); | ||
}); | ||
|
||
describe.skip('concurrent request - demo', function () { | ||
beforeAll(function () { | ||
vi.resetModules(); | ||
}); | ||
|
||
test('sample testing', async () => { | ||
const { concurrentRequest } = await import('./concurrentRequest'); | ||
|
||
await new Promise(done => { | ||
of( | ||
new URL('https://dummyjson.com/products?limit=10&skip=0&select=title,price'), | ||
new URL('https://dummyjson.com/products?limit=10&skip=10&select=title,price'), | ||
new URL('https://dummyjson.com/products?limit=10&skip=20&select=title,price'), | ||
new URL('https://dummyjson.com/products?limit=10&skip=30&select=title,price'), | ||
new URL('https://dummyjson.com/products?limit=10&skip=40&select=title,price'), | ||
new URL('https://dummyjson.com/products?limit=10&skip=50&select=title,price'), | ||
new URL('https://dummyjson.com/products?limit=10&skip=60&select=title,price'), | ||
new URL('https://dummyjson.com/products?limit=10&skip=70&select=title,price'), | ||
new URL('https://dummyjson.com/products?limit=10&skip=80&select=title,price') | ||
) | ||
.pipe( | ||
concurrentRequest(4), | ||
log(false), | ||
resolveJSON(), | ||
log(false), | ||
map(({ products }) => products), | ||
concatAll() | ||
) | ||
.subscribe({ | ||
next: e => console.log(e), | ||
complete: () => done() | ||
}); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.