diff --git a/js/modules/messages_data_migrator.js b/js/modules/messages_data_migrator.js index e3ce66c89d..1ba211e0fd 100644 --- a/js/modules/messages_data_migrator.js +++ b/js/modules/messages_data_migrator.js @@ -1,12 +1,20 @@ +/* eslint-env browser */ + const isFunction = require('lodash/isFunction'); const isNumber = require('lodash/isNumber'); const isObject = require('lodash/isObject'); const isString = require('lodash/isString'); +const last = require('lodash/last'); const Message = require('./types/message'); const { deferredToPromise } = require('./deferred_to_promise'); -const Migrations0DatabaseWithAttachmentData = - require('./migrations/migrations_0_database_with_attachment_data'); + + +const DATABASE_NAME = 'signal'; +// Last version with attachment data stored in database: +const EXPECTED_DATABASE_VERSION = 17; +const MESSAGES_STORE_NAME = 'messages'; +const ITEMS_STORE_NAME = 'items'; exports.processNext = async ({ BackboneMessage, @@ -44,7 +52,7 @@ exports.processNext = async ({ const upgradeDuration = Date.now() - startUpgradeTime; const startSaveTime = Date.now(); - const saveMessage = _saveMessage({ BackboneMessage }); + const saveMessage = _saveMessageBackbone({ BackboneMessage }); await Promise.all(upgradedMessages.map(saveMessage)); const saveDuration = Date.now() - startSaveTime; @@ -78,20 +86,91 @@ exports.processAll = async ({ throw new TypeError('"upgradeMessageSchema" is required'); } - const lastIndex = null; - const unprocessedMessages = - await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({ - Backbone, - count: 10, - lastIndex, + const connection = await openDatabase(DATABASE_NAME, EXPECTED_DATABASE_VERSION); + const isComplete = await isMigrationComplete(connection); + console.log('Is attachment migration complete?', isComplete); + if (isComplete) { + return; + } + + const migrationStartTime = Date.now(); + let unprocessedMessages = []; + do { + // eslint-disable-next-line no-await-in-loop + const lastProcessedIndex = (await getLastProcessedIndex(connection)) || null; + + const fetchUnprocessedMessagesStartTime = Date.now(); + unprocessedMessages = + // eslint-disable-next-line no-await-in-loop + await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({ + connection, + count: 10, + lastIndex: lastProcessedIndex, + }); + const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime; + const numUnprocessedMessages = unprocessedMessages.length; + + const upgradeStartTime = Date.now(); + const upgradedMessages = + // eslint-disable-next-line no-await-in-loop + await Promise.all(unprocessedMessages.map(upgradeMessageSchema)); + const upgradeDuration = Date.now() - upgradeStartTime; + + const saveMessagesStartTime = Date.now(); + const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite'); + const transactionCompletion = completeTransaction(transaction); + // eslint-disable-next-line no-await-in-loop + await Promise.all(upgradedMessages.map(_saveMessage({ transaction }))); + // eslint-disable-next-line no-await-in-loop + await transactionCompletion; + const saveDuration = Date.now() - saveMessagesStartTime; + + // TODO: Confirm transaction is complete + + const lastMessage = last(upgradedMessages); + const newLastProcessedIndex = lastMessage ? lastMessage.id : null; + if (newLastProcessedIndex) { + // eslint-disable-next-line no-await-in-loop + await setLastProcessedIndex(connection, newLastProcessedIndex); + } + + console.log('Upgrade message schema on startup:', { + lastProcessedIndex, + numUnprocessedMessages, + fetchDuration, + saveDuration, + upgradeDuration, + newLastProcessedIndex, }); + } while (unprocessedMessages.length > 0); + + await markMigrationComplete(connection); + connection.close(); + + const totalDuration = Date.now() - migrationStartTime; + console.log('Attachment migration complete:', { totalDuration }); }; -const _saveMessage = ({ BackboneMessage } = {}) => (message) => { +const _saveMessageBackbone = ({ BackboneMessage } = {}) => (message) => { const backboneMessage = new BackboneMessage(message); return deferredToPromise(backboneMessage.save()); }; +const _saveMessage = ({ transaction } = {}) => (message) => { + if (!isObject(transaction)) { + throw new TypeError('"transaction" is required'); + } + + const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME); + const request = messagesStore.put(message, message.id); + return new Promise((resolve, reject) => { + request.onsuccess = () => + resolve(); + request.onerror = event => + reject(event.target.error); + }); +}; + const _fetchMessagesRequiringSchemaUpgrade = async ({ BackboneMessageCollection, count } = {}) => { if (!isFunction(BackboneMessageCollection)) { @@ -119,11 +198,10 @@ const _fetchMessagesRequiringSchemaUpgrade = })); }; -const MAX_MESSAGE_KEY = 'ffffffff-ffff-ffff-ffff-ffffffffffff'; const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = - async ({ Backbone, count, lastIndex } = {}) => { - if (!isObject(Backbone)) { - throw new TypeError('"Backbone" is required'); + ({ connection, count, lastIndex } = {}) => { + if (!isObject(connection)) { + throw new TypeError('"connection" is required'); } if (!isNumber(count)) { @@ -134,17 +212,112 @@ const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = throw new TypeError('"lastIndex" must be a string'); } - const storeName = 'messages'; - const collection = - Migrations0DatabaseWithAttachmentData.createCollection({ Backbone, storeName }); + const hasLastIndex = Boolean(lastIndex); - const range = lastIndex ? [lastIndex, MAX_MESSAGE_KEY] : null; - await deferredToPromise(collection.fetch({ - limit: count, - range, - })); + const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readonly'); + const messagesStore = transaction.objectStore(MESSAGES_STORE_NAME); - const models = collection.models || []; - const messages = models.map(model => model.toJSON()); - return messages; + const excludeLowerBound = true; + const query = hasLastIndex + ? IDBKeyRange.lowerBound(lastIndex, excludeLowerBound) + : undefined; + const request = messagesStore.getAll(query, count); + return new Promise((resolve, reject) => { + request.onsuccess = event => + resolve(event.target.result); + request.onerror = event => + reject(event.target.error); + }); }; + +const openDatabase = (name, version) => { + const request = window.indexedDB.open(name, version); + return new Promise((resolve, reject) => { + request.onblocked = () => + reject(new Error('Database blocked')); + + request.onupgradeneeded = event => + reject(new Error('Unexpected database upgrade required:' + + `oldVersion: ${event.oldVersion}, newVersion: ${event.newVersion}`)); + + request.onerror = event => + reject(event.target.error); + + request.onsuccess = (event) => { + const connection = event.target.result; + resolve(connection); + }; + }); +}; + +const LAST_PROCESSED_INDEX_KEY = 'attachmentMigration_lastProcessedIndex'; +const IS_MIGRATION_COMPLETE_KEY = 'attachmentMigration_isComplete'; + +const getLastProcessedIndex = connection => + getItem(connection, LAST_PROCESSED_INDEX_KEY); + +const setLastProcessedIndex = (connection, value) => + setItem(connection, LAST_PROCESSED_INDEX_KEY, value); + +const isMigrationComplete = async (connection) => { + const value = await getItem(connection, IS_MIGRATION_COMPLETE_KEY); + return Boolean(value); +}; + +const markMigrationComplete = connection => + setItem(connection, IS_MIGRATION_COMPLETE_KEY, true); + +const getItem = (connection, key) => { + if (!isObject(connection)) { + throw new TypeError('"connection" is required'); + } + + if (!isString(key)) { + throw new TypeError('"key" must be a string'); + } + + const transaction = connection.transaction(ITEMS_STORE_NAME, 'readonly'); + const itemsStore = transaction.objectStore(ITEMS_STORE_NAME); + const request = itemsStore.get(key); + return new Promise((resolve, reject) => { + request.onerror = event => + reject(event.target.error); + + request.onsuccess = event => + resolve(event.target.result); + }); +}; + +const setItem = (connection, key, value) => { + if (!isObject(connection)) { + throw new TypeError('"connection" is required'); + } + + if (!isString(key)) { + throw new TypeError('"key" must be a string'); + } + + const transaction = connection.transaction(ITEMS_STORE_NAME, 'readwrite'); + const itemsStore = transaction.objectStore(ITEMS_STORE_NAME); + const request = itemsStore.put(value, key); + return new Promise((resolve, reject) => { + request.onerror = event => + reject(event.target.error); + + request.onsuccess = () => + resolve(); + }); +}; + +const completeTransaction = transaction => + new Promise((resolve, reject) => { + // eslint-disable-next-line no-param-reassign + transaction.onabort = event => + reject(event.target.error); + // eslint-disable-next-line no-param-reassign + transaction.onerror = event => + reject(event.target.error); + // eslint-disable-next-line no-param-reassign + transaction.oncomplete = () => + resolve(); + });