From f1c5f73b39d87f9179c4a0802121679a51085239 Mon Sep 17 00:00:00 2001 From: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com> Date: Mon, 9 Mar 2026 12:08:02 -0700 Subject: [PATCH] Simplify ContactsParser --- .../ContactsParser_test.preload.ts | 5 +- .../util/DelimitedStream_test.node.ts | 2 +- ts/textsecure/ContactsParser.preload.ts | 158 ++++-------------- ts/util/DelimitedStream.node.ts | 123 ++++++++++---- 4 files changed, 135 insertions(+), 153 deletions(-) diff --git a/ts/test-electron/ContactsParser_test.preload.ts b/ts/test-electron/ContactsParser_test.preload.ts index baf05f53b6..86c45bf600 100644 --- a/ts/test-electron/ContactsParser_test.preload.ts +++ b/ts/test-electron/ContactsParser_test.preload.ts @@ -254,6 +254,9 @@ async function parseContactsWithSmallChunkSize({ const smallChunksTransform = new SmallChunksTransform(32); const parseContactsTransform = new ParseContactsTransform(); + const contacts = new Array(); + parseContactsTransform.on('data', contact => contacts.push(contact)); + try { await pipeline(readStream, smallChunksTransform, parseContactsTransform); } catch (error) { @@ -271,5 +274,5 @@ async function parseContactsWithSmallChunkSize({ readStream.close(); - return parseContactsTransform.contacts; + return contacts; } diff --git a/ts/test-node/util/DelimitedStream_test.node.ts b/ts/test-node/util/DelimitedStream_test.node.ts index fbdb848826..1e24d28c73 100644 --- a/ts/test-node/util/DelimitedStream_test.node.ts +++ b/ts/test-node/util/DelimitedStream_test.node.ts @@ -114,7 +114,7 @@ describe('DelimitedStream', () => { new DelimitedStream(), collect(out) ), - 'Unfinished data' + 'Unfinished frame' ); }); diff --git a/ts/textsecure/ContactsParser.preload.ts b/ts/textsecure/ContactsParser.preload.ts index 6de483fcab..9c6247d7a5 100644 --- a/ts/textsecure/ContactsParser.preload.ts +++ b/ts/textsecure/ContactsParser.preload.ts @@ -1,17 +1,15 @@ // Copyright 2020 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import { Transform } from 'node:stream'; - import { createLogger } from '../logging/log.std.js'; import { SignalService as Proto } from '../protobuf/index.std.js'; -import protobuf from '../protobuf/wrap.std.js'; import { DurationInSeconds } from '../util/durations/index.std.js'; +import { DelimitedStream } from '../util/DelimitedStream.node.js'; import { getAbsoluteAttachmentPath, writeNewAttachmentData, - maybeDeleteAttachmentFile, } from '../util/migrations.preload.js'; +import { strictAssert } from '../util/assert.std.js'; import type { ContactAvatarType } from '../types/Avatar.std.js'; import type { AttachmentType } from '../types/Attachment.std.js'; import type { AciString } from '../types/ServiceId.std.js'; @@ -26,8 +24,6 @@ import { stringToMIMEType } from '../types/MIME.std.js'; const log = createLogger('ContactsParser'); -const { Reader } = protobuf; - type OptionalFields = { avatar?: Avatar | null; expireTimer?: number | null; @@ -61,6 +57,8 @@ export async function parseContactsV2( } const parseContactsTransform = new ParseContactsTransform(); + const contacts = new Array(); + parseContactsTransform.on('data', contact => contacts.push(contact)); await decryptAttachmentV2ToSink( { @@ -74,7 +72,7 @@ export async function parseContactsV2( parseContactsTransform ); - return parseContactsTransform.contacts; + return contacts; } // This transform pulls contacts and their avatars from a stream of bytes. This is tricky, @@ -82,125 +80,26 @@ export async function parseContactsV2( // So we are ready for decodeDelimited() to throw, and to keep activeContact around // while we wait for more chunks to get to the expected avatar size. // Note: exported only for testing -export class ParseContactsTransform extends Transform { - public contacts: Array = []; +export class ParseContactsTransform extends DelimitedStream { + protected override getTrailerSize(frame: Buffer): number { + const contact = Proto.ContactDetails.decode(frame); + return contact.avatar?.length ?? 0; + } - public activeContact: Proto.ContactDetails | undefined; - #unused: Uint8Array | undefined; - - override async _transform( - chunk: Buffer | undefined, - _encoding: string, - done: (error?: Error) => void + protected override async pushFrame( + frame: Buffer, + avatarData: Buffer ): Promise { - if (!chunk || chunk.byteLength === 0) { - done(); - return; - } + const contact = Proto.ContactDetails.decode(frame); - try { - let data = chunk; - if (this.#unused) { - data = Buffer.concat([this.#unused, data]); - this.#unused = undefined; - } - - const reader = Reader.create(data); - while (reader.pos < reader.len) { - const startPos = reader.pos; - - if (!this.activeContact) { - try { - this.activeContact = Proto.ContactDetails.decodeDelimited(reader); - } catch (err) { - // We get a RangeError if there wasn't enough data to read the next record. - if (err instanceof RangeError) { - // Note: A failed decodeDelimited() does in fact update reader.pos, so we - // must reset to startPos - this.#unused = data.subarray(startPos); - done(); - return; - } - - // Something deeper has gone wrong; the proto is malformed or something - done(err); - return; - } - } - - // Something has really gone wrong if the above parsing didn't throw but gave - // us nothing back. Let's end the parse. - if (!this.activeContact) { - done(new Error('ParseContactsTransform: No active contact!')); - return; - } - - const attachmentSize = this.activeContact?.avatar?.length ?? 0; - if (attachmentSize === 0) { - // No avatar attachment for this contact - const prepared = prepareContact(this.activeContact); - if (prepared) { - this.contacts.push(prepared); - } - this.activeContact = undefined; - - continue; - } - - const spaceLeftAfterRead = reader.len - (reader.pos + attachmentSize); - if (spaceLeftAfterRead >= 0) { - // We've read enough data to read the entire attachment - const avatarData = reader.buf.subarray( - reader.pos, - reader.pos + attachmentSize - ); - const hash = computeHash(avatarData); - - const local = - // eslint-disable-next-line no-await-in-loop - await writeNewAttachmentData(avatarData); - - const contentType = this.activeContact.avatar?.contentType; - const prepared = prepareContact(this.activeContact, { - ...this.activeContact.avatar, - ...local, - contentType: contentType - ? stringToMIMEType(contentType) - : undefined, - hash, - }); - if (prepared) { - this.contacts.push(prepared); - } else { - // eslint-disable-next-line no-await-in-loop - await maybeDeleteAttachmentFile(local.path); - } - this.activeContact = undefined; - - reader.skip(attachmentSize); - } else { - // We have an attachment, but we haven't read enough data yet. We need to - // wait for another chunk. - this.#unused = data.subarray(reader.pos); - done(); - return; - } - } - - // No need to push; no downstream consumers! - } catch (error) { - done(error); - return; - } - - done(); + this.push(await prepareContact(contact, avatarData)); } } -function prepareContact( +async function prepareContact( { aci: rawAci, aciBinary, ...proto }: Proto.ContactDetails, - avatar?: ContactAvatarType -): ContactDetailsWithAvatar | undefined { + avatarData: Uint8Array +): Promise { const expireTimer = proto.expireTimer != null ? DurationInSeconds.fromSeconds(proto.expireTimer) @@ -213,7 +112,24 @@ function prepareContact( return undefined; } - const result = { + let avatar: ContactAvatarType | undefined; + if (avatarData.byteLength > 0) { + strictAssert(proto.avatar != null, 'Expected avatar with avatar data'); + + const hash = computeHash(avatarData); + + const local = await writeNewAttachmentData(avatarData); + + const contentType = proto.avatar?.contentType; + avatar = { + ...proto.avatar, + ...local, + contentType: contentType ? stringToMIMEType(contentType) : undefined, + hash, + }; + } + + return { ...proto, expireTimer, expireTimerVersion: proto.expireTimerVersion ?? null, @@ -221,6 +137,4 @@ function prepareContact( avatar, number: dropNull(proto.number), }; - - return result; } diff --git a/ts/util/DelimitedStream.node.ts b/ts/util/DelimitedStream.node.ts index 7370e4d6e4..f262f05063 100644 --- a/ts/util/DelimitedStream.node.ts +++ b/ts/util/DelimitedStream.node.ts @@ -5,29 +5,41 @@ import { Transform } from 'node:stream'; import { missingCaseError } from './missingCaseError.std.js'; -enum State { - Prefix = 'Prefix', - Data = 'Data', -} +type State = + | { + kind: 'prefix'; + size: number; + value: number; + } + | { + kind: 'frame'; + remaining: number; + parts: Array; + } + | { + kind: 'trailer'; + frame: Buffer; + remaining: number; + parts: Array; + }; + +const EMPTY_TRAILER = Buffer.alloc(0); export class DelimitedStream extends Transform { - #state = State.Prefix; - #prefixValue = 0; - #prefixSize = 0; - #parts = new Array(); + #state: State = { kind: 'prefix', size: 0, value: 0 }; constructor() { super({ readableObjectMode: true }); } - override _transform( + override async _transform( chunk: Buffer, _encoding: BufferEncoding, done: (error?: Error) => void - ): void { + ): Promise { let offset = 0; while (offset < chunk.length) { - if (this.#state === State.Prefix) { + if (this.#state.kind === 'prefix') { const b = chunk[offset]; offset += 1; @@ -38,35 +50,73 @@ export class DelimitedStream extends Transform { const value = b & 0x7f; // eslint-disable-next-line no-bitwise - this.#prefixValue |= value << (7 * this.#prefixSize); - this.#prefixSize += 1; + this.#state.value |= value << (7 * this.#state.size); + this.#state.size += 1; // Check that we didn't go over 32bits. Node.js buffers can never // be larger than 2gb anyway! - if (this.#prefixSize > 4) { + if (this.#state.size > 4) { done(new Error('Delimiter encoding overflow')); return; } if (isLast) { - this.#state = State.Data; + this.#state = { + kind: 'frame', + remaining: this.#state.value, + parts: [], + }; } - } else if (this.#state === State.Data) { - const toTake = Math.min(this.#prefixValue, chunk.length - offset); + } else if ( + this.#state.kind === 'frame' || + this.#state.kind === 'trailer' + ) { + const toTake = Math.min(this.#state.remaining, chunk.length - offset); const part = chunk.slice(offset, offset + toTake); offset += toTake; - this.#prefixValue -= toTake; + this.#state.remaining -= toTake; - this.#parts.push(part); + this.#state.parts.push(part); - if (this.#prefixValue <= 0) { - this.#state = State.Prefix; - this.#prefixSize = 0; - this.#prefixValue = 0; + if (this.#state.remaining > 0) { + continue; + } - const whole = Buffer.concat(this.#parts); - this.#parts = []; - this.push(whole); + if (this.#state.kind === 'frame') { + const frame = Buffer.concat(this.#state.parts); + const trailerSize = this.getTrailerSize(frame); + + if (trailerSize === 0) { + this.#state = { + kind: 'prefix', + size: 0, + value: 0, + }; + + // eslint-disable-next-line no-await-in-loop + await this.pushFrame(frame, EMPTY_TRAILER); + } else { + this.#state = { + kind: 'trailer', + frame, + remaining: trailerSize, + parts: [], + }; + } + } else if (this.#state.kind === 'trailer') { + const oldState = this.#state; + const trailer = Buffer.concat(this.#state.parts); + + this.#state = { + kind: 'prefix', + size: 0, + value: 0, + }; + + // eslint-disable-next-line no-await-in-loop + await this.pushFrame(oldState.frame, trailer); + } else { + throw missingCaseError(this.#state); } } else { throw missingCaseError(this.#state); @@ -76,16 +126,31 @@ export class DelimitedStream extends Transform { } override _flush(done: (error?: Error) => void): void { - if (this.#state !== State.Prefix) { - done(new Error('Unfinished data')); + if (this.#state.kind === 'frame') { + done(new Error('Unfinished frame')); return; } + if (this.#state.kind === 'trailer') { + done(new Error('Unfinished trailer')); + return; + } + if (this.#state.kind !== 'prefix') { + throw missingCaseError(this.#state); + } - if (this.#prefixSize !== 0) { + if (this.#state.size !== 0) { done(new Error('Unfinished prefix')); return; } done(); } + + protected getTrailerSize(_frame: Buffer): number { + return 0; + } + + protected async pushFrame(frame: Buffer, _trailer: Buffer): Promise { + this.push(frame); + } }