-
Notifications
You must be signed in to change notification settings - Fork 3k
/
Copy pathshareReplay.ts
145 lines (139 loc) · 4.61 KB
/
shareReplay.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import { Observable } from '../Observable';
import { ReplaySubject } from '../ReplaySubject';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { Subscriber } from '../Subscriber';
/**
* Configuration object accepted by `shareReplay` as an alternative to its
* positional `(bufferSize, windowTime, scheduler)` arguments.
*/
export interface ShareReplayConfig {
/**
* Maximum number of values kept for replay. When omitted, the operator
* falls back to `Number.POSITIVE_INFINITY`.
*/
bufferSize?: number;
/**
* Replay window handed to the underlying `ReplaySubject` (documented on
* `shareReplay` as milliseconds). When omitted, the operator falls back to
* `Number.POSITIVE_INFINITY`.
*/
windowTime?: number;
/**
* When `true`, the source subscription is unsubscribed and the internal
* `ReplaySubject` is discarded as soon as the number of subscribers drops to
* zero before the source has completed. When `false`, the operator never
* unsubscribes from the source because of the subscriber count.
*/
refCount: boolean;
/** Optional scheduler forwarded unchanged to the `ReplaySubject` constructor. */
scheduler?: SchedulerLike;
}
/**
 * Shares one subscription to the source among all subscribers and replays the
 * buffered emissions to every subscriber that arrives later.
 *
 * Internally the source is multicast through a `ReplaySubject` built from the
 * supplied buffer size, window time and scheduler. Once the source has
 * completed successfully, the buffered values stay cached in the shared
 * observable; if the source errors instead, the next subscription creates a
 * fresh `ReplaySubject` and subscribes to the source again, so an errored
 * source can be retried.
 *
 * ## Why use shareReplay?
 * Reach for `shareReplay` when the source carries side effects or expensive
 * computations that should not be repeated for each subscriber, or when late
 * subscribers need access to values that were emitted before they subscribed.
 * That replay-on-subscribe behavior is what distinguishes `shareReplay` from
 * {@link share}.
 *
 * ![](shareReplay.png)
 *
 * ## Example
 * ```ts
 * import { interval } from 'rxjs';
 * import { shareReplay, take } from 'rxjs/operators';
 *
 * const obs$ = interval(1000);
 * const shared$ = obs$.pipe(
 *   take(4),
 *   shareReplay(3)
 * );
 * shared$.subscribe(x => console.log('source A: ', x));
 * shared$.subscribe(y => console.log('source B: ', y));
 * ```
 *
 * The operator can be called in two ways:
 * - `shareReplay(config)` with a {@link ShareReplayConfig} object, which is
 *   used as given (including its `refCount` flag);
 * - `shareReplay(bufferSize?, windowTime?, scheduler?)` with positional
 *   arguments, in which case `refCount` is `false`.
 *
 * @see {@link publish}
 * @see {@link share}
 * @see {@link publishReplay}
 *
 * @param configOrBufferSize Either a configuration object or the maximum
 * element count of the replay buffer (defaults to `Number.POSITIVE_INFINITY`).
 * @param windowTime Maximum time length of the replay buffer in milliseconds
 * (defaults to `Number.POSITIVE_INFINITY`). Only read for the positional form.
 * @param scheduler Scheduler passed to the underlying `ReplaySubject`. Only
 * read for the positional form.
 * @returns An operator function that yields an observable sharing a single
 * subscription to the source and replaying buffered values to subscribers.
 */
export function shareReplay<T>(
  config: ShareReplayConfig
): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
  bufferSize?: number,
  windowTime?: number,
  scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
  configOrBufferSize?: ShareReplayConfig | number,
  windowTime?: number,
  scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
  // Normalize both call shapes into one config object. A truthy value of type
  // 'object' is the config form; anything else (a number or nothing at all)
  // is treated as the positional form, which never ref-counts.
  const config: ShareReplayConfig =
    configOrBufferSize && typeof configOrBufferSize === 'object'
      ? configOrBufferSize
      : {
          bufferSize: configOrBufferSize as number | undefined,
          windowTime,
          refCount: false,
          scheduler,
        };

  // The stateful operator is created per application of the returned function,
  // so each piped observable gets its own subject and counters.
  return (source: Observable<T>) => source.lift(shareReplayOperator(config));
}
/**
 * Builds the stateful operator function that performs the sharing and
 * replaying for one application of `shareReplay`.
 *
 * The returned function is invoked once per subscriber, with the subscriber
 * as `this` and the source observable as its argument. All subscribers of the
 * same application share the state declared below.
 *
 * @param config Normalized configuration. `bufferSize` and `windowTime`
 * default to `Number.POSITIVE_INFINITY`; `refCount` decides whether the source
 * subscription is released when the last subscriber leaves before completion;
 * `scheduler` is forwarded to the `ReplaySubject`.
 * @returns The per-subscriber operation to be passed to `source.lift`.
 */
function shareReplayOperator<T>({
  bufferSize = Number.POSITIVE_INFINITY,
  windowTime = Number.POSITIVE_INFINITY,
  refCount: useRefCount,
  scheduler,
}: ShareReplayConfig) {
  // Subject currently multicasting the source; undefined until the first
  // subscription and again after a ref-counted reset.
  let subject: ReplaySubject<T> | undefined;
  // Number of subscribers currently attached.
  let refCount = 0;
  // Live subscription to the source, if any.
  let subscription: Subscription | undefined;
  // Set when the source errored, so the next subscriber starts over.
  let hasError = false;
  // Set once the source completed; the subject is then kept as a cache.
  let isComplete = false;

  return function shareReplayOperation(
    this: Subscriber<T>,
    source: Observable<T>
  ) {
    refCount++;
    let innerSub: Subscription | undefined;

    if (!subject || hasError) {
      hasError = false;
      // Keep a constant reference for the source observer below: the shared
      // `subject` variable may be reassigned or cleared by other subscribers,
      // while these callbacks must always feed the subject they were created
      // for.
      const activeSubject = new ReplaySubject<T>(
        bufferSize,
        windowTime,
        scheduler
      );
      subject = activeSubject;
      // Attach the subscriber BEFORE subscribing to the source so that values
      // emitted synchronously by the source are delivered to it live.
      innerSub = activeSubject.subscribe(this);
      subscription = source.subscribe({
        next(value) {
          activeSubject.next(value);
        },
        error(err) {
          hasError = true;
          activeSubject.error(err);
        },
        complete() {
          isComplete = true;
          subscription = undefined;
          activeSubject.complete();
        },
      });
      // Here we need to check to see if the source synchronously completed. Although
      // we're setting `subscription = undefined` in the completion handler, if the source
      // is synchronous, that will happen *before* subscription is set by the return of
      // the `subscribe` call.
      if (isComplete) {
        subscription = undefined;
      }
    } else {
      // A subject already exists and has not errored: just attach to it and
      // let it replay its buffer.
      innerSub = subject.subscribe(this);
    }

    // Teardown executed when this subscriber unsubscribes.
    this.add(() => {
      refCount--;
      if (innerSub) {
        innerSub.unsubscribe();
        // Drop the reference so the inner subscription can be collected.
        innerSub = undefined;
      }
      // With ref-counting enabled, release the source and forget the subject
      // once the last subscriber is gone, unless the source already completed
      // (in which case the subject is kept as a cache).
      if (subscription && !isComplete && useRefCount && refCount === 0) {
        subscription.unsubscribe();
        subscription = undefined;
        subject = undefined;
      }
    });
  };
}