Skip to content

Commit

Permalink
fix(comlink): improve reconnect handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rdunk committed Nov 14, 2024
1 parent 2d3ffb5 commit dd7f166
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 29 deletions.
5 changes: 4 additions & 1 deletion packages/comlink/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import type {MessageType} from './types'
export const DOMAIN = 'sanity/comlink'

/** @internal */
export const RESPONSE_TIMEOUT = 10000
export const RESPONSE_TIMEOUT_DEFAULT = 3_000

/** @internal */
export const FETCH_TIMEOUT_DEFAULT = 10_000

/** @internal */
export const HEARTBEAT_INTERVAL = 1000
Expand Down
3 changes: 1 addition & 2 deletions packages/comlink/src/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,7 @@ export const createController = (input: {targetOrigin: string}): Controller => {

const stop = () => {
channels.forEach((channel) => {
channel.disconnect()
channel.stop()
cleanupChannel(channel as unknown as Channel<Message, Message>)
})
}

Expand Down
72 changes: 48 additions & 24 deletions packages/comlink/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
createActor,
emit,
enqueueActions,
fromCallback,

Check failure on line 8 in packages/comlink/src/node.ts

View workflow job for this annotation

GitHub Actions / Are there issues that linters can fix? 🤔

'fromCallback' is defined but never used

Check failure on line 8 in packages/comlink/src/node.ts

View workflow job for this annotation

GitHub Actions / build

'fromCallback' is defined but never used
raise,
setup,
stopChild,
Expand All @@ -13,6 +14,7 @@ import {
import {createListenLogic, listenInputFromContext} from './common'
import {
DOMAIN,
FETCH_TIMEOUT_DEFAULT,
MSG_DISCONNECT,
MSG_HANDSHAKE_ACK,
MSG_HANDSHAKE_SYN,
Expand Down Expand Up @@ -127,13 +129,15 @@ export const createNodeMachine = <
events:
| {type: 'heartbeat.received'; message: MessageEvent<ProtocolMessage<HeartbeatMessage>>}
| {type: 'message.received'; message: MessageEvent<ProtocolMessage<R>>}
| {type: 'handshake.syn'; message: MessageEvent<ProtocolMessage<R>>}
| {
type: 'post'
data: V
resolvable?: PromiseWithResolvers<S['response']>
options?: {
responseTimeout?: number
signal?: AbortSignal
suppressWarning?: boolean
suppressWarnings?: boolean
}
}
| {type: 'request.aborted'; requestId: string}
Expand Down Expand Up @@ -196,6 +200,7 @@ export const createNodeMachine = <
from: context.name,
parentRef: self,
resolvable: request.resolvable,
responseTimeout: request.options?.responseTimeout,
responseTo: request.responseTo,
signal: request.options?.signal,
sources: context.target!,
Expand Down Expand Up @@ -293,15 +298,15 @@ export const createNodeMachine = <
}),
'set connection config': assign({
connectionId: ({event}) => {
assertEvent(event, 'message.received')
assertEvent(event, 'handshake.syn')
return event.message.data.connectionId
},
target: ({event}) => {
assertEvent(event, 'message.received')
assertEvent(event, 'handshake.syn')
return event.message.source || undefined
},
targetOrigin: ({event}) => {
assertEvent(event, 'message.received')
assertEvent(event, 'handshake.syn')
return event.message.origin
},
}),
Expand All @@ -323,6 +328,17 @@ export const createNodeMachine = <
target: undefined,
targetOrigin: null,
}),
// Always listen for handshake syn messages. The channel could have
// disconnected without being able to notify the node, and so need to
// re-establish the connection.
invoke: {
id: 'listen for handshake syn',
src: 'listen',
input: listenInputFromContext({
include: MSG_HANDSHAKE_SYN,
responseType: 'handshake.syn',
}),
},
on: {
'request.success': {
actions: 'remove request',
Expand All @@ -333,32 +349,22 @@ export const createNodeMachine = <
'request.aborted': {
actions: 'remove request',
},
'handshake.syn': {
actions: 'set connection config',
target: '.handshaking',
},
},
initial: 'idle',
states: {
idle: {
invoke: {
id: 'listen for handshake syn',
src: 'listen',
input: listenInputFromContext({
include: MSG_HANDSHAKE_SYN,
count: 1,
}),
onDone: {
target: 'handshaking',
guard: 'hasSource',
},
},
on: {
'message.received': {
actions: 'set connection config',
},
'post': {
post: {
actions: 'buffer message',
},
},
},
handshaking: {
guard: 'hasSource',
entry: 'send handshake syn ack',
invoke: [
{
Expand Down Expand Up @@ -387,7 +393,13 @@ export const createNodeMachine = <
id: 'listen for messages',
src: 'listen',
input: listenInputFromContext({
exclude: [MSG_DISCONNECT, MSG_HANDSHAKE_ACK, MSG_HEARTBEAT, MSG_RESPONSE],
exclude: [
MSG_DISCONNECT,
MSG_HANDSHAKE_SYN,
MSG_HANDSHAKE_ACK,
MSG_HEARTBEAT,
MSG_RESPONSE,
],
}),
},
],
Expand All @@ -413,7 +425,13 @@ export const createNodeMachine = <
id: 'listen for messages',
src: 'listen',
input: listenInputFromContext({
exclude: [MSG_RESPONSE, MSG_HEARTBEAT],
exclude: [
MSG_DISCONNECT,
MSG_HANDSHAKE_SYN,
MSG_HANDSHAKE_ACK,
MSG_HEARTBEAT,
MSG_RESPONSE,
],
}),
},
{
Expand Down Expand Up @@ -504,14 +522,20 @@ export const createNode = <R extends Message, S extends Message>(

const fetch = (
data: WithoutResponse<S>,
options?: {signal?: AbortSignal; suppressWarnings?: boolean},
options?: {
responseTimeout?: number
signal?: AbortSignal
suppressWarnings?: boolean
},
) => {
const {responseTimeout = FETCH_TIMEOUT_DEFAULT, signal, suppressWarnings} = options || {}

const resolvable = Promise.withResolvers<S['response']>()
actor.send({
type: 'post',
data,
resolvable,
options,
options: {responseTimeout, signal, suppressWarnings},
})
return resolvable.promise as never
}
Expand Down
7 changes: 5 additions & 2 deletions packages/comlink/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
type ActorRefFrom,
type AnyActorRef,
} from 'xstate'
import {MSG_RESPONSE, RESPONSE_TIMEOUT} from './constants'
import {MSG_RESPONSE, RESPONSE_TIMEOUT_DEFAULT} from './constants'
import type {Message, MessageData, MessageType, ProtocolMessage, ResponseMessage} from './types'

const throwOnEvent =
Expand All @@ -34,6 +34,7 @@ export interface RequestMachineContext<S extends Message> {
parentRef: AnyActorRef
resolvable: PromiseWithResolvers<S['response']> | undefined
response: S['response'] | null
responseTimeout: number | undefined
responseTo: string | undefined
signal: AbortSignal | undefined
suppressWarnings: boolean | undefined
Expand Down Expand Up @@ -82,6 +83,7 @@ export const createRequestMachine = <
from: string
parentRef: AnyActorRef
resolvable?: PromiseWithResolvers<S['response']>
responseTimeout?: number
responseTo?: string
signal?: AbortSignal
sources: Set<MessageEventSource> | MessageEventSource
Expand Down Expand Up @@ -176,7 +178,7 @@ export const createRequestMachine = <
},
delays: {
initialTimeout: 0,
responseTimeout: RESPONSE_TIMEOUT,
responseTimeout: ({context}) => context.responseTimeout ?? RESPONSE_TIMEOUT_DEFAULT,
},
}).createMachine({
/** @xstate-layout N4IgpgJg5mDOIC5QAoC2BDAxgCwJYDswBKAOlwgBswBiAD1gBd0GwT0AzFgJ2QNwdzoKAFVyowAewCuDItTRY8hUuSoBtAAwBdRKAAOE2P1wT8ukLUQBGAEwBWEgBYAnK+eOAzB7sB2DzY8rABoQAE9rDQc3V0cNTw8fAA4NHwBfVJCFHAJiElgwfAgCKGpNHSQQAyMBU3NLBDsrDxI7DTaAjQA2OOcNDxDwhHsNJx9Ou0TOq2cJxP9HdMyMbOU8gqL8ErUrcv1DY1qK+sbm1vaPLp6+gcRnGydo9wDGycWQLKVc9AB3dGNN6jiWCwdAwMrmKoHMxHRCJRKOEiJHwuZKBZwXKzBMKIGyYkhtAkXOweTqOHw2RJvD45Ug-P4CAH0JgsNicMA8LhwAz4fKicTSWTyZafWm-f5QcEVSE1aGgepwhFIlF9aYYrGDC4+JzEppjGzOUkeGbpDIgfASCBwczU5QQ-YyuqIAC0nRuCBd+IJXu9KSpwppZEoYDt1RMsosiEcNjdVjiJEeGisiSTHkcVgWpptuXyhWKIahjqGzi1BqRJINnVcdkcbuTLS9VYC8ISfsUAbp4vzDphCHJIyjBvJNlxNmRNexQ3sJGH43GPj8jWJrZWuXYfyoEC7YcLsbrgRsjkcvkmdgNbopVhIPhVfnsh8ClMz-tWsCkmEwcHgUvt257u8v+6Hse4xnhOdZnImVidPqCRNB4JqpEAA */
Expand All @@ -191,6 +193,7 @@ export const createRequestMachine = <
parentRef: input.parentRef,
resolvable: input.resolvable,
response: null,
responseTimeout: input.responseTimeout,
responseTo: input.responseTo,
signal: input.signal,
sources: input.sources instanceof Set ? input.sources : new Set([input.sources]),
Expand Down
1 change: 1 addition & 0 deletions packages/comlink/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export interface RequestData<S extends Message> {
type: MessageType
resolvable?: PromiseWithResolvers<S['response']>
options?: {
responseTimeout?: number
signal?: AbortSignal
suppressWarnings?: boolean
}
Expand Down

0 comments on commit dd7f166

Please sign in to comment.