Skip to content

Commit

Permalink
fix(executor): do not use leaking registerAbortSignalListener, and …
Browse files Browse the repository at this point in the history
…handle listeners inside the execution context
  • Loading branch information
ardatan committed Feb 26, 2025
1 parent 6b6ba04 commit c866968
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 48 deletions.
7 changes: 7 additions & 0 deletions .changeset/swift-geese-behave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@graphql-tools/executor': patch
'@graphql-tools/utils': patch
---

In executor, do not use leaking `registerAbortSignalListener`, and handle listeners inside the
execution context
31 changes: 17 additions & 14 deletions packages/executor/src/execution/__tests__/abort-signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('Abort Signal', () => {
subscribe() {
return new Repeater(async (push, stop) => {
let i = 0;
stop.then(() => {
stop.finally(() => {
stopped = true;
});

Expand Down Expand Up @@ -150,7 +150,7 @@ describe('Abort Signal', () => {
didInvokeFirstFn = true;
return true;
},
async second() {
second() {
didInvokeSecondFn = true;
controller.abort();
return true;
Expand All @@ -162,18 +162,21 @@ describe('Abort Signal', () => {
},
},
});
const result$ = normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
mutation {
first
second
third
}
`),
signal: controller.signal,
});
expect(result$).rejects.toBeInstanceOf(DOMException);
await expect(
Promise.resolve().then(() =>
normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
mutation {
first
second
third
}
`),
signal: controller.signal,
}),
),
).rejects.toBeInstanceOf(DOMException);
expect(didInvokeFirstFn).toBe(true);
expect(didInvokeSecondFn).toBe(true);
expect(didInvokeThirdFn).toBe(false);
Expand Down
7 changes: 4 additions & 3 deletions packages/executor/src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,9 @@ describe('Execute: stream directive', () => {
path: ['friendList', 2],
},
],
hasNext: false,
hasNext: true,
},
{ hasNext: false },
]);
});

Expand Down Expand Up @@ -645,10 +646,10 @@ describe('Execute: stream directive', () => {
path: ['friendList', 2],
},
],
hasNext: false,
hasNext: true,
},
},
{ done: true, value: undefined },
{ done: false, value: { hasNext: false } },
{ done: true, value: undefined },
]);
});
Expand Down
74 changes: 55 additions & 19 deletions packages/executor/src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import {
collectFields,
createGraphQLError,
fakePromise,
getAbortPromise,
getArgumentValues,
getDefinedRootType,
GraphQLResolveInfo,
Expand All @@ -52,11 +51,10 @@ import {
Path,
pathToArray,
promiseReduce,
registerAbortSignalListener,
} from '@graphql-tools/utils';
import { TypedDocumentNode } from '@graphql-typed-document-node/core';
import { DisposableSymbols } from '@whatwg-node/disposablestack';
import { handleMaybePromise } from '@whatwg-node/promise-helpers';
import { createDeferredPromise, handleMaybePromise } from '@whatwg-node/promise-helpers';
import { coerceError } from './coerceError.js';
import { flattenAsyncIterable } from './flattenAsyncIterable.js';
import { invariant } from './invariant.js';
Expand Down Expand Up @@ -127,6 +125,8 @@ export interface ExecutionContext<TVariables = any, TContext = any> {
errors: Array<GraphQLError>;
subsequentPayloads: Set<AsyncPayloadRecord>;
signal?: AbortSignal;
onSignalAbort?(handler: () => void): void;
signalPromise?: Promise<never>;
}

export interface FormattedExecutionResult<
Expand Down Expand Up @@ -421,6 +421,8 @@ export function buildExecutionContext<TData = any, TVariables = any, TContext =
signal,
} = args;

signal?.throwIfAborted();

// If the schema used for execution is invalid, throw an error.
assertValidSchema(schema);

Expand Down Expand Up @@ -489,6 +491,31 @@ export function buildExecutionContext<TData = any, TVariables = any, TContext =
return coercedVariableValues.errors;
}

signal?.throwIfAborted();

let onSignalAbort: ExecutionContext['onSignalAbort'];
let signalPromise: ExecutionContext['signalPromise'];

if (signal) {
const listeners = new Set<() => void>();
const signalDeferred = createDeferredPromise<never>();
signalPromise = signalDeferred.promise;
const sharedListener = () => {
signalDeferred.reject(signal.reason);
signal.removeEventListener('abort', sharedListener);
};
signal.addEventListener('abort', sharedListener, { once: true });
signalPromise.catch(() => {
for (const listener of listeners) {
listener();
}
listeners.clear();
});
onSignalAbort = handler => {
listeners.add(handler);
};
}

return {
schema,
fragments,
Expand All @@ -502,6 +529,8 @@ export function buildExecutionContext<TData = any, TVariables = any, TContext =
subsequentPayloads: new Set(),
errors: [],
signal,
onSignalAbort,
signalPromise,
};
}

