Skip to content

Commit

Permalink
Make the migration script more robust (#675)
Browse files Browse the repository at this point in the history
* Make sure MongoDB -> SQL migration can run with the standard executable

* Make the migration script more robust

So wlk.yt migrates correctly.

* Migrate active playlist

* Batch media inserts to somewhat improve performance

It would be better if we could use a prepared statement here.

* semicolon
  • Loading branch information
goto-bus-stop authored Nov 28, 2024
1 parent 9a88ab0 commit 888ef62
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 28 deletions.
6 changes: 6 additions & 0 deletions bin/u-wave-core.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const envSchema = {
type: 'number',
default: 6042,
},
MONGODB_URL: {
type: 'string',
format: 'uri',
},
REDIS_URL: {
type: 'string',
format: 'uri',
Expand Down Expand Up @@ -108,6 +112,8 @@ const uw = uwave({
redis: config.REDIS_URL,
sqlite: config.SQLITE_PATH,
secret,
// This property is untyped, it is propagated to the also-untyped MongoDB -> SQL migration
mongo: config.MONGODB_URL,
});

uw.on('redisError', (err) => {
Expand Down
112 changes: 84 additions & 28 deletions src/migrations/003-populate-sql.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,42 @@ const userSchema = new mongoose.Schema({
minimize: false,
});

/**
 * Group the elements of an async iterable into arrays of at most
 * `chunkSize` elements, yielding each full batch as it fills up and
 * a final, possibly smaller batch for any remainder.
 *
 * @param {AsyncIterable<T> | Iterable<T>} iter - Source of elements.
 * @param {number} chunkSize - Maximum number of elements per yielded batch.
 * @yields {T[]} Batches of consecutive elements, in source order.
 * @template T
 */
async function* asyncChunks(iter, chunkSize) {
  let batch = [];
  for await (const element of iter) {
    // `push` returns the new length, so we can test the threshold inline.
    if (batch.push(element) >= chunkSize) {
      yield batch;
      batch = [];
    }
  }
  // Flush the partial batch left over after the source is exhausted.
  if (batch.length !== 0) {
    yield batch;
  }
}

/**
 * Pair up two sync iterables element-by-element, producing an iterable
 * iterator of `[a[i], b[i]]` tuples. Both sources must have the same
 * length: if one runs out before the other, iteration throws.
 *
 * @param {Iterable<A>} a - First source.
 * @param {Iterable<B>} b - Second source.
 * @returns {IterableIterator<[A, B]>} Lock-step pairs from both sources.
 * @template A, B
 */
function zip(a, b) {
  const left = a[Symbol.iterator]();
  const right = b[Symbol.iterator]();

  return {
    // Make the iterator itself iterable so it works in `for...of`/spread.
    [Symbol.iterator]() {
      return this;
    },
    next() {
      const x = left.next();
      const y = right.next();

      // One side finishing before the other means the inputs were unequal.
      if (x.done !== y.done) {
        throw new Error('zip: iterators have different lengths');
      }

      return { value: [x.value, y.value], done: x.done };
    },
  };
}

/**
* @param {import('umzug').MigrationParams<import('../Uwave').default>} params
*/
Expand Down Expand Up @@ -427,7 +463,13 @@ async function up({ context: uw }) {
const motd = await uw.redis.get('motd');

/** @type {Map<string, string>} */
const idMap = new Map();
const mediaIDs = new Map();
/** @type {Map<string, string>} */
const userIDs = new Map();
/** @type {Map<string, string>} */
const playlistIDs = new Map();
/** @type {Map<string, string>} */
const playlistItemIDs = new Map();

await db.transaction().execute(async (tx) => {
for await (const config of models.Config.find().lean()) {
Expand All @@ -443,27 +485,29 @@ async function up({ context: uw }) {
.execute();
}

for await (const media of models.Media.find().lean()) {
const id = randomUUID();
await tx.insertInto('media')
.values({
id,
for await (const medias of asyncChunks(models.Media.find().lean(), 50)) {
const rows = await tx.insertInto('media')
.values(medias.map((media) => ({
id: randomUUID(),
sourceType: media.sourceType,
sourceID: media.sourceID,
sourceData: jsonb(media.sourceData),
artist: media.artist,
title: media.title,
title: media.title ?? '',
duration: media.duration,
thumbnail: media.thumbnail,
createdAt: media.createdAt.toISOString(),
updatedAt: media.updatedAt.toISOString(),
})
createdAt: (media.createdAt ?? media.updatedAt ?? new Date()).toISOString(),
updatedAt: (media.updatedAt ?? new Date()).toISOString(),
})))
.onConflict((conflict) => conflict.columns(['sourceType', 'sourceID']).doUpdateSet({
updatedAt: (eb) => eb.ref('excluded.updatedAt'),
}))
.returning('id')
.execute();

idMap.set(media._id.toString(), id);
for (const [media, row] of zip(medias, rows)) {
mediaIDs.set(media._id.toString(), row.id);
}
}

const roles = await models.AclRole.find().lean();
Expand Down Expand Up @@ -491,15 +535,15 @@ async function up({ context: uw }) {

for await (const user of models.User.find().lean()) {
const userID = randomUUID();
idMap.set(user._id.toString(), userID);
userIDs.set(user._id.toString(), userID);

await tx.insertInto('users')
.values({
id: userID,
username: user.username,
slug: user.slug,
createdAt: user.createdAt.toISOString(),
updatedAt: user.updatedAt.toISOString(),
updatedAt: (user.updatedAt ?? user.createdAt).toISOString(),
})
.execute();

Expand All @@ -511,35 +555,38 @@ async function up({ context: uw }) {

for await (const playlist of models.Playlist.where('author', user._id).lean()) {
const playlistID = randomUUID();
idMap.set(playlist._id.toString(), playlistID);
playlistIDs.set(playlist._id.toString(), playlistID);

await tx.insertInto('playlists')
.values({
id: playlistID,
name: playlist.name,
userID,
createdAt: playlist.createdAt.toISOString(),
updatedAt: playlist.updatedAt.toISOString(),
// Old objects use the `.created` property
createdAt: (playlist.createdAt ?? playlist.created).toISOString(),
updatedAt: (playlist.updatedAt ?? playlist.created).toISOString(),
})
.execute();

const items = [];
for (const itemMongoID of playlist.media) {
const itemID = randomUUID();
idMap.set(itemMongoID.toString(), itemID);
playlistItemIDs.set(itemMongoID.toString(), itemID);

const item = await models.PlaylistItem.findById(itemMongoID).lean();
const mediaID = mediaIDs.get(item.media.toString());

await tx.insertInto('playlistItems')
.values({
id: itemID,
playlistID,
mediaID: idMap.get(item.media.toString()),
mediaID,
artist: item.artist,
title: item.title,
start: item.start,
end: item.end,
createdAt: item.createdAt.toISOString(),
updatedAt: item.updatedAt.toISOString(),
end: item.end ?? 0, // Not ideal, but what can we do
createdAt: (item.createdAt ?? item.updatedAt ?? new Date()).toISOString(),
updatedAt: (item.updatedAt ?? new Date()).toISOString(),
})
.execute();

Expand All @@ -551,10 +598,20 @@ async function up({ context: uw }) {
.set({ items: jsonb(items) })
.execute();
}

if (user.activePlaylist != null) {
const activePlaylistID = playlistIDs.get(user.activePlaylist.toString());
if (activePlaylistID != null) {
await tx.updateTable('users')
.where('id', '=', userID)
.set({ activePlaylistID })
.execute();
}
}
}

for await (const entry of models.Authentication.find().lean()) {
const userID = idMap.get(entry.user.toString());
const userID = userIDs.get(entry.user.toString());
if (userID == null) {
throw new Error('Migration failure: unknown user ID');
}
Expand All @@ -576,9 +633,8 @@ async function up({ context: uw }) {

for await (const entry of models.HistoryEntry.find().lean()) {
const entryID = randomUUID();
idMap.set(entry._id.toString(), entryID);
const userID = idMap.get(entry.user.toString());
const mediaID = idMap.get(entry.media.media.toString());
const userID = userIDs.get(entry.user.toString());
const mediaID = mediaIDs.get(entry.media.media.toString());
await tx.insertInto('historyEntries')
.values({
id: entryID,
Expand All @@ -597,14 +653,14 @@ async function up({ context: uw }) {
for (const id of entry.upvotes) {
feedback.set(id.toString(), {
historyEntryID: entryID,
userID: idMap.get(id.toString()),
userID: userIDs.get(id.toString()),
vote: 1,
});
}
for (const id of entry.downvotes) {
feedback.set(id.toString(), {
historyEntryID: entryID,
userID: idMap.get(id.toString()),
userID: userIDs.get(id.toString()),
vote: -1,
});
}
Expand All @@ -615,7 +671,7 @@ async function up({ context: uw }) {
} else {
feedback.set(id.toString(), {
historyEntryID: entryID,
userID: idMap.get(id.toString()),
userID: userIDs.get(id.toString()),
favorite: 1,
});
}
Expand Down

0 comments on commit 888ef62

Please sign in to comment.