Skip to content

Commit

Permalink
fix: crud retrying merges with previous creates/updates instead of ov…
Browse files Browse the repository at this point in the history
…erwriting
  • Loading branch information
jmeistrich committed Nov 2, 2024
1 parent 33f612e commit 91b8060
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 14 deletions.
79 changes: 66 additions & 13 deletions src/sync-plugins/crud.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
ObservableEvent,
ObservableParam,
RetryOptions,
UpdateFnParams,
WaitForSetFnParams,
applyChanges,
Expand Down Expand Up @@ -129,6 +130,50 @@ function computeLastSync(data: any[], fieldUpdatedAt: string | undefined, fieldC
return newLastSync;
}

const queuedRetries = {
create: new Map<string, any>(),
update: new Map<string, any>(),
delete: new Map<string, any>(),
};
function retrySet(
params: SyncedSetParams<any>,
retry: RetryOptions | undefined,
action: 'create' | 'update' | 'delete',
itemKey: string,
itemValue: any,
actionFn: (value: any, params: SyncedSetParams<any>) => Promise<any>,
saveResult: (itemKey: string, itemValue: any, result: any, isCreate: boolean) => void,
) {
// If delete then remove from create/update, and vice versa
if (action === 'delete') {
if (queuedRetries.create.has(itemKey)) {
queuedRetries.create.delete(itemKey);
}
if (queuedRetries.update.has(itemKey)) {
queuedRetries.update.delete(itemKey);
}
} else {
if (queuedRetries.delete.has(itemKey)) {
queuedRetries.delete.delete(itemKey);
}
}

// Get the currently queued value and assigned the new changes onto it
const queuedRetry = queuedRetries[action]!.get(itemKey);
if (queuedRetry) {
itemValue = Object.assign(queuedRetry, itemValue);
}

queuedRetries[action].set(itemKey, itemValue);

return runWithRetry(params, retry, 'create_' + itemKey, () =>
actionFn!(itemValue, params).then((result) => {
queuedRetries[action]!.delete(itemKey);
return saveResult(itemKey, itemValue, result as any, true);
}),
);
}

// The get version
export function syncedCrud<TRemote extends object, TLocal = TRemote>(
props: SyncedCrudPropsSingle<TRemote, TLocal> & SyncedCrudPropsBase<TRemote, TLocal>,
Expand Down Expand Up @@ -517,10 +562,10 @@ export function syncedCrud<TRemote extends object, TLocal = TRemote, TAsOption e
await waitForSet(waitForSetParam as any, changes, itemValue, { type: 'create' });
}
const createObj = (await transformOut(itemValue as any, transform?.save)) as TRemote;
return runWithRetry(params, retry, 'create_' + itemKey, () =>
createFn!(createObj, params)
.then((result) => saveResult(itemKey, createObj, result as any, true))
.finally(() => pendingCreates.delete(itemKey)),
return retrySet(params, retry, 'create', itemKey, createObj, createFn!, saveResult).then(
() => {
pendingCreates.delete(itemKey);
},
);
}),

Expand All @@ -531,11 +576,7 @@ export function syncedCrud<TRemote extends object, TLocal = TRemote, TAsOption e
}
const changed = (await transformOut(itemValue as TLocal, transform?.save)) as TRemote;
if (Object.keys(changed).length > 0) {
return runWithRetry(params, retry, 'update_' + itemKey, () =>
updateFn!(changed, params).then(
(result) => result && saveResult(itemKey, changed, result as any, false),
),
);
return retrySet(params, retry, 'update', itemKey, changed, updateFn!, saveResult);
}
}),

Expand All @@ -554,14 +595,26 @@ export function syncedCrud<TRemote extends object, TLocal = TRemote, TAsOption e
}

if (deleteFn) {
return runWithRetry(params, retry, 'delete_' + valueId, () =>
deleteFn(valuePrevious, params),
return retrySet(
params,
retry,
'delete',
valueId,
valuePrevious,
deleteFn!,
saveResult,
);
}

if (fieldDeleted && updateFn) {
return runWithRetry(params, retry, 'delete_' + valueId, () =>
updateFn({ [fieldId]: valueId, [fieldDeleted]: true } as any, params),
return retrySet(
params,
retry,
'delete',
valueId,
{ [fieldId]: valueId, [fieldDeleted]: true } as any,
updateFn!,
saveResult,
);
}

Expand Down
12 changes: 12 additions & 0 deletions src/sync/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ function createRetryTimeout(retryOptions: RetryOptions, retryNum: number, fn: ()

const mapRetryTimeouts = new Map<any, number>();

export function runWithRetry<T>(
state: SyncedGetSetBaseParams<any>,
retryOptions: RetryOptions | undefined,
retryId: any,
fn: (params: OnErrorRetryParams) => Promise<T>,
): Promise<T>;
export function runWithRetry<T>(
state: SyncedGetSetBaseParams<any>,
retryOptions: RetryOptions | undefined,
retryId: any,
fn: (params: OnErrorRetryParams) => T,
): T;
export function runWithRetry<T>(
state: SyncedGetSetBaseParams<any>,
retryOptions: RetryOptions | undefined,
Expand Down
69 changes: 68 additions & 1 deletion tests/keel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ describe('keel', () => {
expect(Array.from(errors)).toEqual([]);
expect(Array.from(creates)).toEqual([newItem2, newItem]);
});
test('setting error retries with onError', async () => {
test('setting error retries multiple creates', async () => {
let shouldError = true;
const errors: Set<BasicValue> = new Set();
const creates: Set<BasicValue> = new Set();
Expand Down Expand Up @@ -450,4 +450,71 @@ describe('keel', () => {
expect(Array.from(errors)).toEqual([]);
expect(Array.from(creates).sort((a, b) => a.id.localeCompare(b.id))).toEqual([newItem, newItem2]);
});
test('setting error retries updates on multiple fields', async () => {
let shouldError = true;
const errors: Set<BasicValue> = new Set();
const updates: Set<BasicValue> = new Set();
const obs$ = observable(
syncedKeel({
list: async () => fakeKeelList([{ ...ItemBasicValue(), other: 2, another: 3 }]),
update(value) {
if (shouldError) {
return { error: { message: 'test' } };
} else {
updates.add(value as any);
return {
data: value,
} as any;
}
},
retry: {
infinite: true,
delay: 1,
},
onError(error, params) {
errors.add(params.input);
},
}),
);

obs$.get();

const item$ = obs$.id1;

await promiseTimeout(1);

item$.other.set(4);

await promiseTimeout(1);

item$.another.set(5);

await promiseTimeout(1);

expect(Array.from(errors)).toEqual([
{
id: 'id1',
other: 4,
another: 5,
},
]);

errors.clear();
shouldError = false;

await promiseTimeout(5);

expect(Array.from(errors)).toEqual([]);
expect(Array.from(updates)).toEqual([
{
values: {
other: 4,
another: 5,
},
where: {
id: 'id1',
},
},
]);
});
});

0 comments on commit 91b8060

Please sign in to comment.