Skip to content

Commit

Permalink
[breaking] Abort commands not running when max processes < N (#460)
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavohenke authored Mar 24, 2024
1 parent bae6fe5 commit 8ab6ff1
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 87 deletions.
33 changes: 31 additions & 2 deletions src/completion-listener.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,41 @@ const createController = (successCondition?: SuccessCondition) =>
scheduler,
});

const emitFakeCloseEvent = (command: FakeCommand, event?: Partial<CloseEvent>) =>
command.close.next(createFakeCloseEvent({ ...event, command, index: command.index }));
const emitFakeCloseEvent = (command: FakeCommand, event?: Partial<CloseEvent>) => {
const fakeEvent = createFakeCloseEvent({ ...event, command, index: command.index });
command.state = 'exited';
command.close.next(fakeEvent);
return fakeEvent;
};

const flushPromises = () => new Promise((resolve) => setTimeout(resolve, 0));

describe('listen', () => {
it('completes only when commands emit a close event, returns close event', async () => {
const abortCtrl = new AbortController();
const result = createController('all').listen(commands, abortCtrl.signal);

commands[0].state = 'started';
abortCtrl.abort();

const event = emitFakeCloseEvent(commands[0]);
scheduler.flush();

await expect(result).resolves.toHaveLength(1);
await expect(result).resolves.toEqual([event]);
});

it('completes when abort signal is received and command is stopped, returns nothing', async () => {
const abortCtrl = new AbortController();
// Use success condition = first to test index access when there are no close events
const result = createController('first').listen([new FakeCommand()], abortCtrl.signal);

abortCtrl.abort();
scheduler.flush();

await expect(result).resolves.toHaveLength(0);
});

it('check for success once all commands have emitted at least a single close event', async () => {
const finallyCallback = jest.fn();
const result = createController().listen(commands).finally(finallyCallback);
Expand Down
57 changes: 43 additions & 14 deletions src/completion-listener.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as Rx from 'rxjs';
import { filter, map, switchMap, take } from 'rxjs/operators';
import { delay, filter, map, switchMap, take } from 'rxjs/operators';

import { CloseEvent, Command } from './command';

Expand Down Expand Up @@ -48,6 +48,11 @@ export class CompletionListener {
}

private isSuccess(events: CloseEvent[]) {
if (!events.length) {
// When every command was aborted, consider a success.
return true;
}

if (this.successCondition === 'first') {
return events[0].exitCode === 0;
} else if (this.successCondition === 'last') {
Expand All @@ -56,7 +61,7 @@ export class CompletionListener {

const commandSyntaxMatch = this.successCondition.match(/^!?command-(.+)$/);
if (commandSyntaxMatch == null) {
// If not a `command-` syntax, then it's an 'all' condition, or it's treated as such.
// If not a `command-` syntax, then it's an 'all' condition or it's treated as such.
return events.every(({ exitCode }) => exitCode === 0);
}

Expand All @@ -73,7 +78,7 @@ export class CompletionListener {
(event) => targetCommandsEvents.includes(event) || event.exitCode === 0,
);
}
// Only the specified commands must exit successfully
// Only the specified commands must exit succesfully
return (
targetCommandsEvents.length > 0 &&
targetCommandsEvents.every((event) => event.exitCode === 0)
Expand All @@ -84,23 +89,47 @@ export class CompletionListener {
* Given a list of commands, wait for all of them to exit and then evaluate their exit codes.
*
* @returns A Promise that resolves if the success condition is met, or rejects otherwise.
* In either case, the value is a list of close events for commands that spawned.
* Commands that didn't spawn are filtered out.
*/
listen(commands: Command[]): Promise<CloseEvent[]> {
const closeStreams = commands.map((command) => command.close);
listen(commands: Command[], abortSignal?: AbortSignal): Promise<CloseEvent[]> {
const abort =
abortSignal &&
Rx.fromEvent(abortSignal, 'abort', { once: true }).pipe(
// The abort signal must happen before commands are killed, otherwise new commands
// might spawn. Because of this, it's not be possible to capture the close events
// without an immediate delay
delay(0, this.scheduler),
map(() => undefined),
);

const closeStreams = commands.map((command) =>
abort
? // Commands that have been started must close.
Rx.race(command.close, abort.pipe(filter(() => command.state === 'stopped')))
: command.close,
);
return Rx.lastValueFrom(
Rx.combineLatest(closeStreams).pipe(
filter(() => commands.every((command) => command.state !== 'started')),
map((exitInfos) =>
exitInfos.sort(
(first, second) =>
first.timings.endDate.getTime() - second.timings.endDate.getTime(),
filter((events) =>
commands.every(
(command, i) => command.state !== 'started' || events[i] === undefined,
),
),
switchMap((exitInfos) =>
this.isSuccess(exitInfos)
? this.emitWithScheduler(Rx.of(exitInfos))
: this.emitWithScheduler(Rx.throwError(() => exitInfos)),
map((events) =>
events
// Filter out aborts, since they cannot be sorted and are considered success condition anyways
.filter((event): event is CloseEvent => event != null)
// Sort according to exit time
.sort(
(first, second) =>
first.timings.endDate.getTime() - second.timings.endDate.getTime(),
),
),
switchMap((events) =>
this.isSuccess(events)
? this.emitWithScheduler(Rx.of(events))
: this.emitWithScheduler(Rx.throwError(() => events)),
),
take(1),
),
Expand Down
10 changes: 10 additions & 0 deletions src/concurrently.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ it('spawns commands up to percent based limit at once', () => {
expect(spawn).toHaveBeenCalledWith('qux', expect.objectContaining({}));
});

it('does not spawn further commands on abort signal aborted', () => {
const abortController = new AbortController();
create(['foo', 'bar'], { maxProcesses: 1, abortSignal: abortController.signal });
expect(spawn).toHaveBeenCalledTimes(1);

abortController.abort();
processes[0].emit('close', 0, null);
expect(spawn).toHaveBeenCalledTimes(1);
});

it('runs controllers with the commands', () => {
create(['echo', '"echo wrapped"']);

Expand Down
18 changes: 12 additions & 6 deletions src/concurrently.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ export type ConcurrentlyResult = {
* A promise that resolves when concurrently ran successfully according to the specified
* success condition, or reject otherwise.
*
* Both the resolved and rejected value is the list of all command's close events.
* Both the resolved and rejected value is a list of all the close events for commands that
* spawned; commands that didn't spawn are filtered out.
*/
result: Promise<CloseEvent[]>;
};
Expand Down Expand Up @@ -105,6 +106,11 @@ export type ConcurrentlyOptions = {
*/
successCondition?: SuccessCondition;

/**
* A signal to stop spawning further processes.
*/
abortSignal?: AbortSignal;

/**
* Which flow controllers should be applied on commands spawned by concurrently.
* Defaults to an empty array.
Expand Down Expand Up @@ -217,11 +223,11 @@ export function concurrently(
: Number(options.maxProcesses)) || commandsLeft.length,
);
for (let i = 0; i < maxProcesses; i++) {
maybeRunMore(commandsLeft);
maybeRunMore(commandsLeft, options.abortSignal);
}

const result = new CompletionListener({ successCondition: options.successCondition })
.listen(commands)
.listen(commands, options.abortSignal)
.finally(() => {
handleResult.onFinishCallbacks.forEach((onFinish) => onFinish());
});
Expand Down Expand Up @@ -263,14 +269,14 @@ function parseCommand(command: CommandInfo, parsers: CommandParser[]) {
);
}

function maybeRunMore(commandsLeft: Command[]) {
function maybeRunMore(commandsLeft: Command[], abortSignal?: AbortSignal) {
const command = commandsLeft.shift();
if (!command) {
if (!command || abortSignal?.aborted) {
return;
}

command.start();
command.close.subscribe(() => {
maybeRunMore(commandsLeft);
maybeRunMore(commandsLeft, abortSignal);
});
}
39 changes: 16 additions & 23 deletions src/flow-control/kill-on-signal.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import { KillOnSignal } from './kill-on-signal';
let commands: Command[];
let controller: KillOnSignal;
let process: EventEmitter;
let abortController: AbortController;
beforeEach(() => {
process = new EventEmitter();
commands = [new FakeCommand(), new FakeCommand()];
controller = new KillOnSignal({ process });
abortController = new AbortController();
controller = new KillOnSignal({ process, abortController });
});

it('returns commands that keep non-close streams from original commands', () => {
Expand Down Expand Up @@ -51,29 +53,20 @@ it('returns commands that keep non-SIGINT exit codes', () => {
expect(callback).toHaveBeenCalledWith(expect.objectContaining({ exitCode: 1 }));
});

it('kills all commands on SIGINT', () => {
controller.handle(commands);
process.emit('SIGINT');

expect(process.listenerCount('SIGINT')).toBe(1);
expect(commands[0].kill).toHaveBeenCalledWith('SIGINT');
expect(commands[1].kill).toHaveBeenCalledWith('SIGINT');
});

it('kills all commands on SIGTERM', () => {
controller.handle(commands);
process.emit('SIGTERM');
describe.each(['SIGINT', 'SIGTERM', 'SIGHUP'])('on %s', (signal) => {
it('kills all commands', () => {
controller.handle(commands);
process.emit(signal);

expect(process.listenerCount('SIGTERM')).toBe(1);
expect(commands[0].kill).toHaveBeenCalledWith('SIGTERM');
expect(commands[1].kill).toHaveBeenCalledWith('SIGTERM');
});
expect(process.listenerCount(signal)).toBe(1);
expect(commands[0].kill).toHaveBeenCalledWith(signal);
expect(commands[1].kill).toHaveBeenCalledWith(signal);
});

it('kills all commands on SIGHUP', () => {
controller.handle(commands);
process.emit('SIGHUP');
it('sends abort signal', () => {
controller.handle(commands);
process.emit(signal);

expect(process.listenerCount('SIGHUP')).toBe(1);
expect(commands[0].kill).toHaveBeenCalledWith('SIGHUP');
expect(commands[1].kill).toHaveBeenCalledWith('SIGHUP');
expect(abortController.signal.aborted).toBe(true);
});
});
11 changes: 10 additions & 1 deletion src/flow-control/kill-on-signal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,25 @@ import { FlowController } from './flow-controller';
*/
export class KillOnSignal implements FlowController {
private readonly process: EventEmitter;
private readonly abortController?: AbortController;

constructor({ process }: { process: EventEmitter }) {
constructor({
process,
abortController,
}: {
process: EventEmitter;
abortController?: AbortController;
}) {
this.process = process;
this.abortController = abortController;
}

handle(commands: Command[]) {
let caughtSignal: NodeJS.Signals;
(['SIGINT', 'SIGTERM', 'SIGHUP'] as NodeJS.Signals[]).forEach((signal) => {
this.process.on(signal, () => {
caughtSignal = signal;
this.abortController?.abort();
commands.forEach((command) => command.kill(signal));
});
});
Expand Down
Loading

0 comments on commit 8ab6ff1

Please sign in to comment.