From da339458ce105dd4db98c2a95684bb65f80626d6 Mon Sep 17 00:00:00 2001 From: automated-signal <37887102+automated-signal@users.noreply.github.com> Date: Thu, 12 Mar 2026 11:37:36 -0500 Subject: [PATCH] Use iterator with backpressure for export Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> --- ts/services/backups/export.preload.ts | 100 +++++++++++++------------- ts/sql/Server.node.ts | 2 +- 2 files changed, 51 insertions(+), 51 deletions(-) diff --git a/ts/services/backups/export.preload.ts b/ts/services/backups/export.preload.ts index 6f35f78b30..e16fa99c4e 100644 --- a/ts/services/backups/export.preload.ts +++ b/ts/services/backups/export.preload.ts @@ -193,7 +193,8 @@ const { isNumber } = lodash; const log = createLogger('backupExport'); -const MAX_CONCURRENCY = 100; +// We only run 4 sql workers so going much higher doesn't help +const MAX_CONCURRENCY = 8; // We want a very generous timeout to make sure that we always resume write // access to the database. @@ -803,8 +804,6 @@ export class BackupExportStream extends Readable { this.#stats.chatFolders += 1; } - let cursor: PageBackupMessagesCursorType | undefined; - const callHistory = await DataReader.getAllCallHistory(); const callHistoryByCallId = makeLookup(callHistory, 'callId'); @@ -820,64 +819,52 @@ export class BackupExportStream extends Readable { pni: me.get('pni'), }; - const processMessageBatch = async ( - messages: ReadonlyArray - ): Promise => { - const iter = pMapIterable( - messages, - message => { - if (skippedConversationIds.has(message.conversationId)) { - this.#stats.skippedMessages += 1; - return undefined; - } + const FLUSH_EVERY = 10000; - return this.#toChatItem(message, { - aboutMe, - callHistoryByCallId, - pinnedMessagesByMessageId, - }); - }, - { concurrency: MAX_CONCURRENCY } - ); - - for await (const chatItem of iter) { - if (chatItem === undefined) { + const iter = pMapIterable( + Readable.from(getAllMessages(), { + objectMode: true, + highWaterMark: FLUSH_EVERY, + }), + message => { + if (skippedConversationIds.has(message.conversationId)) { this.#stats.skippedMessages += 1; - // Can't be backed up. - continue; + return undefined; } - this.#pushFrame({ - chatItem, + return this.#toChatItem(message, { + aboutMe, + callHistoryByCallId, + pinnedMessagesByMessageId, }); + }, + { + concurrency: MAX_CONCURRENCY, + backpressure: FLUSH_EVERY, + } + ); - if (this.options.validationRun) { - // flush every chatItem to expose all validation errors - await this.#flush(); - } - - this.#stats.messages += 1; + for await (const chatItem of iter) { + if (chatItem === undefined) { + this.#stats.skippedMessages += 1; + // Can't be backed up. + continue; } - await this.#flush(); - }; + this.#pushFrame({ + chatItem, + }); + this.#stats.messages += 1; - let messages: ReadonlyArray = []; - while (!cursor?.done) { - // Performance optimization: it takes roughly the same time to load and - // to process a batch of messages so begin loading the next batch while - // processing the current one. - - // eslint-disable-next-line no-await-in-loop - const [{ messages: newMessages, cursor: newCursor }] = await Promise.all([ - DataReader.pageBackupMessages(cursor), - processMessageBatch(messages), - ]); - cursor = newCursor; - messages = newMessages; + if ( + this.options.validationRun || + this.#stats.messages % FLUSH_EVERY === 0 + ) { + // flush every chatItem to expose all validation errors + await this.#flush(); + } } - await processMessageBatch(messages); await this.#flush(); log.warn('final stats', { @@ -3954,3 +3941,16 @@ function toAppTheme(theme: ThemeType): Backups.AccountData.AppTheme { return ENUM.SYSTEM; } + +async function* getAllMessages(): AsyncIterable { + let cursor: PageBackupMessagesCursorType | undefined; + while (!cursor?.done) { + const { messages, cursor: newCursor } = + // eslint-disable-next-line no-await-in-loop + await DataReader.pageBackupMessages(cursor); + + cursor = newCursor; + + yield* messages; + } +} diff --git a/ts/sql/Server.node.ts b/ts/sql/Server.node.ts index 111e107535..3e28ea5e0d 100644 --- a/ts/sql/Server.node.ts +++ b/ts/sql/Server.node.ts @@ -8965,7 +8965,7 @@ function pageBackupMessages( db: ReadableDB, cursor?: PageBackupMessagesCursorType ): PageBackupMessagesResultType { - const LIMIT = 10000; + const LIMIT = 1000; const [query, params] = sql` SELECT rowid,