-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathretryBackoff.ts
64 lines (62 loc) · 2.1 KB
/
retryBackoff.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import { defer, iif, Observable, throwError, timer } from 'rxjs';
import { concatMap, retryWhen, tap } from 'rxjs/operators';
import { exponentialBackoffDelay, getDelay } from '../utils';
export interface RetryBackoffConfig {
// Initial interval. It will eventually go as high as maxInterval.
initialInterval: number;
// Maximum number of retry attempts.
maxRetries?: number;
// Maximum delay between retries.
maxInterval?: number;
// When set to `true` every successful emission will reset the delay and the
// error count.
resetOnSuccess?: boolean;
// Conditional retry.
shouldRetry?: (error: any) => boolean;
backoffDelay?: (iteration: number, initialInterval: number) => number;
}
/**
* Returns an Observable that mirrors the source Observable with the exception
* of an error. If the source Observable calls error, rather than propagating
* the error call this method will resubscribe to the source Observable with
* exponentially increasing interval and up to a maximum of count
* resubscriptions (if provided). Retrying can be cancelled at any point if
* shouldRetry returns false.
*/
export function retryBackoff(
config: number | RetryBackoffConfig
): <T>(source: Observable<T>) => Observable<T> {
const {
initialInterval,
maxRetries = Infinity,
maxInterval = Infinity,
shouldRetry = () => true,
resetOnSuccess = false,
backoffDelay = exponentialBackoffDelay,
} = typeof config === 'number' ? { initialInterval: config } : config;
return <T>(source: Observable<T>) =>
defer(() => {
let index = 0;
return source.pipe(
retryWhen<T>(errors =>
errors.pipe(
concatMap(error => {
const attempt = index++;
return iif(
() => attempt < maxRetries && shouldRetry(error),
timer(
getDelay(backoffDelay(attempt, initialInterval), maxInterval)
),
throwError(error)
);
})
)
),
tap(() => {
if (resetOnSuccess) {
index = 0;
}
})
);
});
}