diff --git a/src/execution/__tests__/flattenAsyncIterable-test.ts b/src/execution/__tests__/flattenAsyncIterable-test.ts new file mode 100644 index 0000000000..d174dd06c6 --- /dev/null +++ b/src/execution/__tests__/flattenAsyncIterable-test.ts @@ -0,0 +1,219 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { expectPromise } from '../../__testUtils__/expectPromise.js'; + +import { flattenAsyncIterable } from '../flattenAsyncIterable.js'; + +describe('flattenAsyncIterable', () => { + it('flatten nested async generators', async () => { + async function* source() { + yield await Promise.resolve({ + value: 1, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(1.1); + yield await Promise.resolve(1.2); + })(), + }); + yield await Promise.resolve({ + value: 2, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(2.1); + yield await Promise.resolve(2.2); + })(), + }); + } + + const doubles = flattenAsyncIterable(source(), (item) => item); + + const result = []; + for await (const x of doubles) { + result.push(x); + } + expect(result).to.deep.equal([1, 1.1, 1.2, 2, 2.1, 2.2]); + }); + + it('passes through errors from a nested async generator', async () => { + async function* source() { + yield await Promise.resolve({ + value: 1, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(1.1); + yield await Promise.resolve(1.2); + })(), + }); + throw new Error('ouch'); + } + /* c8 ignore stop */ + + const doubles = flattenAsyncIterable(source(), (item) => item); + + expect(await doubles.next()).to.deep.equal({ value: 1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 1.1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 1.2, done: false }); + + await expectPromise(doubles.next()).toRejectWith('ouch'); + }); + + it('allows returning early from a nested async generator', async () => { + async function* source() { + yield await Promise.resolve({ + value: 1, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(1.1); + yield await Promise.resolve(1.2); + })(), + }); + yield await Promise.resolve({ + value: 2, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(2.1); /* c8 ignore start */ + // Not reachable, early return + yield await Promise.resolve(2.2); + })(), + }); + // Not reachable, early return + yield await Promise.resolve({ + value: 3, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(3.1); + yield await Promise.resolve(3.2); + })(), + }); + } + /* c8 ignore stop */ + + const doubles = flattenAsyncIterable(source(), (item) => item); + + expect(await doubles.next()).to.deep.equal({ value: 1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 1.1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 1.2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Early return + expect(await doubles.return()).to.deep.equal({ + value: undefined, + done: true, + }); + + // Subsequent next calls + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('allows throwing errors into a nested async generator', async () => { + async function* source() { + yield await Promise.resolve({ + value: 1, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(1.1); + yield await Promise.resolve(1.2); + })(), + }); + yield await Promise.resolve({ + value: 2, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(2.1); /* c8 ignore start */ + // Not reachable, early return + yield await Promise.resolve(2.2); + })(), + }); + // Not reachable, early return + yield await Promise.resolve({ value: 3 }); + } + /* c8 ignore stop */ + + const doubles = flattenAsyncIterable(source(), (item) => item); + + expect(await doubles.next()).to.deep.equal({ value: 1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 1.1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 1.2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Throw error + await expectPromise(doubles.throw(new Error('ouch'))).toRejectWith('ouch'); + }); + + it('completely yields sub-iterables even when next() called in parallel', async () => { + async function* source() { + yield await Promise.resolve({ + value: 1, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(1.1); + yield await Promise.resolve(1.2); + })(), + }); + yield await Promise.resolve({ + value: 2, + nestedIterable: (async function* nested(): AsyncGenerator< + number, + void, + void + > { + yield await Promise.resolve(2.1); + yield await Promise.resolve(2.2); + })(), + }); + } + + const result = flattenAsyncIterable(source(), (item) => item); + + const promise1 = result.next(); + const promise2 = result.next(); + const promise3 = result.next(); + expect(await promise1).to.deep.equal({ value: 1, done: false }); + expect(await promise2).to.deep.equal({ value: 1.1, done: false }); + expect(await promise3).to.deep.equal({ value: 1.2, done: false }); + expect(await result.next()).to.deep.equal({ value: 2, done: false }); + expect(await result.next()).to.deep.equal({ value: 2.1, done: false }); + expect(await result.next()).to.deep.equal({ value: 2.2, done: false }); + expect(await result.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); +}); diff --git a/src/execution/__tests__/subscribe-test.ts b/src/execution/__tests__/subscribe-test.ts index ffa1c85276..0360267faa 100644 --- a/src/execution/__tests__/subscribe-test.ts +++ b/src/execution/__tests__/subscribe-test.ts @@ -24,6 +24,7 @@ import type { ExecutionArgs } from '../execute.js'; import { createSourceEventStream, executeSubscriptionEvent, + legacyExperimentalSubscribeIncrementally, subscribe, } from '../execute.js'; import type { ExecutionResult } from '../types.js'; @@ -100,6 +101,7 @@ const emailSchema = new GraphQLSchema({ function createSubscription( pubsub: SimplePubSub, variableValues?: { readonly [variable: string]: unknown }, + useExperimentalSubscribeIncrementally = false, ) { const document = parse(` subscription ( @@ -151,7 +153,11 @@ function createSubscription( }), }; - return subscribe({ + const subscribeFn = useExperimentalSubscribeIncrementally + ? legacyExperimentalSubscribeIncrementally + : subscribe; + + return subscribeFn({ schema: emailSchema, document, rootValue: data, @@ -810,6 +816,109 @@ describe('Subscription Publish Phase', () => { }); }); + it('legacyExperimentalSubscribeIncrementally function works with @defer', async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription( + pubsub, + { + shouldDefer: true, + }, + true, + ); + assert(isAsyncIterable(subscription)); + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }), + ).to.equal(true); + + // The previously waited on payload now has a value. + expectJSON(await payload).toDeepEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { from: 'yuzhi@graphql.org', subject: 'Alright' }, + }, + }, + pending: [{ id: '0', path: ['importantEmail'] }], + hasNext: true, + }, + }); + + expectJSON(await subscription.next()).toDeepEqual({ + done: false, + value: { + incremental: [ + { + data: { + inbox: { unread: 1, total: 2 }, + }, + id: '0', + }, + ], + completed: [{ id: '0' }], + hasNext: false, + }, + }); + + // Another new email arrives, after all incrementally delivered payloads are received. + expect( + pubsub.emit({ + from: 'hyo@graphql.org', + subject: 'Tools', + message: 'I <3 making things', + unread: true, + }), + ).to.equal(true); + + // The next waited on payload will have a value. + expectJSON(await subscription.next()).toDeepEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { from: 'hyo@graphql.org', subject: 'Tools' }, + }, + }, + pending: [{ id: '0', path: ['importantEmail'] }], + hasNext: true, + }, + }); + + expectJSON(await subscription.next()).toDeepEqual({ + done: false, + value: { + incremental: [ + { + data: { inbox: { unread: 2, total: 3 } }, + id: '0', + }, + ], + completed: [{ id: '0' }], + hasNext: false, + }, + }); + + expectJSON(await subscription.return()).toDeepEqual({ + done: true, + value: undefined, + }); + + // Awaiting a subscription after closing it results in completed results. + expectJSON(await subscription.next()).toDeepEqual({ + done: true, + value: undefined, + }); + }); + it('subscribe function returns errors with @stream', async () => { const pubsub = new SimplePubSub(); const subscription = await createSubscription(pubsub, { @@ -893,6 +1002,99 @@ describe('Subscription Publish Phase', () => { }); }); + it('legacyExperimentalSubscribeIncrementally function works with @stream', async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription( + pubsub, + { + shouldStream: true, + }, + true, + ); + assert(isAsyncIterable(subscription)); + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }), + ).to.equal(true); + + // The previously waited on payload now has a value. + expectJSON(await payload).toDeepEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { from: 'yuzhi@graphql.org', subject: 'Alright' }, + inbox: { emails: [], unread: 1, total: 2 }, + }, + }, + pending: [{ id: '0', path: ['importantEmail', 'inbox', 'emails'] }], + hasNext: true, + }, + }); + + expectJSON(await subscription.next()).toDeepEqual({ + done: false, + value: { + incremental: [{ id: '0', items: [{}, {}] }], + completed: [{ id: '0' }], + hasNext: false, + }, + }); + + // Another new email arrives, after all incrementally delivered payloads are received. + expect( + pubsub.emit({ + from: 'hyo@graphql.org', + subject: 'Tools', + message: 'I <3 making things', + unread: true, + }), + ).to.equal(true); + + // The next waited on payload will have a value. + expectJSON(await subscription.next()).toDeepEqual({ + done: false, + value: { + data: { + importantEmail: { + email: { from: 'hyo@graphql.org', subject: 'Tools' }, + inbox: { emails: [], unread: 2, total: 3 }, + }, + }, + pending: [{ id: '0', path: ['importantEmail', 'inbox', 'emails'] }], + hasNext: true, + }, + }); + + expectJSON(await subscription.next()).toDeepEqual({ + done: false, + value: { + incremental: [{ id: '0', items: [{}, {}, {}] }], + completed: [{ id: '0' }], + hasNext: false, + }, + }); + + expectJSON(await subscription.return()).toDeepEqual({ + done: true, + value: undefined, + }); + + // Awaiting a subscription after closing it results in completed results. + expectJSON(await subscription.next()).toDeepEqual({ + done: true, + value: undefined, + }); + }); + it('produces a payload when there are multiple events', async () => { const pubsub = new SimplePubSub(); const subscription = createSubscription(pubsub); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 8cd23ee307..03012756f2 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -65,6 +65,7 @@ import { collectFields, collectSubfields as _collectSubfields, } from './collectFields.js'; +import { flattenAsyncIterable } from './flattenAsyncIterable.js'; import { getVariableSignature } from './getVariableSignature.js'; import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; @@ -74,10 +75,12 @@ import type { ExecutionResult, ExperimentalIncrementalExecutionResults, IncrementalDataRecord, + InitialIncrementalExecutionResult, PendingExecutionGroup, StreamItemRecord, StreamItemResult, StreamRecord, + SubsequentIncrementalExecutionResult, } from './types.js'; import { DeferredFragmentRecord } from './types.js'; import type { VariableValues } from './values.js'; @@ -158,8 +161,11 @@ export interface ValidatedExecutionArgs { subscribeFieldResolver: GraphQLFieldResolver; perEventExecutor: ( validatedExecutionArgs: ValidatedExecutionArgs, - ) => PromiseOrValue; + ) => PromiseOrValue< + ExecutionResult | ExperimentalIncrementalExecutionResults + >; enableEarlyExecution: boolean; + allowIncrementalSubscriptions: boolean; hideSuggestions: boolean; abortSignal: AbortSignal | undefined; } @@ -445,6 +451,7 @@ export function executeSync(args: ExecutionArgs): ExecutionResult { */ export function validateExecutionArgs( args: ExecutionArgs, + allowIncrementalSubscriptions = false, ): ReadonlyArray | ValidatedExecutionArgs { const { schema, @@ -541,8 +548,10 @@ export function validateExecutionArgs( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, - perEventExecutor: perEventExecutor ?? executeSubscriptionEvent, + perEventExecutor: + perEventExecutor ?? experimentalExecuteQueryOrMutationOrSubscriptionEvent, enableEarlyExecution: enableEarlyExecution === true, + allowIncrementalSubscriptions, hideSuggestions, abortSignal: args.abortSignal ?? undefined, }; @@ -1178,7 +1187,8 @@ function getStreamUsage( ._streamUsage; } - const { operation, variableValues } = validatedExecutionArgs; + const { operation, variableValues, allowIncrementalSubscriptions } = + validatedExecutionArgs; // validation only allows equivalent streams on multiple fields, so it is // safe to only check the first fieldNode for the stream directive const stream = getDirectiveValues( @@ -1207,7 +1217,8 @@ function getStreamUsage( ); invariant( - operation.operation !== OperationTypeNode.SUBSCRIPTION, + allowIncrementalSubscriptions || + operation.operation !== OperationTypeNode.SUBSCRIPTION, '`@stream` directive not supported on subscription operations. Disable `@stream` by setting the `if` argument to `false`.', ); @@ -1907,8 +1918,9 @@ function collectAndExecuteSubfields( if (newDeferUsages.length > 0) { invariant( - validatedExecutionArgs.operation.operation !== - OperationTypeNode.SUBSCRIPTION, + validatedExecutionArgs.allowIncrementalSubscriptions || + validatedExecutionArgs.operation.operation !== + OperationTypeNode.SUBSCRIPTION, '`@defer` directive not supported on subscription operations. Disable `@defer` by setting the `if` argument to `false`.', ); } @@ -2100,10 +2112,57 @@ export function subscribe( args: ExecutionArgs, ): PromiseOrValue< AsyncGenerator | ExecutionResult +> { + const resultOrStream = legacyExperimentalSubscribeIncrementally(args, false); + + if (isPromise(resultOrStream)) { + return resultOrStream.then(handleMaybeResponseStream); + } + + return handleMaybeResponseStream(resultOrStream); +} + +function handleMaybeResponseStream( + maybeSubscription: + | ExecutionResult + | AsyncGenerator< + | ExecutionResult + | InitialIncrementalExecutionResult + | SubsequentIncrementalExecutionResult, + void, + void + >, +): AsyncGenerator | ExecutionResult { + // Return early errors. + if (!isAsyncIterable(maybeSubscription)) { + return maybeSubscription; + } + + return mapAsyncIterable(maybeSubscription, (result) => { + invariant(!('hasNext' in result), UNEXPECTED_MULTIPLE_PAYLOADS); + return result; + }); +} + +export function legacyExperimentalSubscribeIncrementally( + args: ExecutionArgs, + allowIncrementalSubscriptions = true, +): PromiseOrValue< + | AsyncGenerator< + | ExecutionResult + | InitialIncrementalExecutionResult + | SubsequentIncrementalExecutionResult, + void, + void + > + | ExecutionResult > { // If a valid execution context cannot be created due to incorrect arguments, // a "Response" with only errors is returned. - const validatedExecutionArgs = validateExecutionArgs(args); + const validatedExecutionArgs = validateExecutionArgs( + args, + allowIncrementalSubscriptions, + ); // Return early errors if execution context failed. if (!('schema' in validatedExecutionArgs)) { @@ -2114,21 +2173,52 @@ export function subscribe( if (isPromise(resultOrStream)) { return resultOrStream.then((resolvedResultOrStream) => - mapSourceToResponse(validatedExecutionArgs, resolvedResultOrStream), + handleMaybeEventStream(validatedExecutionArgs, resolvedResultOrStream), ); } - return mapSourceToResponse(validatedExecutionArgs, resultOrStream); + return handleMaybeEventStream(validatedExecutionArgs, resultOrStream); } -function mapSourceToResponse( +function handleMaybeEventStream( validatedExecutionArgs: ValidatedExecutionArgs, resultOrStream: ExecutionResult | AsyncIterable, -): AsyncGenerator | ExecutionResult { +): + | ExecutionResult + | AsyncGenerator< + | ExecutionResult + | InitialIncrementalExecutionResult + | SubsequentIncrementalExecutionResult, + void, + void + > { if (!isAsyncIterable(resultOrStream)) { return resultOrStream; } + return flattenAsyncIterable( + mapSourceToResponse(validatedExecutionArgs, resultOrStream), + (result) => + 'initialResult' in result + ? { + value: result.initialResult, + nestedIterable: result.subsequentResults, + } + : { + value: result, + nestedIterable: undefined, + }, + ); +} + +function mapSourceToResponse( + validatedExecutionArgs: ValidatedExecutionArgs, + stream: AsyncIterable, +): AsyncGenerator< + ExecutionResult | ExperimentalIncrementalExecutionResults, + void, + void +> { const abortSignal = validatedExecutionArgs.abortSignal; const abortSignalListener = abortSignal ? new AbortSignalListener(abortSignal) @@ -2140,8 +2230,8 @@ function mapSourceToResponse( // the GraphQL specification.. return mapAsyncIterable( abortSignalListener - ? cancellableIterable(resultOrStream, abortSignalListener) - : resultOrStream, + ? cancellableIterable(stream, abortSignalListener) + : stream, (payload: unknown) => { const perEventExecutionArgs: ValidatedExecutionArgs = { ...validatedExecutionArgs, diff --git a/src/execution/flattenAsyncIterable.ts b/src/execution/flattenAsyncIterable.ts new file mode 100644 index 0000000000..310e41809c --- /dev/null +++ b/src/execution/flattenAsyncIterable.ts @@ -0,0 +1,114 @@ +import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; + +/** + * Given an AsyncIterable possibly containing additional AsyncIterables, + * flatten all items into a single AsyncIterable. + */ +export function flattenAsyncIterable( + iterable: AsyncIterable, + onValue: (maybeValueWithIterable: TItem) => { + value: TSingle | TInitial; + nestedIterable?: AsyncIterable | undefined; + }, +): AsyncGenerator { + // You might think this whole function could be replaced with + // + // async function* flattenAsyncIterable(iterable) { + // for await (const item of iterable) { + // yield item.value; + // yield* item.iterable; + // } + // } + // + // but calling `.return()` on the iterable it returns won't interrupt the `for await`. + + const topIterator = iterable[Symbol.asyncIterator](); + let currentNestedIterator: AsyncIterator | undefined; + let waitForCurrentNestedIterator: Promise | undefined; + let done = false; + + async function next(): Promise< + IteratorResult + > { + if (done) { + return { value: undefined, done: true }; + } + + try { + if (!currentNestedIterator) { + // Somebody else is getting it already. + if (waitForCurrentNestedIterator) { + await waitForCurrentNestedIterator; + return await next(); + } + // Nobody else is getting it. We should! + // eslint-disable-next-line @typescript-eslint/no-invalid-void-type + const { resolve, promise } = promiseWithResolvers(); + waitForCurrentNestedIterator = promise; + const topIteratorResult = await topIterator.next(); + if (topIteratorResult.done) { + // Given that done only ever transitions from false to true, + // require-atomic-updates is being unnecessarily cautious. + // eslint-disable-next-line require-atomic-updates + done = true; + return await next(); + } + const { value, nestedIterable } = onValue(topIteratorResult.value); + if (nestedIterable) { + // eslint is making a reasonable point here, but we've explicitly protected + // ourself from the race condition by ensuring that only the single call + // that assigns to waitForCurrentNestedIterator is allowed to assign to + // currentNestedIterator or waitForCurrentNestedIterator. + // eslint-disable-next-line require-atomic-updates + currentNestedIterator = nestedIterable[Symbol.asyncIterator](); + } + // eslint-disable-next-line require-atomic-updates + waitForCurrentNestedIterator = undefined; + resolve(); + return { value, done: false }; + } + + const rememberCurrentNestedIterator = currentNestedIterator; + const nestedIteratorResult = await currentNestedIterator.next(); + if (!nestedIteratorResult.done) { + return nestedIteratorResult; + } + + // The nested iterator is done. If it's still the current one, make it not + // current. (If it's not the current one, somebody else has made us move on.) + if (currentNestedIterator === rememberCurrentNestedIterator) { + currentNestedIterator = undefined; + } + return await next(); + } catch (err) { + done = true; + throw err; + } + } + return { + next, + async return(): Promise> { + done = true; + await Promise.all([ + currentNestedIterator?.return?.(), + topIterator.return?.(), + ]); + return { value: undefined, done: true }; + }, + async throw( + error?: unknown, + ): Promise> { + done = true; + await Promise.all([ + currentNestedIterator?.throw?.(error), + topIterator.throw?.(error), + ]); + /* c8 ignore next */ + throw error; + }, + [Symbol.asyncIterator]() { + /* c8 ignore next */ + return this; + }, + }; +} diff --git a/src/execution/index.ts b/src/execution/index.ts index fe9764c64f..c07c330b47 100644 --- a/src/execution/index.ts +++ b/src/execution/index.ts @@ -11,6 +11,7 @@ export { defaultFieldResolver, defaultTypeResolver, subscribe, + legacyExperimentalSubscribeIncrementally, } from './execute.js'; export type { ExecutionArgs, ValidatedExecutionArgs } from './execute.js'; diff --git a/src/index.ts b/src/index.ts index 10f6d02ca7..784506fbf6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -344,6 +344,7 @@ export { getVariableValues, getDirectiveValues, subscribe, + legacyExperimentalSubscribeIncrementally, createSourceEventStream, } from './execution/index.js';