diff --git a/ACKNOWLEDGMENTS.md b/ACKNOWLEDGMENTS.md index 74749a6448..4dc96a8594 100644 --- a/ACKNOWLEDGMENTS.md +++ b/ACKNOWLEDGMENTS.md @@ -4700,7 +4700,7 @@ Signal Desktop makes use of the following open source projects. MIT License - Copyright (c) Sindre Sorhus (sindresorhus.com) + Copyright (c) Sindre Sorhus (https://sindresorhus.com) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/package.json b/package.json index 6e10fc5258..cc5d18f96f 100644 --- a/package.json +++ b/package.json @@ -187,7 +187,7 @@ "node-fetch": "2.6.7", "nop": "1.0.0", "normalize-path": "3.0.0", - "p-map": "2.1.0", + "p-map": "7.0.4", "p-queue": "6.6.2", "p-timeout": "4.1.0", "parsecurrency": "1.1.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ae6f9f214d..746120ab1a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -288,8 +288,8 @@ importers: specifier: 3.0.0 version: 3.0.0 p-map: - specifier: 2.1.0 - version: 2.1.0 + specifier: 7.0.4 + version: 7.0.4 p-queue: specifier: 6.6.2 version: 6.6.2 @@ -8482,10 +8482,6 @@ packages: resolution: {integrity: sha512-wPrq66Llhl7/4AGC6I+cqxT07LhXvWL08LNXz1fENOw0Ap4sRZZ/gZpTTJ5jpurzzzfS2W/Ge9BY3LgLjCShcw==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} - p-map@2.1.0: - resolution: {integrity: sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==} - engines: {node: '>=6'} - p-map@3.0.0: resolution: {integrity: sha512-d3qXVTF/s+W+CdJ5A29wywV2n8CQQYahlgz2bFiA+4eVNJbHJodPZ+/gXwPGh0bOqA+j8S+6+ckmvLGPk1QpxQ==} engines: {node: '>=8'} @@ -8494,8 +8490,8 @@ packages: resolution: {integrity: 
sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==} engines: {node: '>=10'} - p-map@7.0.3: - resolution: {integrity: sha512-VkndIv2fIB99swvQoA65bm+fsmt6UNdGeIB0oxBs+WhAhdh08QA04JXpI7rbB9r08/nkbysKoya9rtDERYOYMA==} + p-map@7.0.4: + resolution: {integrity: sha512-tkAQEw8ysMzmkhgw8k+1U/iPhWNhykKnSk4Rd5zLoPJCuJaGRPo6YposrZgaxHKzDHdDWWZvE/Sk7hsL2X/CpQ==} engines: {node: '>=18'} p-queue@6.6.2: @@ -15968,7 +15964,7 @@ snapshots: minipass-collect: 2.0.1 minipass-flush: 1.0.5 minipass-pipeline: 1.2.4 - p-map: 7.0.3 + p-map: 7.0.4 ssri: 12.0.0 tar: 7.5.2 unique-filename: 4.0.0 @@ -20290,8 +20286,6 @@ snapshots: dependencies: p-limit: 4.0.0 - p-map@2.1.0: {} - p-map@3.0.0: dependencies: aggregate-error: 3.1.0 @@ -20300,7 +20294,7 @@ snapshots: dependencies: aggregate-error: 3.1.0 - p-map@7.0.3: {} + p-map@7.0.4: {} p-queue@6.6.2: dependencies: diff --git a/scripts/generate-acknowledgments.js b/scripts/generate-acknowledgments.js index c609cf9b67..313be6203c 100644 --- a/scripts/generate-acknowledgments.js +++ b/scripts/generate-acknowledgments.js @@ -4,7 +4,7 @@ const assert = require('node:assert'); const fs = require('node:fs'); const { join } = require('node:path'); -const pMap = require('p-map'); +const { default: pMap } = require('p-map'); const prettier = require('prettier'); const { default: packageJson } = require('./packageJson.js'); diff --git a/ts/services/backups/export.preload.ts b/ts/services/backups/export.preload.ts index e93ad09678..6f35f78b30 100644 --- a/ts/services/backups/export.preload.ts +++ b/ts/services/backups/export.preload.ts @@ -3,7 +3,7 @@ import { Aci, Pni, ServiceId } from '@signalapp/libsignal-client'; import { BackupJsonExporter } from '@signalapp/libsignal-client/dist/MessageBackup.js'; -import pMap from 'p-map'; +import { pMapIterable } from 'p-map'; import pTimeout from 'p-timeout'; import { Readable } from 'node:stream'; import lodash from 'lodash'; @@ -193,8 +193,7 @@ const { isNumber } = 
lodash; const log = createLogger('backupExport'); -// Temporarily limited to preserve the received_at order -const MAX_CONCURRENCY = 1; +const MAX_CONCURRENCY = 100; // We want a very generous timeout to make sure that we always resume write // access to the database. @@ -821,59 +820,64 @@ export class BackupExportStream extends Readable { pni: me.get('pni'), }; - let lastBatch: Promise = Promise.resolve(); + const processMessageBatch = async ( + messages: ReadonlyArray + ): Promise => { + const iter = pMapIterable( + messages, + message => { + if (skippedConversationIds.has(message.conversationId)) { + // Counted once as skipped by the consumer loop below. + return undefined; + } + + return this.#toChatItem(message, { + aboutMe, + callHistoryByCallId, + pinnedMessagesByMessageId, + }); + }, + { concurrency: MAX_CONCURRENCY } + ); + + for await (const chatItem of iter) { + if (chatItem === undefined) { + this.#stats.skippedMessages += 1; + // Skipped conversation, or can't be backed up. + continue; + } + + this.#pushFrame({ + chatItem, + }); + + if (this.options.validationRun) { + // flush every chatItem to expose all validation errors + await this.#flush(); + } + + this.#stats.messages += 1; + } + + await this.#flush(); + }; + + let messages: ReadonlyArray = []; while (!cursor?.done) { // Performance optimization: it takes roughly the same time to load and // to process a batch of messages so begin loading the next batch while // processing the current one. 
// eslint-disable-next-line no-await-in-loop - const [{ messages, cursor: newCursor }] = await Promise.all([ + const [{ messages: newMessages, cursor: newCursor }] = await Promise.all([ DataReader.pageBackupMessages(cursor), - lastBatch, + processMessageBatch(messages), ]); cursor = newCursor; - - lastBatch = (async () => { - await pMap( - messages, - async message => { - if (skippedConversationIds.has(message.conversationId)) { - this.#stats.skippedMessages += 1; - return; - } - - const chatItem = await this.#toChatItem(message, { - aboutMe, - callHistoryByCallId, - pinnedMessagesByMessageId, - }); - - if (chatItem === undefined) { - this.#stats.skippedMessages += 1; - // Can't be backed up. - return; - } - - this.#pushFrame({ - chatItem, - }); - - if (this.options.validationRun) { - // flush every chatItem to expose all validation errors - await this.#flush(); - } - - this.#stats.messages += 1; - }, - { concurrency: MAX_CONCURRENCY } - ); - - await this.#flush(); - })(); + messages = newMessages; } - await lastBatch; + await processMessageBatch(messages); await this.#flush(); log.warn('final stats', { diff --git a/ts/services/backups/index.preload.ts b/ts/services/backups/index.preload.ts index 7bd59c1c73..7e04c2fe35 100644 --- a/ts/services/backups/index.preload.ts +++ b/ts/services/backups/index.preload.ts @@ -754,6 +754,7 @@ export class BackupsService { log.info('internal validation: starting'); const start = Date.now(); + window.IPC.startTrackingQueryStats(); const recordStream = new BackupExportStream({ ...exportOptions, validationRun: true, @@ -762,6 +763,9 @@ export class BackupsService { recordStream.run(); const totalBytes = await validateBackupStream(recordStream); + window.IPC.stopTrackingQueryStats({ + epochName: 'Internal Validate Backup', + }); const duration = Date.now() - start;