From dd7f1663f9fc0f10d38c1df0b72b2116f9ab7d24 Mon Sep 17 00:00:00 2001 From: rdunk Date: Thu, 14 Nov 2024 16:40:41 +0000 Subject: [PATCH] fix(comlink): improve reconnect handling --- packages/comlink/src/constants.ts | 5 ++- packages/comlink/src/controller.ts | 3 +- packages/comlink/src/node.ts | 72 ++++++++++++++++++++---------- packages/comlink/src/request.ts | 7 ++- packages/comlink/src/types.ts | 1 + 5 files changed, 59 insertions(+), 29 deletions(-) diff --git a/packages/comlink/src/constants.ts b/packages/comlink/src/constants.ts index cf4e546a5..b9b774461 100644 --- a/packages/comlink/src/constants.ts +++ b/packages/comlink/src/constants.ts @@ -4,7 +4,10 @@ import type {MessageType} from './types' export const DOMAIN = 'sanity/comlink' /** @internal */ -export const RESPONSE_TIMEOUT = 10000 +export const RESPONSE_TIMEOUT_DEFAULT = 3_000 + +/** @internal */ +export const FETCH_TIMEOUT_DEFAULT = 10_000 /** @internal */ export const HEARTBEAT_INTERVAL = 1000 diff --git a/packages/comlink/src/controller.ts b/packages/comlink/src/controller.ts index a7a963411..0ade2d86e 100644 --- a/packages/comlink/src/controller.ts +++ b/packages/comlink/src/controller.ts @@ -260,8 +260,7 @@ export const createController = (input: {targetOrigin: string}): Controller => { const stop = () => { channels.forEach((channel) => { - channel.disconnect() - channel.stop() + cleanupChannel(channel as unknown as Channel) }) } diff --git a/packages/comlink/src/node.ts b/packages/comlink/src/node.ts index 35ca4c8a2..1cdf2b699 100644 --- a/packages/comlink/src/node.ts +++ b/packages/comlink/src/node.ts @@ -5,6 +5,7 @@ import { createActor, emit, enqueueActions, + fromCallback, raise, setup, stopChild, @@ -13,6 +14,7 @@ import { import {createListenLogic, listenInputFromContext} from './common' import { DOMAIN, + FETCH_TIMEOUT_DEFAULT, MSG_DISCONNECT, MSG_HANDSHAKE_ACK, MSG_HANDSHAKE_SYN, @@ -127,13 +129,15 @@ export const createNodeMachine = < events: | {type: 'heartbeat.received'; message: MessageEvent>} | {type: 'message.received'; message: MessageEvent>} + | {type: 'handshake.syn'; message: MessageEvent>} | { type: 'post' data: V resolvable?: PromiseWithResolvers options?: { + responseTimeout?: number signal?: AbortSignal - suppressWarning?: boolean + suppressWarnings?: boolean } } | {type: 'request.aborted'; requestId: string} @@ -196,6 +200,7 @@ export const createNodeMachine = < from: context.name, parentRef: self, resolvable: request.resolvable, + responseTimeout: request.options?.responseTimeout, responseTo: request.responseTo, signal: request.options?.signal, sources: context.target!, @@ -293,15 +298,15 @@ export const createNodeMachine = < }), 'set connection config': assign({ connectionId: ({event}) => { - assertEvent(event, 'message.received') + assertEvent(event, 'handshake.syn') return event.message.data.connectionId }, target: ({event}) => { - assertEvent(event, 'message.received') + assertEvent(event, 'handshake.syn') return event.message.source || undefined }, targetOrigin: ({event}) => { - assertEvent(event, 'message.received') + assertEvent(event, 'handshake.syn') return event.message.origin }, }), @@ -323,6 +328,17 @@ export const createNodeMachine = < target: undefined, targetOrigin: null, }), + // Always listen for handshake syn messages. The channel could have + // disconnected without being able to notify the node, and so need to + // re-establish the connection. + invoke: { + id: 'listen for handshake syn', + src: 'listen', + input: listenInputFromContext({ + include: MSG_HANDSHAKE_SYN, + responseType: 'handshake.syn', + }), + }, on: { 'request.success': { actions: 'remove request', @@ -333,32 +349,22 @@ export const createNodeMachine = < 'request.aborted': { actions: 'remove request', }, + 'handshake.syn': { + actions: 'set connection config', + target: '.handshaking', + }, }, initial: 'idle', states: { idle: { - invoke: { - id: 'listen for handshake syn', - src: 'listen', - input: listenInputFromContext({ - include: MSG_HANDSHAKE_SYN, - count: 1, - }), - onDone: { - target: 'handshaking', - guard: 'hasSource', - }, - }, on: { - 'message.received': { - actions: 'set connection config', - }, - 'post': { + post: { actions: 'buffer message', }, }, }, handshaking: { + guard: 'hasSource', entry: 'send handshake syn ack', invoke: [ { @@ -387,7 +393,13 @@ export const createNodeMachine = < id: 'listen for messages', src: 'listen', input: listenInputFromContext({ - exclude: [MSG_DISCONNECT, MSG_HANDSHAKE_ACK, MSG_HEARTBEAT, MSG_RESPONSE], + exclude: [ + MSG_DISCONNECT, + MSG_HANDSHAKE_SYN, + MSG_HANDSHAKE_ACK, + MSG_HEARTBEAT, + MSG_RESPONSE, + ], }), }, ], @@ -413,7 +425,13 @@ export const createNodeMachine = < id: 'listen for messages', src: 'listen', input: listenInputFromContext({ - exclude: [MSG_RESPONSE, MSG_HEARTBEAT], + exclude: [ + MSG_DISCONNECT, + MSG_HANDSHAKE_SYN, + MSG_HANDSHAKE_ACK, + MSG_HEARTBEAT, + MSG_RESPONSE, + ], }), }, { @@ -504,14 +522,20 @@ export const createNode = ( const fetch = ( data: WithoutResponse, - options?: {signal?: AbortSignal; suppressWarnings?: boolean}, + options?: { + responseTimeout?: number + signal?: AbortSignal + suppressWarnings?: boolean + }, ) => { + const {responseTimeout = FETCH_TIMEOUT_DEFAULT, signal, suppressWarnings} = options || {} + const resolvable = Promise.withResolvers() actor.send({ type: 'post', data, resolvable, - options, + options: {responseTimeout, signal, suppressWarnings}, }) return resolvable.promise as never } diff --git a/packages/comlink/src/request.ts b/packages/comlink/src/request.ts index 8462351d7..de57e0141 100644 --- a/packages/comlink/src/request.ts +++ b/packages/comlink/src/request.ts @@ -8,7 +8,7 @@ import { type ActorRefFrom, type AnyActorRef, } from 'xstate' -import {MSG_RESPONSE, RESPONSE_TIMEOUT} from './constants' +import {MSG_RESPONSE, RESPONSE_TIMEOUT_DEFAULT} from './constants' import type {Message, MessageData, MessageType, ProtocolMessage, ResponseMessage} from './types' const throwOnEvent = @@ -34,6 +34,7 @@ export interface RequestMachineContext { parentRef: AnyActorRef resolvable: PromiseWithResolvers | undefined response: S['response'] | null + responseTimeout: number | undefined responseTo: string | undefined signal: AbortSignal | undefined suppressWarnings: boolean | undefined @@ -82,6 +83,7 @@ export const createRequestMachine = < from: string parentRef: AnyActorRef resolvable?: PromiseWithResolvers + responseTimeout?: number responseTo?: string signal?: AbortSignal sources: Set | MessageEventSource @@ -176,7 +178,7 @@ export const createRequestMachine = < }, delays: { initialTimeout: 0, - responseTimeout: RESPONSE_TIMEOUT, + responseTimeout: ({context}) => context.responseTimeout ?? RESPONSE_TIMEOUT_DEFAULT, }, }).createMachine({ /** @xstate-layout N4IgpgJg5mDOIC5QAoC2BDAxgCwJYDswBKAOlwgBswBiAD1gBd0GwT0AzFgJ2QNwdzoKAFVyowAewCuDItTRY8hUuSoBtAAwBdRKAAOE2P1wT8ukLUQBGAEwBWEgBYAnK+eOAzB7sB2DzY8rABoQAE9rDQc3V0cNTw8fAA4NHwBfVJCFHAJiElgwfAgCKGpNHSQQAyMBU3NLBDsrDxI7DTaAjQA2OOcNDxDwhHsNJx9Ou0TOq2cJxP9HdMyMbOU8gqL8ErUrcv1DY1qK+sbm1vaPLp6+gcRnGydo9wDGycWQLKVc9AB3dGNN6jiWCwdAwMrmKoHMxHRCJRKOEiJHwuZKBZwXKzBMKIGyYkhtAkXOweTqOHw2RJvD45Ug-P4CAH0JgsNicMA8LhwAz4fKicTSWTyZafWm-f5QcEVSE1aGgepwhFIlF9aYYrGDC4+JzEppjGzOUkeGbpDIgfASCBwczU5QQ-YyuqIAC0nRuCBd+IJXu9KSpwppZEoYDt1RMsosiEcNjdVjiJEeGisiSTHkcVgWpptuXyhWKIahjqGzi1BqRJINnVcdkcbuTLS9VYC8ISfsUAbp4vzDphCHJIyjBvJNlxNmRNexQ3sJGH43GPj8jWJrZWuXYfyoEC7YcLsbrgRsjkcvkmdgNbopVhIPhVfnsh8ClMz-tWsCkmEwcHgUvt257u8v+6Hse4xnhOdZnImVidPqCRNB4JqpEAA */ @@ -191,6 +193,7 @@ export const createRequestMachine = < parentRef: input.parentRef, resolvable: input.resolvable, response: null, + responseTimeout: input.responseTimeout, responseTo: input.responseTo, signal: input.signal, sources: input.sources instanceof Set ? input.sources : new Set([input.sources]), diff --git a/packages/comlink/src/types.ts b/packages/comlink/src/types.ts index 8f93daf1e..1cbb05a8a 100644 --- a/packages/comlink/src/types.ts +++ b/packages/comlink/src/types.ts @@ -46,6 +46,7 @@ export interface RequestData { type: MessageType resolvable?: PromiseWithResolvers options?: { + responseTimeout?: number signal?: AbortSignal suppressWarnings?: boolean }