From bf4bfa2ac801f6cc3a646dd9f6964e9f9a015bff Mon Sep 17 00:00:00 2001 From: Grant Forrest Date: Thu, 20 Jun 2024 09:59:48 -0400 Subject: [PATCH] refactor migration code --- packages/store/src/client/Client.ts | 14 +- packages/store/src/client/ClientDescriptor.ts | 10 +- packages/store/src/entities/EntityStore.ts | 10 +- packages/store/src/idb.ts | 21 +- packages/store/src/migration/db.ts | 82 +- packages/store/src/migration/engine.ts | 248 ++++++ packages/store/src/migration/migrations.ts | 347 ++++++++ packages/store/src/migration/openDatabase.ts | 749 ------------------ .../store/src/migration/openQueryDatabase.ts | 63 ++ .../store/src/migration/openWIPDatabase.ts | 97 +++ packages/store/src/migration/types.ts | 4 + .../store/src/queries/QueryableStorage.ts | 2 +- 12 files changed, 860 insertions(+), 787 deletions(-) create mode 100644 packages/store/src/migration/engine.ts create mode 100644 packages/store/src/migration/migrations.ts delete mode 100644 packages/store/src/migration/openDatabase.ts create mode 100644 packages/store/src/migration/openQueryDatabase.ts create mode 100644 packages/store/src/migration/openWIPDatabase.ts create mode 100644 packages/store/src/migration/types.ts diff --git a/packages/store/src/client/Client.ts b/packages/store/src/client/Client.ts index 6bb9431a..3000d821 100644 --- a/packages/store/src/client/Client.ts +++ b/packages/store/src/client/Client.ts @@ -1,27 +1,25 @@ import { - assert, debounce, DocumentBaseline, EventSubscriber, Migration, Operation, - SchemaCollection, } from '@verdant-web/common'; import { Context } from '../context.js'; import { DocumentManager } from '../DocumentManager.js'; +import { EntityStore } from '../entities/EntityStore.js'; import { FileManager, FileManagerConfig } from '../files/FileManager.js'; +import { ReturnedFileData } from '../files/FileStorage.js'; import { closeDatabase, deleteAllDatabases, getSizeOfObjectStore, } from '../idb.js'; import { ExportData, Metadata } from '../metadata/Metadata.js'; -import { openDocumentDatabase } from '../migration/openDatabase.js'; -import { EntityStore } from '../entities/EntityStore.js'; -import { NoSync, ServerSync, ServerSyncOptions, Sync } from '../sync/Sync.js'; +import { openQueryDatabase } from '../migration/openQueryDatabase.js'; import { CollectionQueries } from '../queries/CollectionQueries.js'; import { QueryCache } from '../queries/QueryCache.js'; -import { ReturnedFileData } from '../files/FileStorage.js'; +import { NoSync, ServerSync, ServerSyncOptions, Sync } from '../sync/Sync.js'; interface ClientConfig { syncConfig?: ServerSyncOptions; @@ -382,7 +380,7 @@ export class Client extends EventSubscriber<{ } // now open the document DB empty at the specified version // and initialize it from the meta DB - this.context.documentDb = await openDocumentDatabase({ + this.context.documentDb = await openQueryDatabase({ meta: this.meta, migrations: this.config.migrations, context: this.context, @@ -402,7 +400,7 @@ export class Client extends EventSubscriber<{ this.context.log('Migrating up to latest schema...'); // put the schema back this.context.schema = currentSchema; - this.context.documentDb = await openDocumentDatabase({ + this.context.documentDb = await openQueryDatabase({ meta: this.meta, migrations: this.config.migrations, context: this.context, diff --git a/packages/store/src/client/ClientDescriptor.ts b/packages/store/src/client/ClientDescriptor.ts index 4fb6ab2e..d835c846 100644 --- a/packages/store/src/client/ClientDescriptor.ts +++ 
b/packages/store/src/client/ClientDescriptor.ts @@ -12,10 +12,7 @@ import { openMetadataDatabase, openWIPMetadataDatabase, } from '../metadata/openMetadataDatabase.js'; -import { - openDocumentDatabase, - openWIPDocumentDatabase, -} from '../migration/openDatabase.js'; +import { openWIPDatabase } from '../migration/openWIPDatabase.js'; import { ServerSyncOptions } from '../sync/Sync.js'; import { UndoHistory } from '../UndoHistory.js'; import { Client } from './Client.js'; @@ -26,6 +23,7 @@ import { } from '../idb.js'; import { FakeWeakRef } from '../FakeWeakRef.js'; import { METADATA_VERSION_KEY } from './constants.js'; +import { openQueryDatabase } from '../migration/openQueryDatabase.js'; export interface ClientDescriptorOptions { /** The schema used to create this client */ @@ -193,7 +191,7 @@ export class ClientDescriptor< getNow: () => meta.now, }); - const documentDb = await openDocumentDatabase({ + const documentDb = await openQueryDatabase({ context: contextWithNow, version: init.schema.version, meta, @@ -261,7 +259,7 @@ export class ClientDescriptor< // verify schema integrity await meta.updateSchema(init.schema, init.overrideSchemaConflict); - const documentDb = await openWIPDocumentDatabase({ + const documentDb = await openWIPDatabase({ context: contextWithNow, version: init.schema.version, meta, diff --git a/packages/store/src/entities/EntityStore.ts b/packages/store/src/entities/EntityStore.ts index cd931440..cd4807b5 100644 --- a/packages/store/src/entities/EntityStore.ts +++ b/packages/store/src/entities/EntityStore.ts @@ -42,12 +42,12 @@ export type EntityStoreEvents = { resetAll: WeakEvent; }; -type IncomingData = { +export interface IncomingData { operations?: Operation[]; baselines?: DocumentBaseline[]; reset?: boolean; isLocal?: boolean; -}; +} export class EntityStore extends Disposable { private ctx; @@ -137,6 +137,12 @@ export class EntityStore extends Disposable { await this.processData(data); }; + empty = async () => { + await this.queryableStorage.reset(); + this.events.resetAll.invoke(this); + this.cache.clear(); + }; + private resetData = async () => { if (this.disposed) { this.ctx.log('warn', 'EntityStore is disposed, not resetting local data'); diff --git a/packages/store/src/idb.ts b/packages/store/src/idb.ts index 60f2999e..2d5bad08 100644 --- a/packages/store/src/idb.ts +++ b/packages/store/src/idb.ts @@ -1,5 +1,8 @@ import { roughSizeOfObject } from '@verdant-web/common'; +export const globalIDB = + typeof window !== 'undefined' ? window.indexedDB : (undefined as any); + export function isAbortError(err: unknown) { return err instanceof Error && err.name === 'AbortError'; } @@ -109,7 +112,7 @@ export async function closeDatabase(db: IDBDatabase) { export async function deleteAllDatabases( namespace: string, - indexedDB: IDBFactory = window.indexedDB, + indexedDB: IDBFactory = globalIDB, ) { const req1 = indexedDB.deleteDatabase([namespace, 'meta'].join('_')); const req2 = indexedDB.deleteDatabase([namespace, 'collections'].join('_')); @@ -163,3 +166,19 @@ export function createAbortableTransaction( } return tx; } + +/** + * Empties all data in a database without changing + * its structure. 
+ */ +export function emptyDatabase(db: IDBDatabase) { + const storeNames = Array.from(db.objectStoreNames); + const tx = db.transaction(storeNames, 'readwrite'); + for (const storeName of storeNames) { + tx.objectStore(storeName).clear(); + } + return new Promise((resolve, reject) => { + tx.oncomplete = () => resolve(); + tx.onerror = () => reject(tx.error); + }); +} diff --git a/packages/store/src/migration/db.ts b/packages/store/src/migration/db.ts index 343cd735..266a7b52 100644 --- a/packages/store/src/migration/db.ts +++ b/packages/store/src/migration/db.ts @@ -1,3 +1,6 @@ +import { closeDatabase, globalIDB, storeRequestPromise } from '../idb.js'; +import { OpenDocumentDbContext } from './types.js'; + export async function getDatabaseVersion( indexedDB: IDBFactory, namespace: string, @@ -42,12 +45,6 @@ export async function getDatabaseVersion( return currentVersion; } -export async function closeDatabase(db: IDBDatabase) { - db.close(); - // FIXME: this isn't right!!!! - await new Promise((resolve) => resolve()); -} - /** * Upgrades the database to the given version, using the given upgrader function. */ @@ -61,8 +58,11 @@ export async function upgradeDatabase( event: IDBVersionChangeEvent, ) => void, log?: (...args: any[]) => void, -): Promise { - function openAndUpgrade(resolve: () => void, reject: (err: Error) => void) { +): Promise { + function openAndUpgrade( + resolve: (db: IDBDatabase) => void, + reject: (err: Error) => void, + ) { const request = indexedDb.open( [namespace, 'collections'].join('_'), version, @@ -74,9 +74,8 @@ export async function upgradeDatabase( wasUpgraded = true; }; request.onsuccess = (event) => { - request.result.close(); if (wasUpgraded) { - resolve(); + resolve(request.result); } else { reject( new Error( @@ -95,7 +94,7 @@ export async function upgradeDatabase( // }, 200); }; } - return new Promise(openAndUpgrade); + return new Promise(openAndUpgrade); } export async function acquireLock( @@ -110,15 +109,20 @@ export async function acquireLock( } } -export async function openDatabase( - indexedDb: IDBFactory, - namespace: string, - version: number, - log?: (...args: any[]) => void, -): Promise { - log?.('debug', 'Opening database', namespace, 'at version', version); +export async function openDatabase({ + indexedDB = globalIDB, + namespace, + version, + context, +}: { + indexedDB?: IDBFactory; + namespace: string; + version: number; + context: OpenDocumentDbContext; +}): Promise { + context.log('debug', 'Opening database', namespace, 'at version', version); const db = await new Promise((resolve, reject) => { - const request = indexedDb.open( + const request = indexedDB.open( [namespace, 'collections'].join('_'), version, ); @@ -126,7 +130,7 @@ export async function openDatabase( const transaction = request.transaction!; transaction.abort(); - log?.( + context.log( 'error', 'Database upgrade needed, but not expected', 'Expected', @@ -158,3 +162,41 @@ export async function openDatabase( return db; } + +export async function copyAll( + sourceDatabase: IDBDatabase, + targetDatabase: IDBDatabase, +) { + // DOMStringList... doesn't have iterable... 
why + const sourceStoreNames = new Array(); + for (let i = 0; i < sourceDatabase.objectStoreNames.length; i++) { + sourceStoreNames.push(sourceDatabase.objectStoreNames[i]); + } + + const copyFromTransaction = sourceDatabase.transaction( + sourceStoreNames, + 'readonly', + ); + const copyFromStores = sourceStoreNames.map((name) => + copyFromTransaction.objectStore(name), + ); + const allObjects = await Promise.all( + copyFromStores.map((store) => storeRequestPromise(store.getAll())), + ); + + const copyToTransaction = targetDatabase.transaction( + sourceStoreNames, + 'readwrite', + ); + const copyToStores = sourceStoreNames.map((name) => + copyToTransaction.objectStore(name), + ); + + for (let i = 0; i < copyToStores.length; i++) { + await Promise.all( + allObjects[i].map((obj) => { + return storeRequestPromise(copyToStores[i].put(obj)); + }), + ); + } +} diff --git a/packages/store/src/migration/engine.ts b/packages/store/src/migration/engine.ts new file mode 100644 index 00000000..02ffa879 --- /dev/null +++ b/packages/store/src/migration/engine.ts @@ -0,0 +1,248 @@ +import { + CollectionFilter, + Migration, + MigrationEngine, + ObjectIdentifier, + addFieldDefaults, + assert, + assignOidsToAllSubObjects, + cloneDeep, + createOid, + diffToPatches, + getOid, + initialToPatches, + removeOidPropertiesFromAllSubObjects, +} from '@verdant-web/common'; +import { Context } from '../context.js'; +import { Metadata } from '../metadata/Metadata.js'; +import { findAllOids, findOneOid } from '../queries/dbQueries.js'; +import { OpenDocumentDbContext } from './types.js'; + +function getMigrationMutations({ + migration, + meta, + getMigrationNow, + newOids, +}: { + migration: Migration; + newOids: string[]; + getMigrationNow: () => string; + meta: Metadata; +}) { + return migration.allCollections.reduce((acc, collectionName) => { + acc[collectionName] = { + put: async (doc: any) => { + // add defaults + addFieldDefaults(migration.newSchema.collections[collectionName], doc); + const primaryKey = + doc[migration.newSchema.collections[collectionName].primaryKey]; + const oid = createOid(collectionName, primaryKey); + newOids.push(oid); + await meta.insertLocalOperations( + initialToPatches(doc, oid, getMigrationNow), + ); + return doc; + }, + delete: async (id: string) => { + const rootOid = createOid(collectionName, id); + const allOids = await meta.getAllDocumentRelatedOids(rootOid); + return meta.insertLocalOperations( + allOids.map((oid) => ({ + oid, + timestamp: getMigrationNow(), + data: { op: 'delete' }, + })), + ); + }, + }; + return acc; + }, {} as any); +} + +function getMigrationQueries({ + migration, + context, + meta, +}: { + migration: Migration; + context: Context; + meta: Metadata; +}) { + return migration.oldCollections.reduce((acc, collectionName) => { + acc[collectionName] = { + get: async (id: string) => { + const oid = createOid(collectionName, id); + const doc = await meta.getDocumentSnapshot(oid, { + // only get the snapshot up to the previous version (newer operations may have synced) + to: meta.time.now(migration.oldSchema.version), + }); + return doc; + }, + findOne: async (filter: CollectionFilter) => { + const oid = await findOneOid({ + collection: collectionName, + index: filter, + context, + }); + if (!oid) return null; + const doc = await meta.getDocumentSnapshot(oid, { + // only get the snapshot up to the previous version (newer operations may have synced) + to: meta.time.now(migration.oldSchema.version), + }); + return doc; + }, + findAll: async (filter: CollectionFilter) 
=> { + const oids = await findAllOids({ + collection: collectionName, + index: filter, + context, + }); + const docs = await Promise.all( + oids.map((oid) => + meta.getDocumentSnapshot(oid, { + // only get the snapshot up to the previous version (newer operations may have synced) + to: meta.time.now(migration.oldSchema.version), + }), + ), + ); + return docs; + }, + }; + return acc; + }, {} as any); +} + +export function getMigrationEngine({ + meta, + migration, + context, +}: { + log?: (...args: any[]) => void; + migration: Migration; + meta: Metadata; + context: Context; +}): MigrationEngine { + function getMigrationNow() { + return meta.time.zero(migration.version); + } + + const newOids = new Array<ObjectIdentifier>(); + + const queries = getMigrationQueries({ + migration, + context, + meta, + }); + const mutations = getMigrationMutations({ + migration, + getMigrationNow, + newOids, + meta, + }); + const deleteCollection = async (collection: string) => { + const allOids = await meta.getAllCollectionRelatedOids(collection); + return meta.insertLocalOperations( + allOids.map((oid) => ({ + oid, + timestamp: getMigrationNow(), + data: { op: 'delete' }, + })), + ); + }; + const awaitables = new Array<Promise<any>>(); + const engine: MigrationEngine = { + log: context.log, + newOids, + deleteCollection, + migrate: async (collection, strategy) => { + const docs = await queries[collection].findAll(); + + await Promise.all( + docs.filter(Boolean).map(async (doc: any) => { + const rootOid = getOid(doc); + assert( + !!rootOid, + `Document is missing an OID: ${JSON.stringify(doc)}`, + ); + const original = cloneDeep(doc); + // @ts-ignore - excessive type resolution + const newValue = await strategy(doc); + if (newValue) { + // the migration has altered the shape of our document. we need + // to create the operation from the diff and write it to meta as + // a migration patch + removeOidPropertiesFromAllSubObjects(original); + removeOidPropertiesFromAllSubObjects(newValue); + assignOidsToAllSubObjects(newValue); + const patches = diffToPatches( + original, + newValue, + getMigrationNow, + undefined, + [], + { + mergeUnknownObjects: true, + }, + ); + if (patches.length > 0) { + await meta.insertLocalOperations(patches); + } + } + }), + ); + }, + queries, + mutations, + awaitables, + }; + return engine; +} + +export function getInitialMigrationEngine({ + meta, + migration, + context, +}: { + context: OpenDocumentDbContext; + migration: Migration; + meta: Metadata; +}): MigrationEngine { + function getMigrationNow() { + return meta.time.zero(migration.version); + } + + const newOids = new Array<ObjectIdentifier>(); + + const queries = new Proxy({} as any, { + get() { + throw new Error( + 'Queries are not available in initial migrations; there is no database yet!', + ); + }, + }) as any; + + const mutations = getMigrationMutations({ + migration, + getMigrationNow, + newOids, + meta, + }); + const engine: MigrationEngine = { + log: context.log, + newOids, + deleteCollection: () => { + throw new Error( + 'Calling deleteCollection() in initial migrations is not supported! Use initial migrations to seed initial data using mutations.', + ); + }, + migrate: () => { + throw new Error( + 'Calling migrate() in initial migrations is not supported!
Use initial migrations to seed initial data using mutations.', + ); + }, + queries, + mutations, + awaitables: [], + }; + return engine; +} diff --git a/packages/store/src/migration/migrations.ts b/packages/store/src/migration/migrations.ts new file mode 100644 index 00000000..606ed71b --- /dev/null +++ b/packages/store/src/migration/migrations.ts @@ -0,0 +1,347 @@ +import { + Migration, + MigrationEngine, + createOid, + decomposeOid, + getIndexValues, + getOidRoot, +} from '@verdant-web/common'; +import { Metadata } from '../metadata/Metadata.js'; +import { ClientOperation } from '../metadata/OperationsStore.js'; +import { acquireLock, openDatabase, upgradeDatabase } from './db.js'; +import { getInitialMigrationEngine, getMigrationEngine } from './engine.js'; +import { OpenDocumentDbContext } from './types.js'; +import { closeDatabase } from '../idb.js'; + +const globalIDB = + typeof window !== 'undefined' ? window.indexedDB : (undefined as any); + +export async function runMigrations({ + context, + toRun, + meta, + indexedDB = globalIDB, + namespace = context.namespace, +}: { + context: OpenDocumentDbContext; + toRun: Migration[]; + meta: Metadata; + indexedDB?: IDBFactory; + /** This namespace value controls where the database being migrated is. */ + namespace?: string; +}) { + await acquireLock(namespace, async () => { + // now the fun part + for (const migration of toRun) { + // special case: if this is the version 1 migration, we have no pre-existing database + // to use for the migration. + let engine: MigrationEngine; + // migrations from 0 (i.e. initial migrations) don't attempt to open an existing db + if (migration.oldSchema.version === 0) { + engine = getInitialMigrationEngine({ + meta, + migration, + context, + }); + await migration.migrate(engine); + } else { + // open the database with the current (old) version for this migration. this should + // align with the database's current version. + const originalDatabase = await openDatabase({ + indexedDB, + namespace, + version: migration.oldSchema.version, + context, + }); + + // this will only write to our metadata store via operations! + engine = getMigrationEngine({ + meta, + migration, + context: { + ...context, + documentDb: originalDatabase, + }, + }); + try { + await migration.migrate(engine); + // wait on any out-of-band async operations to complete + await Promise.all(engine.awaitables); + } catch (err) { + context.log( + 'critical', + `Migration failed (${migration.oldSchema.version} -> ${migration.newSchema.version})`, + err, + ); + if (err instanceof Error) { + throw err; + } else { + throw new Error('Unknown error during migration'); + } + } + + // now we have to open the database again with the next version and + // make the appropriate schema changes during the upgrade. + await closeDatabase(originalDatabase); + } + + context.log( + 'debug', + 'Upgrading database', + namespace, + 'to version', + migration.newSchema.version, + ); + const upgradedDatabase = await applySchemaToDatabase({ + migration, + indexedDB, + namespace, + context, + }); + + /** + * In cases where operations from the future have been + * received by this client, we may have created entire + * documents in metadata which were not written to storage + * because all of their operations were in the future ( + * i.e. in the next version). 
We have to find those documents + * and also write their snapshots to storage, because they + * won't be present in storage already to 'refresh,' so + * if we don't analyze metadata for 'future' operations like + * this, we won't know they exist. + * + * This led to behavior where the metadata would be properly + * synced, but after upgrading the app and migrating, items + * would be missing from findAll and findOne queries. + */ + const docsWithUnappliedMigrations = await getDocsWithUnappliedMigrations({ + meta, + currentVersion: migration.oldSchema.version, + newVersion: migration.newSchema.version, + }); + + // once the schema is ready, we can write back the migrated documents + + for (const collection of migration.allCollections) { + // first step is to read in all the keys we need to rewrite + const documentReadTransaction = upgradedDatabase.transaction( + collection, + 'readwrite', + ); + const readStore = documentReadTransaction.objectStore(collection); + const keys = await getAllKeys(readStore); + // map the keys to OIDs + const oids = keys.map((key) => createOid(collection, `${key}`)); + oids.push( + ...engine.newOids.filter((oid) => { + return decomposeOid(oid).collection === collection; + }), + ...docsWithUnappliedMigrations.filter((oid) => { + return decomposeOid(oid).collection === collection; + }), + ); + + // add 'touch' operations to all root OIDs of all documents. + // this marks documents which have undergone a migration + // so that other clients know when they're working + // with unmigrated data - by seeing that there are no + // existing operations or baselines with a timestamp + // that matches the current version. + // UPDATE: no longer necessary now that pruning is a thing. + // await Promise.all( + // oids.map((oid) => + // meta.insertLocalOperations([ + // { + // oid, + // timestamp: meta.time.zero(migration.version), + // data: { op: 'touch' }, + // }, + // ]), + // ), + // ); + + const snapshots = await Promise.all( + oids.map(async (oid) => { + try { + const snap = await meta.getDocumentSnapshot(oid); + return [oid, snap]; + } catch (e) { + // this seems to happen with baselines/ops which are not fully + // cleaned up after deletion? + context.log( + 'error', + 'Could not regenerate snapshot during migration for oid', + oid, + 'this document will not be preserved', + e, + ); + return null; + } + }), + ); + + const views = snapshots + .filter((s): s is [string, any] => !!s) + .map(([oid, snapshot]) => { + if (!snapshot) return [oid, undefined]; + const view = getIndexValues( + migration.newSchema.collections[collection], + snapshot, + ); + return [oid, view]; + }); + + // now we can write the documents back + const documentWriteTransaction = upgradedDatabase.transaction( + collection, + 'readwrite', + ); + const writeStore = documentWriteTransaction.objectStore(collection); + await Promise.all( + views.map(([oid, view]) => { + if (view) { + return putView(writeStore, view); + } else { + const { id } = decomposeOid(oid); + return deleteView(writeStore, id); + } + }), + ); + } + + await closeDatabase(upgradedDatabase); + + context.log('debug', `Migration of ${namespace} complete.`); + context.log(` + ⬆️ v${migration.newSchema.version} Migration complete. 
Here's the rundown: + - Added collections: ${migration.addedCollections.join(', ')} + - Removed collections: ${migration.removedCollections.join(', ')} + - Changed collections: ${migration.changedCollections.join(', ')} + - New indexes: ${Object.keys(migration.addedIndexes) + .map((col) => + migration.addedIndexes[col].map((i) => `${col}.${i.name}`), + ) + .flatMap((i) => i) + .join(', ')} + - Removed indexes: ${Object.keys(migration.removedIndexes) + .map((col) => + migration.removedIndexes[col].map((i) => `${col}.${i.name}`), + ) + .flatMap((i) => i) + .join(', ')} + `); + } + }); +} + +async function getAllKeys(store: IDBObjectStore) { + return new Promise<IDBValidKey[]>((resolve, reject) => { + const request = store.getAllKeys(); + request.onsuccess = (event) => { + resolve(request.result); + }; + request.onerror = (event) => { + reject(request.error); + }; + }); +} + +async function deleteView(store: IDBObjectStore, id: string) { + const request = store.delete(id); + return new Promise<void>((resolve, reject) => { + request.onsuccess = (event) => { + resolve(); + }; + request.onerror = (event) => { + reject(request.error); + }; + }); +} + +async function putView(store: IDBObjectStore, view: any) { + const request = store.put(view); + return new Promise<void>((resolve, reject) => { + request.onsuccess = (event) => { + resolve(); + }; + request.onerror = (event) => { + reject(request.error); + }; + }); +} + +/** + * Gets a list of root OIDs for all documents which had operations stored already + * that were not applied to their queryable snapshots because they were in the + * future. These documents need to be refreshed in storage. + */ +async function getDocsWithUnappliedMigrations({ + meta, + currentVersion, + newVersion: _, +}: { + currentVersion: number; + newVersion: number; + meta: Metadata; +}) { + // scan for all operations in metadata after the current version. + // this could be more efficient if also filtering below or equal newVersion but + // that seems so unlikely in practice... + const unappliedOperations: ClientOperation[] = []; + await meta.operations.iterateOverAllOperations( + (op) => unappliedOperations.push(op), + { + from: meta.time.zero(currentVersion + 1), + }, + ); + return Array.from( + new Set(unappliedOperations.map((op) => getOidRoot(op.oid))), + ); +} + +export function applySchemaToDatabase({ + migration, + indexedDB = globalIDB, + namespace, + context, +}: { + migration: Migration; + indexedDB?: IDBFactory; + namespace: string; + context: OpenDocumentDbContext; +}) { + return upgradeDatabase( + indexedDB, + namespace, + migration.newSchema.version, + (transaction, db) => { + for (const newCollection of migration.addedCollections) { + db.createObjectStore(newCollection, { + keyPath: migration.newSchema.collections[newCollection].primaryKey, + autoIncrement: false, + }); + } + + for (const collection of migration.allCollections) { + const store = transaction.objectStore(collection); + // apply new indexes + for (const newIndex of migration.addedIndexes[collection] || []) { + store.createIndex(newIndex.name, newIndex.name, { + multiEntry: newIndex.multiEntry, + }); + } + // remove old indexes + for (const oldIndex of migration.removedIndexes[collection] || []) { + store.deleteIndex(oldIndex.name); + } + } + for (const removedCollection of migration.removedCollections) { + // !! can't delete the store, because old operations that relate to + // this store may still exist in history.
instead, we can clear it out + // and leave it in place + transaction.objectStore(removedCollection).clear(); + } + }, + context.log, + ); +} diff --git a/packages/store/src/migration/openDatabase.ts b/packages/store/src/migration/openDatabase.ts deleted file mode 100644 index 361d5281..00000000 --- a/packages/store/src/migration/openDatabase.ts +++ /dev/null @@ -1,749 +0,0 @@ -import { - CollectionFilter, - Migration, - MigrationEngine, - ObjectIdentifier, - StorageSchema, - addFieldDefaults, - assert, - assignIndexValues, - assignOidPropertiesToAllSubObjects, - assignOidsToAllSubObjects, - cloneDeep, - createOid, - decomposeOid, - diffToPatches, - getIndexValues, - getOid, - getOidRoot, - hasOid, - initialToPatches, - removeOidPropertiesFromAllSubObjects, -} from '@verdant-web/common'; -import { Context } from '../context.js'; -import { storeRequestPromise } from '../idb.js'; -import { Metadata } from '../metadata/Metadata.js'; -import { ClientOperation } from '../metadata/OperationsStore.js'; -import { findAllOids, findOneOid } from '../queries/dbQueries.js'; -import { - acquireLock, - closeDatabase, - getDatabaseVersion, - openDatabase, - upgradeDatabase, -} from './db.js'; -import { getMigrationPath } from './paths.js'; - -const globalIDB = - typeof window !== 'undefined' ? window.indexedDB : (undefined as any); - -type OpenDocumentDbContext = Omit<Context, 'documentDb'>; - -export async function openDocumentDatabase({ - version, - indexedDB = globalIDB, - migrations, - meta, - context, -}: { - version: number; - migrations: Migration[]; - indexedDB?: IDBFactory; - meta: Metadata; - context: OpenDocumentDbContext; -}) { - if (context.schema.wip) { - throw new Error('Cannot open a production client with a WIP schema!'); - } - - const currentVersion = await getDatabaseVersion( - indexedDB, - context.namespace, - version, - context.log, - ); - - context.log( - 'debug', - 'Current database version:', - currentVersion, - 'target version:', - version, - ); - - const toRun = getMigrationPath({ - currentVersion, - targetVersion: version, - migrations, - }); - - if (toRun.length > 0) { - context.log( - 'debug', - 'Migrations to run:', - toRun.map((m) => m.version), - ); - await runMigrations({ context, toRun, meta, indexedDB }); - } - return openDatabase(indexedDB, context.namespace, version, context.log); -} - -export async function openWIPDocumentDatabase({ - version, - indexedDB = globalIDB, - migrations, - meta, - context, - wipNamespace, -}: { - version: number; - migrations: Migration[]; - indexedDB?: IDBFactory; - meta: Metadata; - context: OpenDocumentDbContext; - wipNamespace: string; -}) { - context.log('debug', 'Opening WIP database', wipNamespace); - const currentWIPVersion = await getDatabaseVersion( - indexedDB, - wipNamespace, - version, - context.log, - ); - - if (currentWIPVersion === version) { - context.log('info', `WIP schema is up-to-date; not refreshing database`); - } else { - context.log('info', `WIP schema is out-of-date; refreshing database`); - - // first we need to copy the data from the production database to the WIP database - // at the current (non-wip) version. - - const initialToRun = getMigrationPath({ - currentVersion: currentWIPVersion, - targetVersion: version - 1, - migrations, - }); - - if (initialToRun.length > 0) { - await runMigrations({ - context, - toRun: initialToRun, - meta, - indexedDB, - namespace: wipNamespace, - }); - - // now, we copy the data from the main database.
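Both the inline copy logic below (removed here) and its replacement, copyAll in db.ts above, lean on the storeRequestPromise helper imported from idb.js, which this patch does not show. A minimal sketch of that helper, assuming the conventional IDBRequest-to-Promise wrapper shape:

function storeRequestPromise<T = any>(request: IDBRequest<T>): Promise<T> {
	return new Promise<T>((resolve, reject) => {
		// settle once the underlying IDBRequest settles
		request.onsuccess = () => resolve(request.result);
		request.onerror = () => reject(request.error);
	});
}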
- const mainDatabase = await openDatabase( - indexedDB, - context.namespace, - version - 1, - context.log, - ); - - const wipDatabase = await openDatabase( - indexedDB, - wipNamespace, - version - 1, - context.log, - ); - - // DOMStringList... doesn't have iterable... why - const mainDatabaseStoreNames = new Array(); - for (let i = 0; i < mainDatabase.objectStoreNames.length; i++) { - mainDatabaseStoreNames.push(mainDatabase.objectStoreNames[i]); - } - - const copyFromTransaction = mainDatabase.transaction( - mainDatabaseStoreNames, - 'readonly', - ); - const copyFromStores = mainDatabaseStoreNames.map((name) => - copyFromTransaction.objectStore(name), - ); - const allObjects = await Promise.all( - copyFromStores.map((store) => storeRequestPromise(store.getAll())), - ); - - const copyToTransaction = wipDatabase.transaction( - mainDatabaseStoreNames, - 'readwrite', - ); - const copyToStores = mainDatabaseStoreNames.map((name) => - copyToTransaction.objectStore(name), - ); - - for (let i = 0; i < copyToStores.length; i++) { - await Promise.all( - allObjects[i].map((obj) => { - return storeRequestPromise(copyToStores[i].put(obj)); - }), - ); - } - } - - const toRun = getMigrationPath({ - currentVersion: version - 1, - targetVersion: version, - migrations, - }); - - if (toRun.length > 0) { - await runMigrations({ - context, - toRun, - meta, - indexedDB, - namespace: wipNamespace, - }); - } - } - - return openDatabase(indexedDB, wipNamespace, version, context.log); -} - -async function runMigrations({ - context, - toRun, - meta, - indexedDB = globalIDB, - namespace = context.namespace, -}: { - context: OpenDocumentDbContext; - toRun: Migration[]; - meta: Metadata; - indexedDB?: IDBFactory; - namespace?: string; -}) { - await acquireLock(namespace, async () => { - // now the fun part - for (const migration of toRun) { - // special case: if this is the version 1 migration, we have no pre-existing database - // to use for the migration. - let engine: MigrationEngine; - // migrations from 0 (i.e. initial migrations) don't attempt to open an existing db - if (migration.oldSchema.version === 0) { - engine = getInitialMigrationEngine({ - meta, - migration, - context, - }); - await migration.migrate(engine); - } else { - // open the database with the current (old) version for this migration. this should - // align with the database's current version. - const originalDatabase = await openDatabase( - indexedDB, - namespace, - migration.oldSchema.version, - context.log, - ); - - // this will only write to our metadata store via operations! - engine = getMigrationEngine({ - meta, - migration, - context: { - ...context, - documentDb: originalDatabase, - }, - }); - try { - await migration.migrate(engine); - // wait on any out-of-band async operations to complete - await Promise.all(engine.awaitables); - } catch (err) { - context.log( - 'critical', - `Migration failed (${migration.oldSchema.version} -> ${migration.newSchema.version})`, - err, - ); - if (err instanceof Error) { - throw err; - } else { - throw new Error('Unknown error during migration'); - } - } - - // now we have to open the database again with the next version and - // make the appropriate schema changes during the upgrade. 
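For orientation, the runMigrations loop (deleted here, preserved in the new migrations.ts) advances one schema version per iteration. A worked example with invented versions: a client at v2 targeting v4, with a complete chain of migrations registered, runs only the last two steps.

// Hypothetical example; the versions are invented for illustration, and
// getMigrationPath's real implementation lives in ./paths.js (not in this diff).
const toRun = getMigrationPath({
	currentVersion: 2,
	targetVersion: 4,
	migrations, // e.g. [v0->v1, v1->v2, v2->v3, v3->v4]
});
// expected: [v2->v3, v3->v4]. For each step, runMigrations opens the
// database at the old version, replays documents through the migration
// engine (writing only to metadata), applies schema changes during an
// IndexedDB version bump, then writes regenerated snapshots back.
await runMigrations({ context, toRun, meta });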
- await closeDatabase(originalDatabase); - } - - context.log( - 'debug', - 'Upgrading database', - namespace, - 'to version', - migration.newSchema.version, - ); - await upgradeDatabase( - indexedDB, - namespace, - migration.newSchema.version, - (transaction, db) => { - for (const newCollection of migration.addedCollections) { - db.createObjectStore(newCollection, { - keyPath: - migration.newSchema.collections[newCollection].primaryKey, - autoIncrement: false, - }); - } - - for (const collection of migration.allCollections) { - const store = transaction.objectStore(collection); - // apply new indexes - for (const newIndex of migration.addedIndexes[collection] || []) { - store.createIndex(newIndex.name, newIndex.name, { - multiEntry: newIndex.multiEntry, - }); - } - // remove old indexes - for (const oldIndex of migration.removedIndexes[collection] || []) { - store.deleteIndex(oldIndex.name); - } - } - for (const removedCollection of migration.removedCollections) { - // !! can't delete the store, because old operations that relate to - // this store may still exist in history. instead, we can clear it out - // and leave it in place - transaction.objectStore(removedCollection).clear(); - } - }, - context.log, - ); - - /** - * In cases where operations from the future have been - * received by this client, we may have created entire - * documents in metadata which were not written to storage - * because all of their operations were in the future ( - * i.e. in the next version). We have to find those documents - * and also write their snapshots to storage, because they - * won't be present in storage already to 'refresh,' so - * if we don't analyze metadata for 'future' operations like - * this, we won't know they exist. - * - * This led to behavior where the metadata would be properly - * synced, but after upgrading the app and migrating, items - * would be missing from findAll and findOne queries. - */ - const docsWithUnappliedMigrations = await getDocsWithUnappliedMigrations({ - meta, - currentVersion: migration.oldSchema.version, - newVersion: migration.newSchema.version, - }); - - // once the schema is ready, we can write back the migrated documents - const upgradedDatabase = await openDatabase( - indexedDB, - namespace, - migration.newSchema.version, - context.log, - ); - for (const collection of migration.allCollections) { - // first step is to read in all the keys we need to rewrite - const documentReadTransaction = upgradedDatabase.transaction( - collection, - 'readwrite', - ); - const readStore = documentReadTransaction.objectStore(collection); - const keys = await getAllKeys(readStore); - // map the keys to OIDs - const oids = keys.map((key) => createOid(collection, `${key}`)); - oids.push( - ...engine.newOids.filter((oid) => { - return decomposeOid(oid).collection === collection; - }), - ...docsWithUnappliedMigrations.filter((oid) => { - return decomposeOid(oid).collection === collection; - }), - ); - - // add 'touch' operations to all root OIDs of all documents. - // this marks documents which have undergone a migration - // so that other clients know when they're working - // with unmigrated data - by seeing that there are no - // existing operations or baselines with a timestamp - // that matches the current version. - // UPDATE: no longer necessary now that pruning is a thing. 
- // await Promise.all( - // oids.map((oid) => - // meta.insertLocalOperations([ - // { - // oid, - // timestamp: meta.time.zero(migration.version), - // data: { op: 'touch' }, - // }, - // ]), - // ), - // ); - - const snapshots = await Promise.all( - oids.map(async (oid) => { - try { - const snap = await meta.getDocumentSnapshot(oid); - return [oid, snap]; - } catch (e) { - // this seems to happen with baselines/ops which are not fully - // cleaned up after deletion? - context.log( - 'error', - 'Could not regenerate snapshot during migration for oid', - oid, - 'this document will not be preserved', - e, - ); - return null; - } - }), - ); - - const views = snapshots - .filter((s): s is [string, any] => !!s) - .map(([oid, snapshot]) => { - if (!snapshot) return [oid, undefined]; - const view = getIndexValues( - migration.newSchema.collections[collection], - snapshot, - ); - return [oid, view]; - }); - - // now we can write the documents back - const documentWriteTransaction = upgradedDatabase.transaction( - collection, - 'readwrite', - ); - const writeStore = documentWriteTransaction.objectStore(collection); - await Promise.all( - views.map(([oid, view]) => { - if (view) { - return putView(writeStore, view); - } else { - const { id } = decomposeOid(oid); - return deleteView(writeStore, id); - } - }), - ); - } - - await closeDatabase(upgradedDatabase); - - context.log('debug', `Migration of ${namespace} complete.`); - context.log(` - ⬆️ v${migration.newSchema.version} Migration complete. Here's the rundown: - - Added collections: ${migration.addedCollections.join(', ')} - - Removed collections: ${migration.removedCollections.join(', ')} - - Changed collections: ${migration.changedCollections.join(', ')} - - New indexes: ${Object.keys(migration.addedIndexes) - .map((col) => - migration.addedIndexes[col].map((i) => `${col}.${i.name}`), - ) - .flatMap((i) => i) - .join(', ')} - - Removed indexes: ${Object.keys(migration.removedIndexes) - .map((col) => - migration.removedIndexes[col].map((i) => `${col}.${i.name}`), - ) - .flatMap((i) => i) - .join(', ')} - `); - } - }); -} - -function getMigrationMutations({ - migration, - meta, - getMigrationNow, - newOids, -}: { - migration: Migration; - newOids: string[]; - getMigrationNow: () => string; - meta: Metadata; -}) { - return migration.allCollections.reduce((acc, collectionName) => { - acc[collectionName] = { - put: async (doc: any) => { - // add defaults - addFieldDefaults(migration.newSchema.collections[collectionName], doc); - const primaryKey = - doc[migration.newSchema.collections[collectionName].primaryKey]; - const oid = createOid(collectionName, primaryKey); - newOids.push(oid); - await meta.insertLocalOperations( - initialToPatches(doc, oid, getMigrationNow), - ); - return doc; - }, - delete: async (id: string) => { - const rootOid = createOid(collectionName, id); - const allOids = await meta.getAllDocumentRelatedOids(rootOid); - return meta.insertLocalOperations( - allOids.map((oid) => ({ - oid, - timestamp: getMigrationNow(), - data: { op: 'delete' }, - })), - ); - }, - }; - return acc; - }, {} as any); -} - -function getMigrationQueries({ - migration, - context, - meta, -}: { - migration: Migration; - context: Context; - meta: Metadata; -}) { - return migration.oldCollections.reduce((acc, collectionName) => { - acc[collectionName] = { - get: async (id: string) => { - const oid = createOid(collectionName, id); - const doc = await meta.getDocumentSnapshot(oid, { - // only get the snapshot up to the previous version (newer operations may 
have synced) - to: meta.time.now(migration.oldSchema.version), - }); - return doc; - }, - findOne: async (filter: CollectionFilter) => { - const oid = await findOneOid({ - collection: collectionName, - index: filter, - context, - }); - if (!oid) return null; - const doc = await meta.getDocumentSnapshot(oid, { - // only get the snapshot up to the previous version (newer operations may have synced) - to: meta.time.now(migration.oldSchema.version), - }); - return doc; - }, - findAll: async (filter: CollectionFilter) => { - const oids = await findAllOids({ - collection: collectionName, - index: filter, - context, - }); - const docs = await Promise.all( - oids.map((oid) => - meta.getDocumentSnapshot(oid, { - // only get the snapshot up to the previous version (newer operations may have synced) - to: meta.time.now(migration.oldSchema.version), - }), - ), - ); - return docs; - }, - }; - return acc; - }, {} as any); -} - -function getMigrationEngine({ - meta, - migration, - context, -}: { - log?: (...args: any[]) => void; - migration: Migration; - meta: Metadata; - context: Context; -}): MigrationEngine { - function getMigrationNow() { - return meta.time.zero(migration.version); - } - - const newOids = new Array<ObjectIdentifier>(); - - const queries = getMigrationQueries({ - migration, - context, - meta, - }); - const mutations = getMigrationMutations({ - migration, - getMigrationNow, - newOids, - meta, - }); - const deleteCollection = async (collection: string) => { - const allOids = await meta.getAllCollectionRelatedOids(collection); - return meta.insertLocalOperations( - allOids.map((oid) => ({ - oid, - timestamp: getMigrationNow(), - data: { op: 'delete' }, - })), - ); - }; - const awaitables = new Array<Promise<any>>(); - const engine: MigrationEngine = { - log: context.log, - newOids, - deleteCollection, - migrate: async (collection, strategy) => { - const docs = await queries[collection].findAll(); - - await Promise.all( - docs.filter(Boolean).map(async (doc: any) => { - const rootOid = getOid(doc); - assert( - !!rootOid, - `Document is missing an OID: ${JSON.stringify(doc)}`, - ); - const original = cloneDeep(doc); - // @ts-ignore - excessive type resolution - const newValue = await strategy(doc); - if (newValue) { - // the migration has altered the shape of our document.
we need - // to create the operation from the diff and write it to meta as - // a migration patch - removeOidPropertiesFromAllSubObjects(original); - removeOidPropertiesFromAllSubObjects(newValue); - assignOidsToAllSubObjects(newValue); - const patches = diffToPatches( - original, - newValue, - getMigrationNow, - undefined, - [], - { - mergeUnknownObjects: true, - }, - ); - if (patches.length > 0) { - await meta.insertLocalOperations(patches); - } - } - }), - ); - }, - queries, - mutations, - awaitables, - }; - return engine; -} - -function getInitialMigrationEngine({ - meta, - migration, - context, -}: { - context: OpenDocumentDbContext; - migration: Migration; - meta: Metadata; -}): MigrationEngine { - function getMigrationNow() { - return meta.time.zero(migration.version); - } - - const newOids = new Array<ObjectIdentifier>(); - - const queries = new Proxy({} as any, { - get() { - throw new Error( - 'Queries are not available in initial migrations; there is no database yet!', - ); - }, - }) as any; - - const mutations = getMigrationMutations({ - migration, - getMigrationNow, - newOids, - meta, - }); - const engine: MigrationEngine = { - log: context.log, - newOids, - deleteCollection: () => { - throw new Error( - 'Calling deleteCollection() in initial migrations is not supported! Use initial migrations to seed initial data using mutations.', - ); - }, - migrate: () => { - throw new Error( - 'Calling migrate() in initial migrations is not supported! Use initial migrations to seed initial data using mutations.', - ); - }, - queries, - mutations, - awaitables: [], - }; - return engine; -} - -async function getAllKeys(store: IDBObjectStore) { - return new Promise<IDBValidKey[]>((resolve, reject) => { - const request = store.getAllKeys(); - request.onsuccess = (event) => { - resolve(request.result); - }; - request.onerror = (event) => { - reject(request.error); - }; - }); -} - -async function deleteView(store: IDBObjectStore, id: string) { - const request = store.delete(id); - return new Promise<void>((resolve, reject) => { - request.onsuccess = (event) => { - resolve(); - }; - request.onerror = (event) => { - reject(request.error); - }; - }); -} - -async function putView(store: IDBObjectStore, view: any) { - const request = store.put(view); - return new Promise<void>((resolve, reject) => { - request.onsuccess = (event) => { - resolve(); - }; - request.onerror = (event) => { - reject(request.error); - }; - }); -} - -/** - * Gets a list of root OIDs for all documents which had operations stored already - * that were not applied to their queryable snapshots because they were in the - * future. These documents need to be refreshed in storage. - */ -async function getDocsWithUnappliedMigrations({ - meta, - currentVersion, - newVersion: _, -}: { - currentVersion: number; - newVersion: number; - meta: Metadata; -}) { - // scan for all operations in metadata after the current version. - // this could be more efficient if also filtering below or equal newVersion but - // that seems so unlikely in practice...
- const unappliedOperations: ClientOperation[] = []; - await meta.operations.iterateOverAllOperations( - (op) => unappliedOperations.push(op), - { - from: meta.time.zero(currentVersion + 1), - }, - ); - return Array.from( - new Set(unappliedOperations.map((op) => getOidRoot(op.oid))), - ); -} diff --git a/packages/store/src/migration/openQueryDatabase.ts b/packages/store/src/migration/openQueryDatabase.ts new file mode 100644 index 00000000..6d9694c6 --- /dev/null +++ b/packages/store/src/migration/openQueryDatabase.ts @@ -0,0 +1,63 @@ +import { Migration } from '@verdant-web/common'; +import { Metadata } from '../metadata/Metadata.js'; +import { getDatabaseVersion, openDatabase } from './db.js'; +import { runMigrations } from './migrations.js'; +import { getMigrationPath } from './paths.js'; +import { OpenDocumentDbContext } from './types.js'; + +const globalIDB = + typeof window !== 'undefined' ? window.indexedDB : (undefined as any); + +export async function openQueryDatabase({ + version, + indexedDB = globalIDB, + migrations, + meta, + context, +}: { + version: number; + migrations: Migration[]; + indexedDB?: IDBFactory; + meta: Metadata; + context: OpenDocumentDbContext; +}) { + if (context.schema.wip) { + throw new Error('Cannot open a production client with a WIP schema!'); + } + + const currentVersion = await getDatabaseVersion( + indexedDB, + context.namespace, + version, + context.log, + ); + + context.log( + 'debug', + 'Current database version:', + currentVersion, + 'target version:', + version, + ); + + const toRun = getMigrationPath({ + currentVersion, + targetVersion: version, + migrations, + }); + + if (toRun.length > 0) { + context.log( + 'debug', + 'Migrations to run:', + toRun.map((m) => m.version), + ); + await runMigrations({ context, toRun, meta, indexedDB }); + } + return openDatabase({ + indexedDB, + namespace: context.namespace, + version, + context, + }); +} diff --git a/packages/store/src/migration/openWIPDatabase.ts b/packages/store/src/migration/openWIPDatabase.ts new file mode 100644 index 00000000..07b1c6d9 --- /dev/null +++ b/packages/store/src/migration/openWIPDatabase.ts @@ -0,0 +1,97 @@ +import { Migration } from '@verdant-web/common'; +import { Metadata } from '../metadata/Metadata.js'; +import { copyAll, getDatabaseVersion, openDatabase } from './db.js'; +import { runMigrations } from './migrations.js'; +import { getMigrationPath } from './paths.js'; +import { OpenDocumentDbContext } from './types.js'; + +const globalIDB = + typeof window !== 'undefined' ? window.indexedDB : (undefined as any); + +export async function openWIPDatabase({ + version, + indexedDB = globalIDB, + migrations, + meta, + context, + wipNamespace, +}: { + version: number; + migrations: Migration[]; + indexedDB?: IDBFactory; + meta: Metadata; + context: OpenDocumentDbContext; + wipNamespace: string; +}) { + context.log('debug', 'Opening WIP database', wipNamespace); + const currentWIPVersion = await getDatabaseVersion( + indexedDB, + wipNamespace, + version, + context.log, + ); + + if (currentWIPVersion === version) { + context.log('info', `WIP schema is up-to-date; not refreshing database`); + } else { + context.log('info', `WIP schema is out-of-date; refreshing database`); + + // first we need to copy the data from the production database to the WIP database + // at the current (non-wip) version. 
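Spelled out, the refresh implemented below proceeds in three phases, sketched here against this patch's own helpers (the real sequence follows):

// Phase 1: catch the WIP namespace up to the last published schema,
//          i.e. runMigrations({ ..., namespace: wipNamespace }) with
//          targetVersion = version - 1.
// Phase 2: open both databases at that shared version and copy the
//          production data across: copyAll(mainDatabase, wipDatabase).
// Phase 3: run the final (version - 1 -> version) WIP migration against
//          wipNamespace only, leaving production data untouched.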
+ + const initialToRun = getMigrationPath({ + currentVersion: currentWIPVersion, + targetVersion: version - 1, + migrations, + }); + + if (initialToRun.length > 0) { + await runMigrations({ + context, + toRun: initialToRun, + meta, + indexedDB, + namespace: wipNamespace, + }); + + // now, we copy the data from the main database. + const mainDatabase = await openDatabase({ + indexedDB, + namespace: context.namespace, + version: version - 1, + context, + }); + + const wipDatabase = await openDatabase({ + indexedDB, + namespace: wipNamespace, + version: version - 1, + context, + }); + await copyAll(mainDatabase, wipDatabase); + } + + const toRun = getMigrationPath({ + currentVersion: version - 1, + targetVersion: version, + migrations, + }); + + if (toRun.length > 0) { + await runMigrations({ + context, + toRun, + meta, + indexedDB, + namespace: wipNamespace, + }); + } + } + + return openDatabase({ + indexedDB, + namespace: wipNamespace, + version, + context, + }); +} diff --git a/packages/store/src/migration/types.ts b/packages/store/src/migration/types.ts new file mode 100644 index 00000000..4d53c382 --- /dev/null +++ b/packages/store/src/migration/types.ts @@ -0,0 +1,4 @@ +import { Context } from '../context.js'; + +/** During migration, only a partial context is available */ +export type OpenDocumentDbContext = Omit<Context, 'documentDb'>; diff --git a/packages/store/src/queries/QueryableStorage.ts b/packages/store/src/queries/QueryableStorage.ts index eabc438a..ae6a8e2d 100644 --- a/packages/store/src/queries/QueryableStorage.ts +++ b/packages/store/src/queries/QueryableStorage.ts @@ -25,7 +25,7 @@ */ reset = async () => { const allCollections = Object.keys(this.ctx.schema.collections); - const tx = this.ctx.documentDb.transaction(allCollections, 'readwrite'); + const tx = this.createTransaction(allCollections, { mode: 'readwrite' }); await Promise.all( allCollections.map((collection) => { const store = tx.objectStore(collection);
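As a closing reference, the two low-level helpers this patch adds, emptyDatabase (idb.ts) and copyAll (db.ts), compose naturally when rebuilding a namespace. A minimal usage sketch; the helper names are real, but the wrapper function and import paths below are illustrative, assuming a module under src/migration:

import { emptyDatabase } from '../idb.js';
import { copyAll } from './db.js';

// Clear every object store in a scratch database (structure intact),
// then bulk-copy records from a source database at the same version.
async function reseedFromSource(scratch: IDBDatabase, source: IDBDatabase) {
	await emptyDatabase(scratch);
	await copyAll(source, scratch);
}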