Simplify ContactsParser

This commit is contained in:
Fedor Indutny
2026-03-09 12:08:02 -07:00
committed by GitHub
parent fcf067303a
commit f1c5f73b39
4 changed files with 135 additions and 153 deletions

View File

@@ -254,6 +254,9 @@ async function parseContactsWithSmallChunkSize({
const smallChunksTransform = new SmallChunksTransform(32);
const parseContactsTransform = new ParseContactsTransform();
const contacts = new Array<ContactDetailsWithAvatar>();
parseContactsTransform.on('data', contact => contacts.push(contact));
try {
await pipeline(readStream, smallChunksTransform, parseContactsTransform);
} catch (error) {
@@ -271,5 +274,5 @@ async function parseContactsWithSmallChunkSize({
readStream.close();
return parseContactsTransform.contacts;
return contacts;
}

View File

@@ -114,7 +114,7 @@ describe('DelimitedStream', () => {
new DelimitedStream(),
collect(out)
),
'Unfinished data'
'Unfinished frame'
);
});

View File

@@ -1,17 +1,15 @@
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { Transform } from 'node:stream';
import { createLogger } from '../logging/log.std.js';
import { SignalService as Proto } from '../protobuf/index.std.js';
import protobuf from '../protobuf/wrap.std.js';
import { DurationInSeconds } from '../util/durations/index.std.js';
import { DelimitedStream } from '../util/DelimitedStream.node.js';
import {
getAbsoluteAttachmentPath,
writeNewAttachmentData,
maybeDeleteAttachmentFile,
} from '../util/migrations.preload.js';
import { strictAssert } from '../util/assert.std.js';
import type { ContactAvatarType } from '../types/Avatar.std.js';
import type { AttachmentType } from '../types/Attachment.std.js';
import type { AciString } from '../types/ServiceId.std.js';
@@ -26,8 +24,6 @@ import { stringToMIMEType } from '../types/MIME.std.js';
const log = createLogger('ContactsParser');
const { Reader } = protobuf;
type OptionalFields = {
avatar?: Avatar | null;
expireTimer?: number | null;
@@ -61,6 +57,8 @@ export async function parseContactsV2(
}
const parseContactsTransform = new ParseContactsTransform();
const contacts = new Array<ContactDetailsWithAvatar>();
parseContactsTransform.on('data', contact => contacts.push(contact));
await decryptAttachmentV2ToSink(
{
@@ -74,7 +72,7 @@ export async function parseContactsV2(
parseContactsTransform
);
return parseContactsTransform.contacts;
return contacts;
}
// This transform pulls contacts and their avatars from a stream of bytes. This is tricky,
@@ -82,125 +80,26 @@ export async function parseContactsV2(
// So we are ready for decodeDelimited() to throw, and to keep activeContact around
// while we wait for more chunks to get to the expected avatar size.
// Note: exported only for testing
export class ParseContactsTransform extends Transform {
public contacts: Array<ContactDetailsWithAvatar> = [];
export class ParseContactsTransform extends DelimitedStream {
protected override getTrailerSize(frame: Buffer): number {
const contact = Proto.ContactDetails.decode(frame);
return contact.avatar?.length ?? 0;
}
public activeContact: Proto.ContactDetails | undefined;
#unused: Uint8Array | undefined;
override async _transform(
chunk: Buffer | undefined,
_encoding: string,
done: (error?: Error) => void
protected override async pushFrame(
frame: Buffer,
avatarData: Buffer
): Promise<void> {
if (!chunk || chunk.byteLength === 0) {
done();
return;
}
const contact = Proto.ContactDetails.decode(frame);
try {
let data = chunk;
if (this.#unused) {
data = Buffer.concat([this.#unused, data]);
this.#unused = undefined;
}
const reader = Reader.create(data);
while (reader.pos < reader.len) {
const startPos = reader.pos;
if (!this.activeContact) {
try {
this.activeContact = Proto.ContactDetails.decodeDelimited(reader);
} catch (err) {
// We get a RangeError if there wasn't enough data to read the next record.
if (err instanceof RangeError) {
// Note: A failed decodeDelimited() does in fact update reader.pos, so we
// must reset to startPos
this.#unused = data.subarray(startPos);
done();
return;
}
// Something deeper has gone wrong; the proto is malformed or something
done(err);
return;
}
}
// Something has really gone wrong if the above parsing didn't throw but gave
// us nothing back. Let's end the parse.
if (!this.activeContact) {
done(new Error('ParseContactsTransform: No active contact!'));
return;
}
const attachmentSize = this.activeContact?.avatar?.length ?? 0;
if (attachmentSize === 0) {
// No avatar attachment for this contact
const prepared = prepareContact(this.activeContact);
if (prepared) {
this.contacts.push(prepared);
}
this.activeContact = undefined;
continue;
}
const spaceLeftAfterRead = reader.len - (reader.pos + attachmentSize);
if (spaceLeftAfterRead >= 0) {
// We've read enough data to read the entire attachment
const avatarData = reader.buf.subarray(
reader.pos,
reader.pos + attachmentSize
);
const hash = computeHash(avatarData);
const local =
// eslint-disable-next-line no-await-in-loop
await writeNewAttachmentData(avatarData);
const contentType = this.activeContact.avatar?.contentType;
const prepared = prepareContact(this.activeContact, {
...this.activeContact.avatar,
...local,
contentType: contentType
? stringToMIMEType(contentType)
: undefined,
hash,
});
if (prepared) {
this.contacts.push(prepared);
} else {
// eslint-disable-next-line no-await-in-loop
await maybeDeleteAttachmentFile(local.path);
}
this.activeContact = undefined;
reader.skip(attachmentSize);
} else {
// We have an attachment, but we haven't read enough data yet. We need to
// wait for another chunk.
this.#unused = data.subarray(reader.pos);
done();
return;
}
}
// No need to push; no downstream consumers!
} catch (error) {
done(error);
return;
}
done();
this.push(await prepareContact(contact, avatarData));
}
}
function prepareContact(
async function prepareContact(
{ aci: rawAci, aciBinary, ...proto }: Proto.ContactDetails,
avatar?: ContactAvatarType
): ContactDetailsWithAvatar | undefined {
avatarData: Uint8Array
): Promise<ContactDetailsWithAvatar | undefined> {
const expireTimer =
proto.expireTimer != null
? DurationInSeconds.fromSeconds(proto.expireTimer)
@@ -213,7 +112,24 @@ function prepareContact(
return undefined;
}
const result = {
let avatar: ContactAvatarType | undefined;
if (avatarData.byteLength > 0) {
strictAssert(proto.avatar != null, 'Expected avatar with avatar data');
const hash = computeHash(avatarData);
const local = await writeNewAttachmentData(avatarData);
const contentType = proto.avatar?.contentType;
avatar = {
...proto.avatar,
...local,
contentType: contentType ? stringToMIMEType(contentType) : undefined,
hash,
};
}
return {
...proto,
expireTimer,
expireTimerVersion: proto.expireTimerVersion ?? null,
@@ -221,6 +137,4 @@ function prepareContact(
avatar,
number: dropNull(proto.number),
};
return result;
}

View File

@@ -5,29 +5,41 @@ import { Transform } from 'node:stream';
import { missingCaseError } from './missingCaseError.std.js';
enum State {
Prefix = 'Prefix',
Data = 'Data',
}
type State =
| {
kind: 'prefix';
size: number;
value: number;
}
| {
kind: 'frame';
remaining: number;
parts: Array<Buffer>;
}
| {
kind: 'trailer';
frame: Buffer;
remaining: number;
parts: Array<Buffer>;
};
const EMPTY_TRAILER = Buffer.alloc(0);
export class DelimitedStream extends Transform {
#state = State.Prefix;
#prefixValue = 0;
#prefixSize = 0;
#parts = new Array<Buffer>();
#state: State = { kind: 'prefix', size: 0, value: 0 };
constructor() {
super({ readableObjectMode: true });
}
override _transform(
override async _transform(
chunk: Buffer,
_encoding: BufferEncoding,
done: (error?: Error) => void
): void {
): Promise<void> {
let offset = 0;
while (offset < chunk.length) {
if (this.#state === State.Prefix) {
if (this.#state.kind === 'prefix') {
const b = chunk[offset];
offset += 1;
@@ -38,35 +50,73 @@ export class DelimitedStream extends Transform {
const value = b & 0x7f;
// eslint-disable-next-line no-bitwise
this.#prefixValue |= value << (7 * this.#prefixSize);
this.#prefixSize += 1;
this.#state.value |= value << (7 * this.#state.size);
this.#state.size += 1;
// Check that we didn't go over 32bits. Node.js buffers can never
// be larger than 2gb anyway!
if (this.#prefixSize > 4) {
if (this.#state.size > 4) {
done(new Error('Delimiter encoding overflow'));
return;
}
if (isLast) {
this.#state = State.Data;
this.#state = {
kind: 'frame',
remaining: this.#state.value,
parts: [],
};
}
} else if (this.#state === State.Data) {
const toTake = Math.min(this.#prefixValue, chunk.length - offset);
} else if (
this.#state.kind === 'frame' ||
this.#state.kind === 'trailer'
) {
const toTake = Math.min(this.#state.remaining, chunk.length - offset);
const part = chunk.slice(offset, offset + toTake);
offset += toTake;
this.#prefixValue -= toTake;
this.#state.remaining -= toTake;
this.#parts.push(part);
this.#state.parts.push(part);
if (this.#prefixValue <= 0) {
this.#state = State.Prefix;
this.#prefixSize = 0;
this.#prefixValue = 0;
if (this.#state.remaining > 0) {
continue;
}
const whole = Buffer.concat(this.#parts);
this.#parts = [];
this.push(whole);
if (this.#state.kind === 'frame') {
const frame = Buffer.concat(this.#state.parts);
const trailerSize = this.getTrailerSize(frame);
if (trailerSize === 0) {
this.#state = {
kind: 'prefix',
size: 0,
value: 0,
};
// eslint-disable-next-line no-await-in-loop
await this.pushFrame(frame, EMPTY_TRAILER);
} else {
this.#state = {
kind: 'trailer',
frame,
remaining: trailerSize,
parts: [],
};
}
} else if (this.#state.kind === 'trailer') {
const oldState = this.#state;
const trailer = Buffer.concat(this.#state.parts);
this.#state = {
kind: 'prefix',
size: 0,
value: 0,
};
// eslint-disable-next-line no-await-in-loop
await this.pushFrame(oldState.frame, trailer);
} else {
throw missingCaseError(this.#state);
}
} else {
throw missingCaseError(this.#state);
@@ -76,16 +126,31 @@ export class DelimitedStream extends Transform {
}
override _flush(done: (error?: Error) => void): void {
if (this.#state !== State.Prefix) {
done(new Error('Unfinished data'));
if (this.#state.kind === 'frame') {
done(new Error('Unfinished frame'));
return;
}
if (this.#state.kind === 'trailer') {
done(new Error('Unfinished trailer'));
return;
}
if (this.#state.kind !== 'prefix') {
throw missingCaseError(this.#state);
}
if (this.#prefixSize !== 0) {
if (this.#state.size !== 0) {
done(new Error('Unfinished prefix'));
return;
}
done();
}
protected getTrailerSize(_frame: Buffer): number {
return 0;
}
protected async pushFrame(frame: Buffer, _trailer: Buffer): Promise<void> {
this.push(frame);
}
}