forked from ReactiveX/rxjs
-
Notifications
You must be signed in to change notification settings - Fork 1
/
combineLatest-support.ts
83 lines (71 loc) · 2.34 KB
/
combineLatest-support.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import {Operator} from '../Operator';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {OuterSubscriber} from '../OuterSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';
/**
 * Operator that places a {@link CombineLatestSubscriber} in front of the
 * downstream subscriber.
 *
 * The operator itself keeps no per-subscription state; it only remembers the
 * optional projection function and hands it to each subscriber it creates.
 */
export class CombineLatestOperator<T extends Observable<U>, U, R> implements Operator<T, U[] | R> {
  /** Optional projection forwarded to every subscriber created by `call`. */
  private project?: (...values: U[]) => R;

  /**
   * @param project Optional function forwarded to the created subscriber;
   *   may be omitted.
   */
  constructor(project?: (...values: U[]) => R) {
    this.project = project;
  }

  /**
   * Creates the subscriber that performs the actual combination.
   *
   * @param subscriber Downstream subscriber that receives the combined output.
   * @returns A new `CombineLatestSubscriber` wrapping `subscriber`.
   */
  call(subscriber: Subscriber<U[] | R>): Subscriber<T> {
    const projector = this.project;
    return new CombineLatestSubscriber<T, U, R>(subscriber, projector);
  }
}
/**
 * Subscriber that collects the observables emitted by the source, subscribes
 * to all of them once the source completes, and then emits the combination of
 * their latest values every time any of them emits.
 *
 * Nothing is emitted until every collected observable has produced at least
 * one value. When a `project` function was supplied, its result is emitted;
 * otherwise an array of the latest values is emitted.
 */
export class CombineLatestSubscriber<T extends Observable<U>, U, R> extends OuterSubscriber<T, R> {
  /** Number of inner subscriptions that have not completed yet. */
  private active: number = 0;
  /** Latest value seen from each inner observable, indexed by arrival order. */
  private values: any[] = [];
  /** Observables received from the source, buffered until the source completes. */
  private observables: any[] = [];
  /** Indices of inner observables that have not emitted a value yet. */
  private toRespond: number[] = [];

  /**
   * @param destination Downstream subscriber that receives the combined output.
   * @param project Optional function applied to the latest values (spread as
   *   arguments); its return value is emitted instead of the values array.
   */
  constructor(destination: Subscriber<U[] | R>, private project?: (...values: U[]) => R) {
    super(destination);
  }

  /**
   * Buffers an observable from the source and records its index as
   * "has not responded yet".
   */
  protected _next(observable: T) {
    const toRespond = this.toRespond;
    toRespond.push(toRespond.length);
    this.observables.push(observable);
  }

  /**
   * Called when the source completes. Completes the destination right away
   * if no observables were buffered; otherwise subscribes to each of them,
   * passing its buffer index as the outer index.
   */
  protected _complete() {
    const observables = this.observables;
    const len = observables.length;
    if (len === 0) {
      this.destination.complete();
    } else {
      // Set the counter before subscribing so that an inner observable that
      // completes during subscription is counted against the full total.
      this.active = len;
      for (let i = 0; i < len; i++) {
        const observable = observables[i];
        this.add(subscribeToResult(this, observable, observable, i));
      }
    }
  }

  /**
   * Counts down the active inner subscriptions and completes the destination
   * once the last one has completed.
   */
  notifyComplete(unused: Subscriber<R>): void {
    if ((this.active -= 1) === 0) {
      this.destination.complete();
    }
  }

  /**
   * Records the latest value for the inner observable at `outerIndex` and,
   * once every inner observable has emitted at least once, emits the
   * combined result.
   *
   * @param observable Unused here.
   * @param value Value emitted by the inner observable.
   * @param outerIndex Index of the inner observable in the buffered list.
   * @param innerIndex Unused here.
   */
  notifyNext(observable: any, value: R, outerIndex: number, innerIndex: number) {
    const values = this.values;
    values[outerIndex] = value;

    const toRespond = this.toRespond;
    if (toRespond.length > 0) {
      const found = toRespond.indexOf(outerIndex);
      if (found !== -1) {
        toRespond.splice(found, 1);
      }
    }

    if (toRespond.length === 0) {
      const project = this.project;
      const destination = this.destination;
      if (project) {
        // `apply` spreads the values into arguments, so the internal array
        // itself is never exposed on this path.
        const result = tryCatch(project).apply(this, values);
        if (result === errorObject) {
          destination.error(errorObject.e);
        } else {
          destination.next(result);
        }
      } else {
        // Emit a snapshot: `values` is mutated in place on every later
        // notification, so handing out the array itself would rewrite
        // emissions the consumer has already received and may have kept.
        destination.next(values.slice());
      }
    }
  }
}