Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run async migrations in sequence #90

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -205,7 +205,22 @@ Migrations.add({
});
```

For Meteor 2.8+ you can pass async function directly.
Starting from Meteor 2.8+, you can use async functions directly in your migrations:

```js
Migrations.add({
version: 3,
name: 'Add belts to people wearing pants.',
up: async function () {
// Asynchronous migration code
await SomeCollection.updateAsync({ wearsPants: true }, { $set: { hasBelt: true } }, { multi: true });
},
down: async function () {
// Asynchronous rollback code
await SomeCollection.updateAsync({}, { $unset: { hasBelt: true } }, { multi: true });
},
});
```

* Note: You may want to call migration after startup in case your host (such as Heroku) limits the amount of time given for startup
``` javascript
186 changes: 115 additions & 71 deletions migrations_server.js
Original file line number Diff line number Diff line change
@@ -7,37 +7,50 @@ import { Log } from 'meteor/logging';
Adds migration capabilities. Migrations are defined like:
Migrations.add({
up: function() {}, //*required* code to run to migrate upwards
version: 1, //*required* number to identify migration order
down: function() {}, //*optional* code to run to migrate downwards
name: 'Something' //*optional* display name for the migration
name: 'Something', //*optional* display name for the migration
up: async function() { //*required* code to run to migrate upwards
// Your migration code here
// This function can be asynchronous
},
down: async function() { //*optional* code to run to migrate downwards
// Your migration rollback code here
// This function can be asynchronous
}
});
The ordering of migrations is determined by the version you set.
To run the migrations, set the MIGRATE environment variable to either
'latest' or the version number you want to migrate to. Optionally, append
',exit' if you want the migrations to exit the meteor process, e.g if you're
',exit' if you want the migrations to exit the Meteor process, e.g., if you're
migrating from a script (remember to pass the --once parameter).
e.g:
MIGRATE="latest" mrt # ensure we'll be at the latest version and run the app
MIGRATE="latest,exit" mrt --once # ensure we'll be at the latest version and exit
MIGRATE="2,exit" mrt --once # migrate to version 2 and exit
e.g.:
MIGRATE="latest" meteor # ensure we'll be at the latest version and run the app
MIGRATE="latest,exit" meteor --once # ensure we'll be at the latest version and exit
MIGRATE="2,exit" meteor --once # migrate to version 2 and exit
Note: Migrations will lock ensuring only 1 app can be migrating at once. If
Note: Migrations will lock ensuring only one app can be migrating at once. If
a migration crashes, the control record in the migrations collection will
remain locked and at the version it was at previously, however the db could
be in an inconsistent state.
remain locked and at the version it was at previously; however, the database
could be in an inconsistent state.
**Async Migrations**:
- The `up` and `down` functions can be asynchronous (return a Promise or be async functions).
- The migration runner will await these functions, ensuring that migrations run sequentially
and complete before the next one starts.
*/

// since we'll be at version 0 by default, we should have a migration set for
// it.
const DefaultMigration = { version: 0, up: function() {} };

