Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fromCallback #288

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getIterator } from './get-iterator'
import { defer, IDeferred } from './defer'
import { defer, Deferred } from 'inside-out-async'
import { AnyIterable, UnwrapAnyIterable } from './types'

interface IValueObj<T> {
Expand All @@ -10,14 +10,14 @@ interface IValueObj<T> {
function _buffer<T>(size: number, iterable: AsyncIterable<T>): AsyncIterableIterator<T> {
const iterator = getIterator(iterable)
const resultQueue: IValueObj<T>[] = []
const readQueue: IDeferred<IteratorResult<T>>[] = []
const readQueue: Deferred<IteratorResult<T>>[] = []

let reading = false
let ended = false

function fulfillReadQueue() {
while (readQueue.length > 0 && resultQueue.length > 0) {
const readDeferred = readQueue.shift() as IDeferred<IteratorResult<T>>
const readDeferred = readQueue.shift() as Deferred<IteratorResult<T>>
const { error, value } = resultQueue.shift() as IValueObj<T>
if (error) {
readDeferred.reject(error)
Expand All @@ -26,7 +26,7 @@ function _buffer<T>(size: number, iterable: AsyncIterable<T>): AsyncIterableIter
}
}
while (readQueue.length > 0 && ended) {
const { resolve } = readQueue.shift() as IDeferred<IteratorResult<T>>
const { resolve } = readQueue.shift() as Deferred<IteratorResult<T>>
resolve({ done: true, value: undefined } as any)
}
}
Expand Down
19 changes: 0 additions & 19 deletions lib/defer.ts

This file was deleted.

8 changes: 4 additions & 4 deletions lib/flat-transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AnyIterable, FlatMapValue } from './types'
import { flatten } from './flatten'
import { filter } from './filter'
import { getIterator } from './get-iterator'
import { defer, IDeferred } from './defer'
import { defer, Deferred } from 'inside-out-async'

function _flatTransform<T, R>(
concurrency: number,
Expand All @@ -12,7 +12,7 @@ function _flatTransform<T, R>(
const iterator = getIterator(iterable)

const resultQueue: R[] = []
const readQueue: IDeferred<IteratorResult<R>>[] = []
const readQueue: Deferred<IteratorResult<R>>[] = []

let ended = false
let reading = false
Expand All @@ -21,12 +21,12 @@ function _flatTransform<T, R>(

function fulfillReadQueue() {
while (readQueue.length > 0 && resultQueue.length > 0) {
const { resolve } = readQueue.shift() as IDeferred<IteratorResult<R>>
const { resolve } = readQueue.shift() as Deferred<IteratorResult<R>>
const value = resultQueue.shift() as R
resolve({ done: false, value } as any)
}
while (readQueue.length > 0 && inflightCount === 0 && ended) {
const { resolve, reject } = readQueue.shift() as IDeferred<IteratorResult<R>>
const { resolve, reject } = readQueue.shift() as Deferred<IteratorResult<R>>
if (lastError) {
reject(lastError)
lastError = null
Expand Down
38 changes: 38 additions & 0 deletions lib/from-callback-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { assert } from 'chai'
import { fromCallback } from './from-callback'
import { collect } from './collect'
import EventEmitter from 'events'

describe('fromCallback', () => {
it('buffers values', async () => {
const itr = fromCallback()
itr.yield(1)
itr.yield(2)
itr.yield(3)
itr.end()
const values = await collect(itr)
assert.deepEqual(values, [1,2,3])
})
it('works with event emitters', async () => {
const emitter = new EventEmitter()
const itr = fromCallback()
emitter.on('data', itr.yield)
emitter.on('close', itr.end)
emitter.emit('data', 1)
emitter.emit('data', 2)
emitter.emit('data', 3)
emitter.emit('close')
const values = await collect(itr)
assert.deepEqual(values, [1,2,3])
})
it('ignores values after end', async () => {
const itr = fromCallback()
itr.yield(1)
itr.yield(2)
itr.yield(3)
itr.end()
itr.yield(5)
const values = await collect(itr)
assert.deepEqual(values, [1,2,3])
})
})
59 changes: 59 additions & 0 deletions lib/from-callback.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { deferGenerator } from 'inside-out-async'

export interface CallbackIterable<T> extends AsyncIterable<T> {
yield(data: T): void
end(): void
}

/**
* Returns an iterable with methods to help turn event emitters or callbacks into async iterables.

This leverages the [`inside-out-async`](https://www.npmjs.com/package/inside-out-async#deferGenerator) package which can be used directly if you want something similar for generators. (It is bundled so it's not a dependency.)

It adds two methods to the returned iterable.

- `itr.yield(data: T): void` queues data to be read
- `itr.end(): void` ends the iterable

And will buffer *all* data given to `yield()` until it's read.

```ts
import { fromCallback } from 'streaming-iterables'

const pokeLog = fromCallback()
itr.yield('Charmander')
itr.yield('Ash')
itr.yield('Pokeball')
itr.end()

for await (const pokeData of pokeLog) {
console.log(pokeData) // Charmander, Ash, Pokeball
}

// To use it as a callback
const emitter = new EventEmitter()
const consoles = fromCallback()
emitter.on('data', consoles.yield)
emitter.on('close', consoles.end)

emitter.emit('data', 'nintendo')
emitter.emit('data', 'sony')
emitter.emit('data', 'sega')
emitter.emit('close')

for await (const console of consoles) {
console.log(console) // 'nintendo', 'sony', 'sega'
}

```
*/
export function fromCallback<T>(): CallbackIterable<T> {
const { generator, queueValue, queueReturn } = deferGenerator<T, T, undefined>()

const cbIterable: CallbackIterable<T> = {
...generator,
yield: queueValue,
end: () => queueReturn()
}
return cbIterable
}
1 change: 1 addition & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ export { throttle } from './throttle'
export { time, TimeConfig, CurriedTimeResult } from './time'
export { transform } from './transform'
export { writeToStream, WritableStreamish } from './write-to-stream'
export { fromCallback } from './from-callback'
8 changes: 4 additions & 4 deletions lib/transform.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { AnyIterable } from './types'
import { getIterator } from './get-iterator'
import { defer, IDeferred } from './defer'
import { defer, Deferred } from 'inside-out-async'

function _transform<T, R>(
concurrency: number,
Expand All @@ -10,7 +10,7 @@ function _transform<T, R>(
const iterator = getIterator(iterable)

const resultQueue: R[] = []
const readQueue: IDeferred<IteratorResult<R>>[] = []
const readQueue: Deferred<IteratorResult<R>>[] = []

let ended = false
let reading = false
Expand All @@ -19,12 +19,12 @@ function _transform<T, R>(

function fulfillReadQueue() {
while (readQueue.length > 0 && resultQueue.length > 0) {
const { resolve } = readQueue.shift() as IDeferred<IteratorResult<R>>
const { resolve } = readQueue.shift() as Deferred<IteratorResult<R>>
const value = resultQueue.shift() as R
resolve({ done: false, value } as any)
}
while (readQueue.length > 0 && inflightCount === 0 && ended) {
const { resolve, reject } = readQueue.shift() as IDeferred<IteratorResult<R>>
const { resolve, reject } = readQueue.shift() as Deferred<IteratorResult<R>>
if (lastError) {
reject(lastError)
lastError = null
Expand Down
2 changes: 1 addition & 1 deletion lib/write-to-stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { assert } from 'chai'
import { writeToStream } from './'
import { PassThrough, Transform, Writable } from 'stream'
import { promiseImmediate } from './util-test'
import { defer } from './defer'
import { defer } from 'inside-out-async'

describe('writeToStream', () => {
it('writes values to a stream', async () => {
Expand Down
Loading