-
-
Notifications
You must be signed in to change notification settings - Fork 236
/
Copy pathcompletion-listener.ts
138 lines (126 loc) · 5.24 KB
/
completion-listener.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import * as Rx from 'rxjs';
import { delay, filter, map, switchMap, take } from 'rxjs/operators';
import { CloseEvent, Command } from './command';
/**
* Defines which command(s) in a list must exit successfully (with an exit code of `0`):
*
* - `first`: only the first specified command;
* - `last`: only the last specified command;
* - `all`: all commands.
* - `command-{name|index}`: only the commands with the specified names or index.
* - `!command-{name|index}`: all commands but the ones with the specified names or index.
*/
export type SuccessCondition =
| 'first'
| 'last'
| 'all'
| `command-${string | number}`
| `!command-${string | number}`;
/**
* Provides logic to determine whether lists of commands ran successfully.
*/
export class CompletionListener {
private readonly successCondition: SuccessCondition;
private readonly scheduler?: Rx.SchedulerLike;
constructor({
successCondition = 'all',
scheduler,
}: {
/**
* How this instance will define that a list of commands ran successfully.
* Defaults to `all`.
*
* @see {SuccessCondition}
*/
successCondition?: SuccessCondition;
/**
* For testing only.
*/
scheduler?: Rx.SchedulerLike;
}) {
this.successCondition = successCondition;
this.scheduler = scheduler;
}
private isSuccess(events: CloseEvent[]) {
if (!events.length) {
// When every command was aborted, consider a success.
return true;
}
if (this.successCondition === 'first') {
return events[0].exitCode === 0;
} else if (this.successCondition === 'last') {
return events[events.length - 1].exitCode === 0;
}
const commandSyntaxMatch = this.successCondition.match(/^!?command-(.+)$/);
if (commandSyntaxMatch == null) {
// If not a `command-` syntax, then it's an 'all' condition or it's treated as such.
return events.every(({ exitCode }) => exitCode === 0);
}
// Check `command-` syntax condition.
// Note that a command's `name` is not necessarily unique,
// in which case all of them must meet the success condition.
const nameOrIndex = commandSyntaxMatch[1];
const targetCommandsEvents = events.filter(
({ command, index }) => command.name === nameOrIndex || index === Number(nameOrIndex),
);
if (this.successCondition.startsWith('!')) {
// All commands except the specified ones must exit successfully
return events.every(
(event) => targetCommandsEvents.includes(event) || event.exitCode === 0,
);
}
// Only the specified commands must exit succesfully
return (
targetCommandsEvents.length > 0 &&
targetCommandsEvents.every((event) => event.exitCode === 0)
);
}
/**
* Given a list of commands, wait for all of them to exit and then evaluate their exit codes.
*
* @returns A Promise that resolves if the success condition is met, or rejects otherwise.
* In either case, the value is a list of close events for commands that spawned.
* Commands that didn't spawn are filtered out.
*/
listen(commands: Command[], abortSignal?: AbortSignal): Promise<CloseEvent[]> {
const abort =
abortSignal &&
Rx.fromEvent(abortSignal, 'abort', { once: true }).pipe(
// The abort signal must happen before commands are killed, otherwise new commands
// might spawn. Because of this, it's not be possible to capture the close events
// without an immediate delay
delay(0, this.scheduler),
map(() => undefined),
);
const closeStreams = commands.map((command) =>
abort
? // Commands that have been started must close.
Rx.race(command.close, abort.pipe(filter(() => command.state === 'stopped')))
: command.close,
);
return Rx.lastValueFrom(
Rx.combineLatest(closeStreams).pipe(
filter(() => commands.every((command) => command.state !== 'started')),
map((events) =>
events
// Filter out aborts, since they cannot be sorted and are considered success condition anyways
.filter((event): event is CloseEvent => event != null)
// Sort according to exit time
.sort(
(first, second) =>
first.timings.endDate.getTime() - second.timings.endDate.getTime(),
),
),
switchMap((events) =>
this.isSuccess(events)
? this.emitWithScheduler(Rx.of(events))
: this.emitWithScheduler(Rx.throwError(() => events)),
),
take(1),
),
);
}
private emitWithScheduler<O>(input: Rx.Observable<O>): Rx.Observable<O> {
return this.scheduler ? input.pipe(Rx.observeOn(this.scheduler)) : input;
}
}