/**
*
* @type {{_list: {up: DefaultMigration.up, version: number}[], options: {logIfLatest: boolean, log: boolean, logger: null, collectionName: string}, config: Migrations.config}}
* @type {{
* _list: {up: DefaultMigration.up, version: number}[],
* options: {logIfLatest: boolean, log: boolean, logger: null, collectionName: string},
* config: Migrations.config
* }}
*/
export const Migrations = {
_list: [DefaultMigration],
@@ -98,21 +111,21 @@ let log;
Meteor.startup(async function() {
const options = Migrations.options;

// collection holding the control record
// Collection holding the control record
Migrations._collection = new Mongo.Collection(options.collectionName);

log = createLogger('Migrations');

['info', 'warn', 'error', 'debug'].forEach(function(level) {
log[level] = (message) => log(level, message)
log[level] = (message) => log(level, message);
});

if (process.env.MIGRATE) {
try {
await Migrations.migrateTo(process.env.MIGRATE);
} catch (e) {
log.error('Failed to run migrations')
log.error(e.message || e.reason)
log.error('Failed to run migrations');
log.error(e.message || e.reason);
}
}
});
@@ -121,9 +134,9 @@ Meteor.startup(async function() {
* Add a new migration
* @param migration {Object}
* @param migration.version {Number} required
* @param migration.up {function} required migration function
* @param migration.name {String} Optional name for the migration step
* @param migration.down {function} Optional function to migrate back from this step to previous version
* @param migration.up {function} required migration function (can be async)
* @param migration.down {function} Optional function to migrate back from this step to previous version (can be async)
*/
Migrations.add = function(migration) {
if (typeof migration.up !== 'function')
@@ -139,50 +152,56 @@ Migrations.add = function(migration) {
Object.freeze(migration);

this._list.push(migration);
this._list.sort((a, b) => (a.version > b.version) ? 1 : ((b.version > a.version) ? -1 : 0));
this._list.sort((a, b) =>
a.version > b.version ? 1 : b.version > a.version ? -1 : 0,
);
};

/**
* Attempts to run the migrations using command in the form of:
* e.g 'latest', 'latest,exit', 2
* e.g., 'latest', 'latest,exit', 2
* use 'XX,rerun' to re-run the migration at that version
* @param command {string|number}
* @returns {Promise}
*/
Migrations.migrateTo = async function(command) {
if (typeof command === 'undefined' || command === '' || this._list.length === 0)
if (
typeof command === 'undefined' ||
command === '' ||
this._list.length === 0
)
throw new Error('Cannot migrate using invalid command: ' + command);

let version;
let subcommand;
if (typeof command === 'number') {
version = command;
} else {
version = command.split(',')[0]; //.trim();
subcommand = command.split(',')[1]; //.trim();
version = command.split(',')[0];
subcommand = command.split(',')[1];
}

if (version === 'latest') {
await this._migrateTo(this._list[this._list.length -1].version);
await this._migrateTo(this._list[this._list.length - 1].version);
} else {
await this._migrateTo(parseInt(version), subcommand === 'rerun');
}

// remember to run meteor with --once otherwise it will restart
// Remember to run Meteor with --once otherwise it will restart
if (subcommand === 'exit') process.exit(0);
};

/**
* Just returns the current version
* @returns {Promise<void>}
* @returns {Promise<number>}
*/
Migrations.getVersion = async function() {
const control = await this._getControl()
const control = await this._getControl();
return control.version;
};

/**
* migrates to the specific version passed in
* Migrates to the specific version passed in
* @param version {number}
* @param rerun {boolean}
* @returns {Promise<void>}
@@ -193,25 +212,31 @@ Migrations._migrateTo = async function(version, rerun) {
const control = await this._getControl(); // Side effect: upserts control document.
let currentVersion = control.version;

//Avoid unneeded locking, check if migration actually is going to run
// Avoid unneeded locking, check if migration actually is going to run
if (!rerun && currentVersion === version) {
if (Migrations.options.logIfLatest) {
log.info('Not migrating, already at version ' + version);
}
return;
}

const isLock = await lock()
if (isLock === false) {
const isLocked = await lock();
if (isLocked === false) {
log.info('Not migrating, control is locked.');
return;
}

if (rerun) {
log.info('Rerunning version ' + version);
migrate('up', this._findIndexByVersion(version));
log.info('Finished migrating.');
await unlock();
try {
await migrate('up', this._findIndexByVersion(version));
log.info('Finished migrating.');
} catch (error) {
log.error('Migration failed:', error);
throw error;
} finally {
await unlock();
}
return;
}

@@ -220,20 +245,25 @@ Migrations._migrateTo = async function(version, rerun) {

// log.info('startIdx:' + startIdx + ' endIdx:' + endIdx);
log.info(
'Migrating from version ' +
'Migrating from version ' +
this._list[startIdx].version +
' -> ' +
this._list[endIdx].version,
);

// run the actual migration
function migrate(direction, idx) {
// Run the actual migration
/**
* Runs a single migration step.
* @param direction {'up'|'down'}
* @param idx {number} Index in the migration list
*/
async function migrate(direction, idx) {
const migration = self._list[idx];

if (typeof migration[direction] !== 'function') {
unlock();
await unlock();
throw new Meteor.Error(
'Cannot migrate ' + direction + ' on version ' + migration.version,
'Cannot migrate ' + direction + ' on version ' + migration.version,
);
}

@@ -242,27 +272,27 @@ Migrations._migrateTo = async function(version, rerun) {
}

log.info(
'Running ' +
'Running ' +
direction +
'() on version ' +
migration.version +
maybeName(),
);

migration[direction](migration);
// Await the migration function to ensure it completes
await migration[direction](migration);
}

// Returns true if lock was acquired.
async function lock() {
// This is atomic. The selector ensures only one caller at a time will see
// the unlocked control, and locking occurs in the same update's modifier.
// All other simultaneous callers will get false back from the update.
return (
await self._collection.updateAsync(
const result = await self._collection.updateAsync(
{ _id: 'control', locked: false },
{ $set: { locked: true, lockedAt: new Date() } },
) === 1
);
return result === 1;
}

// Side effect: saves version.
@@ -274,52 +304,63 @@ Migrations._migrateTo = async function(version, rerun) {
await self._setControl({ locked: true, version: currentVersion });
}

if (currentVersion < version) {
for (let i = startIdx; i < endIdx; i++) {
migrate('up', i + 1);
currentVersion = self._list[i + 1].version;
await updateVersion();
}
} else {
for (let i = startIdx; i > endIdx; i--) {
migrate('down', i);
currentVersion = self._list[i - 1].version;
await updateVersion();
try {
if (currentVersion < version) {
for (let i = startIdx; i < endIdx; i++) {
await migrate('up', i + 1);
currentVersion = self._list[i + 1].version;
await updateVersion();
}
} else {
for (let i = startIdx; i > endIdx; i--) {
await migrate('down', i);
currentVersion = self._list[i - 1].version;
await updateVersion();
}
}
log.info('Finished migrating.');
} catch (error) {
log.error('Migration failed:', error);
throw error;
} finally {
await unlock();
}

await unlock();
log.info('Finished migrating.');
};

/**
* gets the current control record, optionally creating it if non-existent
* Gets the current control record, optionally creating it if non-existent
* @returns {Promise<{ version: number, locked: boolean }>}
* @private
*/
Migrations._getControl = async function() {
const control = await this._collection.findOneAsync({ _id: 'control' });

return control || await this._setControl({ version: 0, locked: false });
return control || (await this._setControl({ version: 0, locked: false }));
};

/**
* sets the control record
* Sets the control record
* @param control {Object}
* @param control.version {number}
* @param control.locked {boolean}
* @returns {Promise<*>}
* @private
*/
Migrations._setControl = async function(control) {
// be quite strict
// Be quite strict
check(control.version, Number);
check(control.locked, Boolean);

await this._collection.updateAsync(
{ _id: 'control' },
{ $set: { version: control.version, locked: control.locked } },
{ upsert: true },
{ _id: 'control' },
{
$set: {
version: control.version,
locked: control.locked,
lockedAt: new Date(),
},
},
{ upsert: true },
);

return control;
@@ -340,7 +381,7 @@ Migrations._findIndexByVersion = function(version) {
};

/**
* reset (mainly intended for tests)
* Reset (mainly intended for tests)
* @returns {Promise<number>}
* @private
*/
@@ -350,9 +391,12 @@ Migrations._reset = async function() {
};

/**
* unlock control
* Unlock control
* @returns {Promise<number>}
*/
Migrations.unlock = async function() {
return await this._collection.updateAsync({ _id: 'control' }, { $set: { locked: false } });
};
return await this._collection.updateAsync(
{ _id: 'control' },
{ $set: { locked: false } },
);
};
340 changes: 168 additions & 172 deletions migrations_tests.js

Large diffs are not rendered by default.

734 changes: 0 additions & 734 deletions npm-shrinkwrap.json

This file was deleted.

5 changes: 0 additions & 5 deletions package-lock.json
2 changes: 1 addition & 1 deletion package.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package.describe({
summary: 'Define and run db migrations.',
version: '2.0.0',
version: '2.0.1',
name: 'percolate:migrations',
git: 'https://github.com/percolatestudio/meteor-migrations.git',
});