Fix backup export concurrency model

This commit is contained in:
Fedor Indutny
2026-03-12 07:50:36 -07:00
committed by GitHub
parent 979497cea3
commit 6e0ec380b6
6 changed files with 62 additions and 60 deletions

View File

@@ -3,7 +3,7 @@
import { Aci, Pni, ServiceId } from '@signalapp/libsignal-client';
import { BackupJsonExporter } from '@signalapp/libsignal-client/dist/MessageBackup.js';
import pMap from 'p-map';
import { pMapIterable } from 'p-map';
import pTimeout from 'p-timeout';
import { Readable } from 'node:stream';
import lodash from 'lodash';
@@ -193,8 +193,7 @@ const { isNumber } = lodash;
const log = createLogger('backupExport');
// Temporarily limited to preserve the received_at order
const MAX_CONCURRENCY = 1;
const MAX_CONCURRENCY = 100;
// We want a very generous timeout to make sure that we always resume write
// access to the database.
@@ -821,59 +820,64 @@ export class BackupExportStream extends Readable {
pni: me.get('pni'),
};
let lastBatch: Promise<void> = Promise.resolve();
const processMessageBatch = async (
messages: ReadonlyArray<MessageAttributesType>
): Promise<void> => {
const iter = pMapIterable(
messages,
message => {
if (skippedConversationIds.has(message.conversationId)) {
this.#stats.skippedMessages += 1;
return undefined;
}
return this.#toChatItem(message, {
aboutMe,
callHistoryByCallId,
pinnedMessagesByMessageId,
});
},
{ concurrency: MAX_CONCURRENCY }
);
for await (const chatItem of iter) {
if (chatItem === undefined) {
this.#stats.skippedMessages += 1;
// Can't be backed up.
continue;
}
this.#pushFrame({
chatItem,
});
if (this.options.validationRun) {
// flush every chatItem to expose all validation errors
await this.#flush();
}
this.#stats.messages += 1;
}
await this.#flush();
};
let messages: ReadonlyArray<MessageAttributesType> = [];
while (!cursor?.done) {
// Performance optimization: it takes roughly the same time to load and
// to process a batch of messages so begin loading the next batch while
// processing the current one.
// eslint-disable-next-line no-await-in-loop
const [{ messages, cursor: newCursor }] = await Promise.all([
const [{ messages: newMessages, cursor: newCursor }] = await Promise.all([
DataReader.pageBackupMessages(cursor),
lastBatch,
processMessageBatch(messages),
]);
cursor = newCursor;
lastBatch = (async () => {
await pMap(
messages,
async message => {
if (skippedConversationIds.has(message.conversationId)) {
this.#stats.skippedMessages += 1;
return;
}
const chatItem = await this.#toChatItem(message, {
aboutMe,
callHistoryByCallId,
pinnedMessagesByMessageId,
});
if (chatItem === undefined) {
this.#stats.skippedMessages += 1;
// Can't be backed up.
return;
}
this.#pushFrame({
chatItem,
});
if (this.options.validationRun) {
// flush every chatItem to expose all validation errors
await this.#flush();
}
this.#stats.messages += 1;
},
{ concurrency: MAX_CONCURRENCY }
);
await this.#flush();
})();
messages = newMessages;
}
await lastBatch;
await processMessageBatch(messages);
await this.#flush();
log.warn('final stats', {