Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inter-Process Communication #498

Merged
merged 9 commits into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ For more details, visit https://github.com/open-cli-tools/concurrently
### `concurrently(commands[, options])`

- `commands`: an array of either strings (containing the commands to run) or objects
with the shape `{ command, name, prefixColor, env, cwd }`.
with the shape `{ command, name, prefixColor, env, cwd, ipc }`.

- `options` (optional): an object containing any of the below:
- `cwd`: the working directory to be used by all commands. Can be overriden per command.
Expand Down Expand Up @@ -405,11 +405,33 @@ It has the following properties:
- `stderr`: an RxJS observable to the command's `stderr`.
- `error`: an RxJS observable to the command's error events (e.g. when it fails to spawn).
- `timer`: an RxJS observable to the command's timing events (e.g. starting, stopping).
- `messages`: an object with the following properties:

- `incoming`: an RxJS observable for the IPC messages received from the underlying process.
- `outgoing`: an RxJS observable for the IPC messages sent to the underlying process.

Both observables emit [`MessageEvent`](#messageevent)s.<br>
Note that if the command wasn't spawned with IPC support, these won't emit any values.

- `close`: an RxJS observable to the command's close events.
See [`CloseEvent`](#CloseEvent) for more information.
- `start()`: starts the command, setting up all
- `start()`: starts the command and sets up all of the above streams
- `send(message[, handle, options])`: sends a message to the underlying process via IPC channels,
returning a promise that resolves once the message has been sent.
See [Node.js docs](https://nodejs.org/docs/latest/api/child_process.html#subprocesssendmessage-sendhandle-options-callback).
- `kill([signal])`: kills the command, optionally specifying a signal (e.g. `SIGTERM`, `SIGKILL`, etc).

### `MessageEvent`

An object that represents a message that was received from/sent to the underlying command process.<br>
It has the following properties:

- `message`: the message itself.
- `handle`: a [`net.Socket`](https://nodejs.org/docs/latest/api/net.html#class-netsocket),
[`net.Server`](https://nodejs.org/docs/latest/api/net.html#class-netserver) or
[`dgram.Socket`](https://nodejs.org/docs/latest/api/dgram.html#class-dgramsocket),
if one was sent, or `undefined`.

### `CloseEvent`

An object with information about a command's closing event.<br>
Expand Down
165 changes: 164 additions & 1 deletion src/command.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { autoUnsubscribe, subscribeSpyTo } from '@hirez_io/observer-spy';
import { SpawnOptions } from 'child_process';
import { SendHandle, SpawnOptions } from 'child_process';
import { EventEmitter } from 'events';
import * as Rx from 'rxjs';
import { Readable, Writable } from 'stream';
Expand All @@ -16,14 +16,19 @@ import {
type CommandValues = { error: unknown; close: CloseEvent; timer: unknown[] };

let process: ChildProcess;
let sendMessage: jest.Mock;
let spawn: jest.Mocked<SpawnCommand>;
let killProcess: KillProcess;

const IPC_FD = 3;

autoUnsubscribe();

beforeEach(() => {
sendMessage = jest.fn();
process = new (class extends EventEmitter {
readonly pid = 1;
send = sendMessage;
readonly stdout = new Readable({
read() {
// do nothing
Expand Down Expand Up @@ -248,6 +253,164 @@ describe('#start()', () => {

expect((await stderr).toString()).toBe('dang');
});

describe('on incoming messages', () => {
it('does not share to the incoming messages stream, if IPC is disabled', () => {
const { command } = createCommand();
const spy = subscribeSpyTo(command.messages.incoming);
command.start();

process.emit('message', {});
expect(spy.getValuesLength()).toBe(0);
});

it('shares to the incoming messages stream, if IPC is enabled', () => {
const { command } = createCommand({ ipc: IPC_FD });
const spy = subscribeSpyTo(command.messages.incoming);
command.start();

const message1 = {};
process.emit('message', message1, undefined);

const message2 = {};
const handle = {} as SendHandle;
process.emit('message', message2, handle);

expect(spy.getValuesLength()).toBe(2);
expect(spy.getValueAt(0)).toEqual({ message: message1, handle: undefined });
expect(spy.getValueAt(1)).toEqual({ message: message2, handle });
});
});

describe('on outgoing messages', () => {
it('calls onSent with an error if the process does not have IPC enabled', () => {
const { command } = createCommand({ ipc: IPC_FD });
command.start();

Object.assign(process, {
// The TS types don't assume `send` can be undefined,
// despite the Node docs saying so
send: undefined,
});

const onSent = jest.fn();
command.messages.outgoing.next({ message: {}, onSent });
expect(onSent).toHaveBeenCalledWith(expect.any(Error));
});

it('sends the message to the process', () => {
const { command } = createCommand({ ipc: IPC_FD });
command.start();

const message1 = {};
command.messages.outgoing.next({ message: message1, onSent() {} });

const message2 = {};
const handle = {} as SendHandle;
command.messages.outgoing.next({ message: message2, handle, onSent() {} });

const message3 = {};
const options = {};
command.messages.outgoing.next({ message: message3, options, onSent() {} });

expect(process.send).toHaveBeenCalledTimes(3);
expect(process.send).toHaveBeenNthCalledWith(
1,
message1,
undefined,
undefined,
expect.any(Function),
);
expect(process.send).toHaveBeenNthCalledWith(
2,
message2,
handle,
undefined,
expect.any(Function),
);
expect(process.send).toHaveBeenNthCalledWith(
3,
message3,
undefined,
options,
expect.any(Function),
);
});

it('sends the message to the process, if it starts late', () => {
const { command } = createCommand({ ipc: IPC_FD });
command.messages.outgoing.next({ message: {}, onSent() {} });
expect(process.send).not.toHaveBeenCalled();

command.start();
expect(process.send).toHaveBeenCalled();
});

it('calls onSent with the result of sending the message', () => {
const { command } = createCommand({ ipc: IPC_FD });
command.start();

const onSent = jest.fn();
command.messages.outgoing.next({ message: {}, onSent });
expect(onSent).not.toHaveBeenCalled();

sendMessage.mock.calls[0][3]();
expect(onSent).toHaveBeenCalledWith(undefined);

const error = new Error();
sendMessage.mock.calls[0][3](error);
expect(onSent).toHaveBeenCalledWith(error);
});
});
});

describe('#send()', () => {
it('throws if IPC is not set up', () => {
const { command } = createCommand();
const fn = () => command.send({});
expect(fn).toThrow();
});

it('pushes the message on the outgoing messages stream', () => {
const { command } = createCommand({ ipc: IPC_FD });
const spy = subscribeSpyTo(command.messages.outgoing);

const message1 = { foo: true };
command.send(message1);

const message2 = { bar: 123 };
const handle = {} as SendHandle;
command.send(message2, handle);

const message3 = { baz: 'yes' };
const options = {};
command.send(message3, undefined, options);

expect(spy.getValuesLength()).toBe(3);
expect(spy.getValueAt(0)).toMatchObject({
message: message1,
handle: undefined,
options: undefined,
});
expect(spy.getValueAt(1)).toMatchObject({ message: message2, handle, options: undefined });
expect(spy.getValueAt(2)).toMatchObject({ message: message3, handle: undefined, options });
});

it('resolves when onSent callback is called with no arguments', async () => {
const { command } = createCommand({ ipc: IPC_FD });
const spy = subscribeSpyTo(command.messages.outgoing);
const promise = command.send({});
spy.getFirstValue().onSent();
await expect(promise).resolves.toBeUndefined();
});

it('rejects when onSent callback is called with an argument', async () => {
const { command } = createCommand({ ipc: IPC_FD });
const spy = subscribeSpyTo(command.messages.outgoing);
const promise = command.send({});
spy.getFirstValue().onSent('foo');
await expect(promise).rejects.toBe('foo');
});
});

describe('#kill()', () => {
Expand Down
Loading
Loading