Skip to content

Commit

Permalink
SyncEngineLevel respects most recent startSync interval. (#887)
Browse files Browse the repository at this point in the history
This PR slightly modifies the way sync works.

- startSync now does a sync operation immediately as well as setting an interval, this way you don't have to wait the entire interval.
- calling subsequent startSync will:
  - replace the interval
  - if there is a sync already running it will not perform a sync immediately, if no sync is running it will also perform an immediate sync.
- calling a sync performs a one-shot sync.
- calling a sync while a sync is already in progress will throw.
  • Loading branch information
LiranCohen authored Sep 5, 2024
1 parent 4527444 commit da3630a
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 37 deletions.
71 changes: 45 additions & 26 deletions packages/agent/src/sync-engine-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class SyncEngineLevel implements SyncEngine {

private _db: AbstractLevel<string | Buffer | Uint8Array>;
private _syncIntervalId?: ReturnType<typeof setInterval>;
private _syncLock = false;
private _ulidFactory: ULIDFactory;

constructor({ agent, dataPath, db }: SyncEngineLevelParams) {
Expand Down Expand Up @@ -264,45 +265,54 @@ export class SyncEngineLevel implements SyncEngine {
}

public async sync(direction?: 'push' | 'pull'): Promise<void> {
if (this._syncIntervalId) {
throw new Error('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.');
if (this._syncLock) {
throw new Error('SyncEngineLevel: Sync operation is already in progress.');
}

if (!direction || direction === 'push') {
await this.push();
}
if (!direction || direction === 'pull') {
await this.pull();
this._syncLock = true;
try {
if (!direction || direction === 'push') {
await this.push();
}
if (!direction || direction === 'pull') {
await this.pull();
}
} finally {
this._syncLock = false;
}
}

public startSync({ interval }: {
public async startSync({ interval }: {
interval: string
}): Promise<void> {
// Convert the interval string to milliseconds.
const intervalMilliseconds = ms(interval);

return new Promise((resolve, reject) => {

const intervalSync = async () => {
if (this._syncIntervalId) {
clearInterval(this._syncIntervalId);
}
const intervalSync = async () => {
if (this._syncLock) {
return;
}

try {
await this.push();
await this.pull();
} catch (error: any) {
this.stopSync();
reject(error);
}
clearInterval(this._syncIntervalId);
this._syncIntervalId = undefined;
await this.sync();

// then we start sync again
if (!this._syncIntervalId) {
this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
};
}
};

this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);
});
if (this._syncIntervalId) {
clearInterval(this._syncIntervalId);
}

// Set up a new interval.
this._syncIntervalId = setInterval(intervalSync, intervalMilliseconds);

// initiate an immediate sync
if (!this._syncLock) {
await this.sync();
}
}

public stopSync(): void {
Expand Down Expand Up @@ -526,7 +536,16 @@ export class SyncEngineLevel implements SyncEngine {

// iterate over all registered identities
for await (const [ did, options ] of this._db.sublevel('registeredIdentities').iterator()) {
const { protocols, delegateDid } = JSON.parse(options) as SyncIdentityOptions;

const { protocols, delegateDid } = await new Promise<SyncIdentityOptions>((resolve) => {
try {
const { protocols, delegateDid } = JSON.parse(options) as SyncIdentityOptions;
resolve({ protocols, delegateDid });
} catch(error: any) {
resolve({ protocols: [] });
}
});

// First, confirm the DID can be resolved and extract the DWN service endpoint URLs.
const dwnEndpointUrls = await getDwnServiceEndpointUrls(did, this.agent.did);
if (dwnEndpointUrls.length === 0) {
Expand Down
30 changes: 30 additions & 0 deletions packages/agent/src/types/sync.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,43 @@
import type { Web5PlatformAgent } from './agent.js';

/**
* The SyncEngine is responsible for syncing messages between the agent and the platform.
*/
export type SyncIdentityOptions = {
/**
* The delegate DID that should be used to sign the sync messages.
*/
delegateDid?: string;
/**
* The protocols that should be synced for this identity, if an empty array is provided, all messages for all protocols will be synced.
*/
protocols: string[]
}
export interface SyncEngine {
/**
* The agent that the SyncEngine is attached to.
*/
agent: Web5PlatformAgent;
/**
* Register an identity to be managed by the SyncEngine for syncing.
* The options can define specific protocols that should only be synced, or a delegate DID that should be used to sign the sync messages.
*/
registerIdentity(params: { did: string, options?: SyncIdentityOptions }): Promise<void>;
/**
* Preforms a one-shot sync operation. If no direction is provided, it will perform both push and pull.
* @param direction which direction you'd like to perform the sync operation.
*
* @throws {Error} if a sync is already in progress or the sync operation fails.
*/
sync(direction?: 'push' | 'pull'): Promise<void>;
/**
* Starts a periodic sync that runs at an interval. Subsequent calls to startSync will update the interval.
*
* @param params { interval: string } the interval at which the sync operation should be performed. ex: '30s', '1m', '10m'
*/
startSync(params: { interval: string }): Promise<void>;
/**
* Stops the periodic sync operation, will complete the current sync operation if one is already in progress.
*/
stopSync(): void;
}
134 changes: 123 additions & 11 deletions packages/agent/tests/sync-engine-level.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,24 +465,41 @@ describe('SyncEngineLevel', () => {
expect(pullSpy.calledOnce).to.be.true;
});

it('throws if sync is attempted while an interval sync is running', async () => {
it('throws an error if the sync is currently already running', async () => {
// Register Alice's DID to be synchronized.
await testHarness.agent.sync.registerIdentity({
did: alice.did.uri,
});

// start the sync engine with an interval of 10 seconds
syncEngine.startSync({ interval: '10s' });
const clock = sinon.useFakeTimers();
sinon.stub(syncEngine as any, 'push').resolves();
const pullSpy = sinon.stub(syncEngine as any, 'pull');
pullSpy.returns(new Promise<void>((resolve) => {
clock.setTimeout(() => {
resolve();
}, 90);
}));

// do not await
syncEngine.sync();

await clock.tickAsync(50);

// do not block for subsequent syncs
pullSpy.returns(Promise.resolve());
try {
// Execute Sync to push and pull all records from Alice's remote DWN to Alice's local DWN.
await syncEngine.sync();

expect.fail('Expected an error to be thrown');
} catch (error: any) {
// Execute Sync to push and pull all records from Alice's remote DWN to Alice's local DWN.
expect(error.message).to.equal('SyncEngineLevel: Cannot call sync while a sync interval is active. Call `stopSync()` first.');
} catch(error:any) {
expect(error.message).to.equal('SyncEngineLevel: Sync operation is already in progress.');
}

await clock.tickAsync(50);

// no error thrown
await syncEngine.sync();

clock.restore();
});
});

Expand Down Expand Up @@ -2436,8 +2453,9 @@ describe('SyncEngineLevel', () => {
pushSpy.restore();
clock.restore();

expect(pullSpy.callCount).to.equal(2, 'push');
expect(pushSpy.callCount).to.equal(2, 'pull');
// one when starting the sync, and another for each interval
expect(pullSpy.callCount).to.equal(3, 'push');
expect(pushSpy.callCount).to.equal(3, 'pull');
});

it('does not call sync() again until a sync round finishes', async () => {
Expand All @@ -2461,18 +2479,112 @@ describe('SyncEngineLevel', () => {

await clock.tickAsync(1_400); // less time than the push

// only once for when starting the sync
expect(pullSpy.callCount).to.equal(1, 'pull');
expect(pullSpy.callCount).to.equal(1, 'push');

await clock.tickAsync(600); //remaining time for a 2nd sync
await clock.tickAsync(200); //remaining time and one interval

// once when starting, and once for the interval
expect(pullSpy.callCount).to.equal(2, 'pull');
expect(pushSpy.callCount).to.equal(2, 'push');

await clock.tickAsync(500); // one more interval

// one more for the interval
expect(pullSpy.callCount).to.equal(3, 'pull');
expect(pushSpy.callCount).to.equal(3, 'push');

pullSpy.restore();
pushSpy.restore();
clock.restore();
});

it('calls sync once per interval with the latest interval timer being respected', async () => {
await testHarness.agent.sync.registerIdentity({
did: alice.did.uri,
});

const clock = sinon.useFakeTimers();

const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync');
// set to be a sync time longer than the interval
syncSpy.returns(new Promise<void>((resolve) => {
clock.setTimeout(() => {
resolve();
}, 1_000);
}));

testHarness.agent.sync.startSync({ interval: '500ms' });

await clock.tickAsync(1_400); // less than the initial interval + the sync time

// once for the initial call and once for each interval call
expect(syncSpy.callCount).to.equal(2);

// set to be a short sync time
syncSpy.returns(new Promise<void>((resolve) => {
clock.setTimeout(() => {
resolve();
}, 15);
}));

testHarness.agent.sync.startSync({ interval: '300ms' });

await clock.tickAsync(301); // exactly the new interval + 1

// one for the initial 'startSync' call and one for each interval call
expect(syncSpy.callCount).to.equal(4);


await clock.tickAsync(601); // two more intervals

expect(syncSpy.callCount).to.equal(6);

syncSpy.restore();
clock.restore();
});

it('should replace the interval timer with the latest interval timer', async () => {

await testHarness.agent.sync.registerIdentity({
did: alice.did.uri,
});

const clock = sinon.useFakeTimers();

const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync');
// set to be a sync time longer than the interval
syncSpy.returns(new Promise<void>((resolve) => {
clock.setTimeout(() => {
resolve();
}, 100);
}));

testHarness.agent.sync.startSync({ interval: '500ms' });

// two intervals
await clock.tickAsync(1_001);

// this should equal 3, once for the initial call and once for each interval call
expect(syncSpy.callCount).to.equal(3);

syncSpy.resetHistory();
testHarness.agent.sync.startSync({ interval: '200ms' });

await clock.tickAsync(401); // two intervals

// one for the initial 'startSync' call and one for each interval call
expect(syncSpy.callCount).to.equal(3);

await clock.tickAsync(401); // two more intervals

// one additional calls for each interval
expect(syncSpy.callCount).to.equal(5);

syncSpy.restore();
clock.restore();
});
});
});
});
5 changes: 5 additions & 0 deletions packages/api/tests/dwn-api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ describe('DwnApi', () => {
await delegateHarness.createAgentDid();
});

after(async () => {
await delegateHarness.clearStorage();
await delegateHarness.closeStorage();
});

beforeEach(async () => {
sinon.restore();
await delegateHarness.syncStore.clear();
Expand Down
5 changes: 5 additions & 0 deletions packages/api/tests/record.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ describe('Record', () => {
await delegateHarness.createAgentDid();
});

after(async () => {
await delegateHarness.clearStorage();
await delegateHarness.closeStorage();
});

beforeEach(async () => {
sinon.restore();
await delegateHarness.syncStore.clear();
Expand Down

0 comments on commit da3630a

Please sign in to comment.