From 79e154d1f709c8e3b09143db23de97de6212dd74 Mon Sep 17 00:00:00 2001 From: automated-signal <37887102+automated-signal@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:36:36 -0500 Subject: [PATCH] Simplify localBackup media parsing Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> --- ts/services/backups/util/localBackup.node.ts | 139 +++++-------------- 1 file changed, 37 insertions(+), 102 deletions(-) diff --git a/ts/services/backups/util/localBackup.node.ts b/ts/services/backups/util/localBackup.node.ts index a12082ff7c..1cc1b90f35 100644 --- a/ts/services/backups/util/localBackup.node.ts +++ b/ts/services/backups/util/localBackup.node.ts @@ -5,13 +5,13 @@ import { randomBytes } from 'node:crypto'; import { dirname, join } from 'node:path'; import { readdir, readFile, stat, writeFile } from 'node:fs/promises'; import { createReadStream, createWriteStream } from 'node:fs'; -import { Transform } from 'node:stream'; +import { Readable, Writable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; import { createLogger } from '../../../logging/log.std.js'; import * as Bytes from '../../../Bytes.std.js'; import * as Errors from '../../../types/errors.std.js'; import { Signal } from '../../../protobuf/index.std.js'; -import protobuf from '../../../protobuf/wrap.std.js'; +import { DelimitedStream } from '../../../util/DelimitedStream.node.js'; import { strictAssert } from '../../../util/assert.std.js'; import { decryptAesCtr, encryptAesCtr } from '../../../Crypto.node.js'; import type { LocalBackupMetadataVerificationType } from '../../../types/backups.node.js'; @@ -19,12 +19,9 @@ import { LOCAL_BACKUP_VERSION, LOCAL_BACKUP_BACKUP_ID_IV_LENGTH, } from '../constants.std.js'; -import { explodePromise } from '../../../util/explodePromise.std.js'; const log = createLogger('localBackup'); -const { Reader } = protobuf; - export function getLocalBackupFilesDirectory({ backupsBaseDir, }: { @@ -158,33 +155,26 @@ export async function writeLocalBackupFilesList({ snapshotDir: string; mediaNames: Array; }): Promise> { - const { promise, resolve, reject } = explodePromise>(); - const filesListPath = join(snapshotDir, 'files'); const writeStream = createWriteStream(filesListPath); - writeStream.on('error', error => { - reject(error); - }); const files: Array = []; - for (const mediaName of mediaNames) { - const data = Signal.backup.local.FilesFrame.encodeDelimited({ - mediaName, - }).finish(); - if (!writeStream.write(data)) { - // eslint-disable-next-line no-await-in-loop - await new Promise(resolveStream => - writeStream.once('drain', resolveStream) - ); + + function* generateFrames() { + for (const mediaName of mediaNames) { + const data = Signal.backup.local.FilesFrame.encodeDelimited({ + mediaName, + }).finish(); + + yield data; + + files.push(mediaName); } - files.push(mediaName); } - writeStream.end(() => { - resolve(files); - }); + const frameGenerator = Readable.from(generateFrames()); - await promise; + await pipeline(frameGenerator, writeStream); return files; } @@ -193,10 +183,30 @@ export async function readLocalBackupFilesList( ): Promise> { const filesListPath = join(snapshotDir, 'files'); const readStream = createReadStream(filesListPath); - const parseFilesTransform = new ParseFilesListTransform(); + const delimitedStream = new DelimitedStream(); + + const mediaNames = new Array(); + const parseFilesWritable = new Writable({ + objectMode: true, + write(data, _enc, callback) { + try { + const file = Signal.backup.local.FilesFrame.decode(data); + if (file.mediaName) { + mediaNames.push(file.mediaName); + } else { + log.warn( + 'ParseFilesListTransform: Active file had empty mediaName, ignoring' + ); + } + callback(null); + } catch (error) { + callback(error); + } + }, + }); try { - await pipeline(readStream, parseFilesTransform); + await pipeline(readStream, delimitedStream, parseFilesWritable); } catch (error) { try { readStream.close(); @@ -212,82 +222,7 @@ export async function readLocalBackupFilesList( readStream.close(); - return parseFilesTransform.mediaNames; -} - -export class ParseFilesListTransform extends Transform { - public mediaNames: Array = []; - - public activeFile: Signal.backup.local.FilesFrame | undefined; - #unused: Uint8Array | undefined; - - override async _transform( - chunk: Buffer | undefined, - _encoding: string, - done: (error?: Error) => void - ): Promise { - if (!chunk || chunk.byteLength === 0) { - done(); - return; - } - - try { - let data = chunk; - if (this.#unused) { - data = Buffer.concat([this.#unused, data]); - this.#unused = undefined; - } - - const reader = Reader.create(data); - while (reader.pos < reader.len) { - const startPos = reader.pos; - - if (!this.activeFile) { - try { - this.activeFile = - Signal.backup.local.FilesFrame.decodeDelimited(reader); - } catch (err) { - // We get a RangeError if there wasn't enough data to read the next record. - if (err instanceof RangeError) { - // Note: A failed decodeDelimited() does in fact update reader.pos, so we - // must reset to startPos - this.#unused = data.subarray(startPos); - done(); - return; - } - - // Something deeper has gone wrong; the proto is malformed or something - done(err); - return; - } - } - - if (!this.activeFile) { - done( - new Error( - 'ParseFilesListTransform: No active file after successful decode!' - ) - ); - return; - } - - if (this.activeFile.mediaName) { - this.mediaNames.push(this.activeFile.mediaName); - } else { - log.warn( - 'ParseFilesListTransform: Active file had empty mediaName, ignoring' - ); - } - - this.activeFile = undefined; - } - } catch (error) { - done(error); - return; - } - - done(); - } + return mediaNames; } export type ValidateLocalBackupStructureResultType =