Use iterator with backpressure for export

Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
This commit is contained in:
automated-signal
2026-03-12 11:37:36 -05:00
committed by GitHub
parent b4c2749a07
commit da339458ce
2 changed files with 51 additions and 51 deletions

View File

@@ -193,7 +193,8 @@ const { isNumber } = lodash;
const log = createLogger('backupExport');
const MAX_CONCURRENCY = 100;
// We only run 4 sql workers so going much higher doesn't help
const MAX_CONCURRENCY = 8;
// We want a very generous timeout to make sure that we always resume write
// access to the database.
@@ -803,8 +804,6 @@ export class BackupExportStream extends Readable {
this.#stats.chatFolders += 1;
}
let cursor: PageBackupMessagesCursorType | undefined;
const callHistory = await DataReader.getAllCallHistory();
const callHistoryByCallId = makeLookup(callHistory, 'callId');
@@ -820,64 +819,52 @@ export class BackupExportStream extends Readable {
pni: me.get('pni'),
};
const processMessageBatch = async (
messages: ReadonlyArray<MessageAttributesType>
): Promise<void> => {
const iter = pMapIterable(
messages,
message => {
if (skippedConversationIds.has(message.conversationId)) {
this.#stats.skippedMessages += 1;
return undefined;
}
const FLUSH_EVERY = 10000;
return this.#toChatItem(message, {
aboutMe,
callHistoryByCallId,
pinnedMessagesByMessageId,
});
},
{ concurrency: MAX_CONCURRENCY }
);
for await (const chatItem of iter) {
if (chatItem === undefined) {
const iter = pMapIterable(
Readable.from(getAllMessages(), {
objectMode: true,
highWaterMark: FLUSH_EVERY,
}),
message => {
if (skippedConversationIds.has(message.conversationId)) {
this.#stats.skippedMessages += 1;
// Can't be backed up.
continue;
return undefined;
}
this.#pushFrame({
chatItem,
return this.#toChatItem(message, {
aboutMe,
callHistoryByCallId,
pinnedMessagesByMessageId,
});
},
{
concurrency: MAX_CONCURRENCY,
backpressure: FLUSH_EVERY,
}
);
if (this.options.validationRun) {
// flush every chatItem to expose all validation errors
await this.#flush();
}
this.#stats.messages += 1;
for await (const chatItem of iter) {
if (chatItem === undefined) {
this.#stats.skippedMessages += 1;
// Can't be backed up.
continue;
}
await this.#flush();
};
this.#pushFrame({
chatItem,
});
this.#stats.messages += 1;
let messages: ReadonlyArray<MessageAttributesType> = [];
while (!cursor?.done) {
// Performance optimization: it takes roughly the same time to load and
// to process a batch of messages so begin loading the next batch while
// processing the current one.
// eslint-disable-next-line no-await-in-loop
const [{ messages: newMessages, cursor: newCursor }] = await Promise.all([
DataReader.pageBackupMessages(cursor),
processMessageBatch(messages),
]);
cursor = newCursor;
messages = newMessages;
if (
this.options.validationRun ||
this.#stats.messages % FLUSH_EVERY === 0
) {
// flush every chatItem to expose all validation errors
await this.#flush();
}
}
await processMessageBatch(messages);
await this.#flush();
log.warn('final stats', {
@@ -3954,3 +3941,16 @@ function toAppTheme(theme: ThemeType): Backups.AccountData.AppTheme {
return ENUM.SYSTEM;
}
// Lazily stream every backup message, one page of DB rows at a time.
// Pagination state lives entirely in the cursor returned by
// DataReader.pageBackupMessages; iteration stops once the cursor
// reports `done`.
async function* getAllMessages(): AsyncIterable<MessageAttributesType> {
  let cursor: PageBackupMessagesCursorType | undefined;
  do {
    // eslint-disable-next-line no-await-in-loop
    const page = await DataReader.pageBackupMessages(cursor);
    cursor = page.cursor;
    yield* page.messages;
  } while (!cursor?.done);
}

View File

@@ -8965,7 +8965,7 @@ function pageBackupMessages(
db: ReadableDB,
cursor?: PageBackupMessagesCursorType
): PageBackupMessagesResultType {
const LIMIT = 10000;
const LIMIT = 1000;
const [query, params] = sql`
SELECT
rowid,