Expand Down Expand Up @@ -626,9 +655,9 @@ function executeFields(
}
}
} catch (error) {
if (containsPromise) {
if (error !== exeContext.signal?.reason && containsPromise) {
// Ensure that any promises returned by other fields are handled, as they may also reject.
return promiseForObject(results, exeContext.signal).finally(() => {
return promiseForObject(results, exeContext.signal, exeContext.signalPromise).finally(() => {
throw error;
});
}
Expand All @@ -643,7 +672,7 @@ function executeFields(
// Otherwise, results is a map from field name to the result of resolving that
// field, which is possibly a promise. Return a promise that will return this
// same map, but with any promises replaced with the values they resolved to.
return promiseForObject(results, exeContext.signal);
return promiseForObject(results, exeContext.signal, exeContext.signalPromise);
}

/**
Expand Down Expand Up @@ -673,6 +702,7 @@ function executeField(

// Get the resolve function, regardless of if its result is normal or abrupt (error).
try {
exeContext.signal?.throwIfAborted();
// Build a JS object of arguments from the field.arguments AST, using the
// variables scope to fulfill any variable references.
// TODO: find a way to memoize, in case this field is within a List type.
Expand Down Expand Up @@ -967,8 +997,9 @@ async function completeAsyncIteratorValue(
iterator: AsyncIterator<unknown>,
asyncPayloadRecord?: AsyncPayloadRecord,
): Promise<ReadonlyArray<unknown>> {
if (exeContext.signal && iterator.return) {
registerAbortSignalListener(exeContext.signal, () => {
exeContext.signal?.throwIfAborted();
if (iterator.return) {
exeContext.onSignalAbort?.(() => {
iterator.return?.();
});
}
Expand Down Expand Up @@ -1746,18 +1777,25 @@ function executeSubscription(exeContext: ExecutionContext): MaybePromise<AsyncIt
const result = resolveFn(rootValue, args, contextValue, info);

if (isPromise(result)) {
return result.then(assertEventStream).then(undefined, error => {
throw locatedError(error, fieldNodes, pathToArray(path));
});
return result
.then(result => assertEventStream(result, exeContext.signal, exeContext.onSignalAbort))
.then(undefined, error => {
throw locatedError(error, fieldNodes, pathToArray(path));
});
}

return assertEventStream(result, exeContext.signal);
return assertEventStream(result, exeContext.signal, exeContext.onSignalAbort);
} catch (error) {
throw locatedError(error, fieldNodes, pathToArray(path));
}
}

function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable<unknown> {
function assertEventStream(
result: unknown,
signal?: AbortSignal,
onSignalAbort?: (handler: () => void) => void,
): AsyncIterable<unknown> {
signal?.throwIfAborted();
if (result instanceof Error) {
throw result;
}
Expand All @@ -1768,13 +1806,13 @@ function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable
'Subscription field must return Async Iterable. ' + `Received: ${inspect(result)}.`,
);
}
if (signal) {
if (onSignalAbort) {
return {
[Symbol.asyncIterator]() {
const asyncIterator = result[Symbol.asyncIterator]();

if (asyncIterator.return) {
registerAbortSignalListener(signal, () => {
onSignalAbort?.(() => {
asyncIterator.return?.();
});
}
Expand Down Expand Up @@ -2101,8 +2139,6 @@ function yieldSubsequentPayloads(
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
let isDone = false;

const abortPromise = exeContext.signal ? getAbortPromise(exeContext.signal) : undefined;

async function next(): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
if (isDone) {
return { value: undefined, done: true };
Expand All @@ -2112,8 +2148,8 @@ function yieldSubsequentPayloads(
record => record.promise,
);

if (abortPromise) {
await Promise.race([abortPromise, ...subSequentPayloadPromises]);
if (exeContext.signalPromise) {
await Promise.race([exeContext.signalPromise, ...subSequentPayloadPromises]);
} else {
await Promise.race(subSequentPayloadPromises);
}
Expand Down
33 changes: 22 additions & 11 deletions packages/executor/src/execution/promiseForObject.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getAbortPromise } from '@graphql-tools/utils';
import { isPromise } from '@graphql-tools/utils';

type ResolvedObject<TData> = {
[TKey in keyof TData]: TData[TKey] extends Promise<infer TValue> ? TValue : TData[TKey];
Expand All @@ -11,19 +11,30 @@ type ResolvedObject<TData> = {
* This is akin to bluebird's `Promise.props`, but implemented only using
* `Promise.all` so it will work with any implementation of ES6 promises.
*/
export async function promiseForObject<TData>(
export function promiseForObject<TData>(
object: TData,
signal?: AbortSignal,
signalPromise?: Promise<never>,
): Promise<ResolvedObject<TData>> {
signal?.throwIfAborted();
const resolvedObject = Object.create(null);
const promises = Promise.all(
Object.entries(object as any).map(async ([key, value]) => {
resolvedObject[key] = await value;
}),
);
if (signal) {
const abortPromise = getAbortPromise(signal);
return Promise.race([abortPromise, promises]).then(() => resolvedObject);
const promises: Promise<void>[] = [];
for (const key in object) {
const value = object[key];
if (isPromise(value)) {
promises.push(
value.then(value => {
signal?.throwIfAborted();
resolvedObject[key] = value;
}),
);
} else {
resolvedObject[key] = value;
}
}
return promises.then(() => resolvedObject);
const promiseAll = Promise.all(promises);
if (signalPromise) {
return Promise.race([signalPromise, promiseAll]).then(() => resolvedObject);
}
return promiseAll.then(() => resolvedObject);
}
6 changes: 5 additions & 1 deletion packages/utils/src/registerAbortSignalListener.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { fakeRejectPromise } from '@whatwg-node/promise-helpers';
import { memoize1 } from './memoize.js';

// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected"
Expand Down Expand Up @@ -32,8 +33,11 @@ export function registerAbortSignalListener(signal: AbortSignal, listener: () =>
}

export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) {
// If the signal is already aborted, return a rejected promise
if (signal.aborted) {
return fakeRejectPromise(signal.reason);
}
return new Promise<void>((_resolve, reject) => {
// If the signal is already aborted, return a rejected promise
if (signal.aborted) {
reject(signal.reason);
return;
Expand Down

0 comments on commit c866968

Please sign in to comment.