Improve backup export speed

This commit is contained in:
Fedor Indutny
2026-03-11 17:12:27 -07:00
committed by GitHub
parent 52ba69a6f2
commit 979497cea3
6 changed files with 77 additions and 16 deletions

View File

@@ -17,7 +17,7 @@ import {
resumeWriteAccess,
} from '../../sql/Client.preload.js';
import type {
PageMessagesCursorType,
PageBackupMessagesCursorType,
IdentityKeyType,
} from '../../sql/Interface.std.js';
import { createLogger } from '../../logging/log.std.js';
@@ -804,7 +804,7 @@ export class BackupExportStream extends Readable {
this.#stats.chatFolders += 1;
}
let cursor: PageMessagesCursorType | undefined;
let cursor: PageBackupMessagesCursorType | undefined;
const callHistory = await DataReader.getAllCallHistory();
const callHistoryByCallId = makeLookup(callHistory, 'callId');
@@ -821,13 +821,20 @@ export class BackupExportStream extends Readable {
pni: me.get('pni'),
};
try {
while (!cursor?.done) {
const { messages, cursor: newCursor } =
// eslint-disable-next-line no-await-in-loop
await DataReader.pageMessages(cursor);
let lastBatch: Promise<void> = Promise.resolve();
while (!cursor?.done) {
// Performance optimization: it takes roughly the same time to load and
// to process a batch of messages so begin loading the next batch while
// processing the current one.
// eslint-disable-next-line no-await-in-loop
// eslint-disable-next-line no-await-in-loop
const [{ messages, cursor: newCursor }] = await Promise.all([
DataReader.pageBackupMessages(cursor),
lastBatch,
]);
cursor = newCursor;
lastBatch = (async () => {
await pMap(
messages,
async message => {
@@ -862,17 +869,11 @@ export class BackupExportStream extends Readable {
{ concurrency: MAX_CONCURRENCY }
);
cursor = newCursor;
// eslint-disable-next-line no-await-in-loop
await this.#flush();
}
} finally {
if (cursor !== undefined) {
await DataReader.finishPageMessages(cursor);
}
})();
}
await lastBatch;
await this.#flush();
log.warn('final stats', {

View File

@@ -1174,6 +1174,7 @@ export class BackupsService {
this.#isRunning = 'export';
const start = Date.now();
window.IPC.startTrackingQueryStats();
try {
if (options.type === 'remote') {
strictAssert(
@@ -1274,6 +1275,7 @@ export class BackupsService {
duration,
};
} finally {
window.IPC.stopTrackingQueryStats({ epochName: 'Backup Export' });
log.info('exportBackup: finished...');
this.#isRunning = false;
}

View File

@@ -559,6 +559,18 @@ export type PageMessagesResultType = Readonly<{
messages: ReadonlyArray<MessageAttributesType>;
}>;
// Opaque paging cursor for `pageBackupMessages`. The `never`-typed brand
// field makes this nominally distinct, so a structurally similar cursor
// (e.g. `PageMessagesCursorType`) cannot be passed by accident.
export type PageBackupMessagesCursorType = Readonly<{
  __page_backup_messages_cursor: never;
  // Rowid position used by the next page query's range condition.
  nextRowid: number;
  // True once a short (non-full) page has been returned; callers stop paging.
  done: boolean;
}>;

// One page of hydrated messages plus the cursor for requesting the next page.
export type PageBackupMessagesResultType = Readonly<{
  cursor: PageBackupMessagesCursorType;
  messages: ReadonlyArray<MessageAttributesType>;
}>;
export type GetAllStoriesResultType = ReadonlyArray<
MessageType & {
hasReplies: boolean;
@@ -876,6 +888,12 @@ type ReadableInterface = {
pageMessages: (cursor?: PageMessagesCursorType) => PageMessagesResultType;
finishPageMessages: (cursor: PageMessagesCursorType) => void;
// Must not be used when write access is not paused.
// See `pauseWriteAccess`
pageBackupMessages: (
cursor?: PageBackupMessagesCursorType
) => PageBackupMessagesResultType;
getTotalUnreadForConversation: (
conversationId: string,
options: {

View File

@@ -153,6 +153,8 @@ import type {
MessageType,
PageMessagesCursorType,
PageMessagesResultType,
PageBackupMessagesCursorType,
PageBackupMessagesResultType,
PreKeyIdType,
PollVoteReadResultType,
ReactionResultType,
@@ -573,6 +575,7 @@ export const DataReader: ServerReadableInterface = {
finishGetKnownMessageAttachments,
pageMessages,
finishPageMessages,
pageBackupMessages,
getKnownDownloads,
getKnownConversationAttachments,
@@ -8958,6 +8961,32 @@ function finishPageMessages(
`);
}
// Pages through the `messages` table in ascending rowid order for backup
// export. Must only be called while write access is paused (enforced in
// MainSQL), so the rowid range seen across pages is stable.
//
// @param db - readable database handle
// @param cursor - cursor returned by the previous call; omit for the first page
// @returns one page of messages and the cursor for the next page; the cursor's
//   `done` flag is set once a short (non-full) page is produced
function pageBackupMessages(
  db: ReadableDB,
  cursor?: PageBackupMessagesCursorType
): PageBackupMessagesResultType {
  const LIMIT = 10000;

  // `nextRowid` is the first rowid NOT yet returned (half-open range), so the
  // `>=` comparison never re-fetches a row from the previous page. ORDER BY
  // makes the ascending-rowid paging invariant explicit; SQLite satisfies it
  // via the rowid index, so it costs nothing.
  const [query, params] = sql`
    SELECT
      rowid,
      ${MESSAGE_COLUMNS_FRAGMENT}
    FROM messages
    WHERE
      rowid >= ${cursor?.nextRowid ?? 0}
    ORDER BY rowid ASC
    LIMIT ${LIMIT}
  `;

  const rows: Array<MessageTypeUnhydrated & { rowid: number }> = db
    .prepare(query)
    .all(params);

  const lastRowid = rows.at(-1)?.rowid;

  return {
    cursor: {
      // Advance past the last returned row. (Previously the cursor pointed AT
      // the last row, so the `>=` range duplicated one message per full page.)
      // On an empty page keep the old position; `done` stops the loop anyway.
      nextRowid: lastRowid === undefined ? cursor?.nextRowid ?? 0 : lastRowid + 1,
      done: rows.length < LIMIT,
    } as PageBackupMessagesCursorType,
    messages: hydrateMessages(db, rows),
  };
}
function getKnownDownloads(db: ReadableDB): Array<string> {
const result = [];

View File

@@ -276,6 +276,12 @@ export class MainSQL {
duration: number;
}>;
if (method === 'pageBackupMessages' && this.#pauseWaiters == null) {
throw new Error(
'pageBackupMessages can only run after pauseWriteAccess()'
);
}
// pageMessages runs over several queries and needs to have access to
// the same temporary table, it also creates temporary insert/update
// triggers so it has to run on the same connection that updates the tables

View File

@@ -20,6 +20,11 @@ export function unicodeSlice(
begin: number,
end: number
): string {
// Optimization: whole string fits into the range, return as is
if (begin === 0 && end >= input.length) {
return input;
}
// Until https://chromium-review.googlesource.com/c/v8/v8/+/4190519 is merged,
// we should limit the input size to avoid allocating tons of memory.
// This should be longer than any max length we'd expect to slice.