Simplify ContactsParser

Co-authored-by: Fedor Indutny <79877362+indutny-signal@users.noreply.github.com>
This commit is contained in:
automated-signal
2026-03-09 17:36:42 -05:00
committed by GitHub
parent 79e154d1f7
commit b4f4ba35b1
4 changed files with 135 additions and 153 deletions

View File

@@ -5,29 +5,41 @@ import { Transform } from 'node:stream';
import { missingCaseError } from './missingCaseError.std.js';
enum State {
Prefix = 'Prefix',
Data = 'Data',
}
type State =
| {
kind: 'prefix';
size: number;
value: number;
}
| {
kind: 'frame';
remaining: number;
parts: Array<Buffer>;
}
| {
kind: 'trailer';
frame: Buffer;
remaining: number;
parts: Array<Buffer>;
};
const EMPTY_TRAILER = Buffer.alloc(0);
export class DelimitedStream extends Transform {
#state = State.Prefix;
#prefixValue = 0;
#prefixSize = 0;
#parts = new Array<Buffer>();
#state: State = { kind: 'prefix', size: 0, value: 0 };
constructor() {
super({ readableObjectMode: true });
}
override _transform(
override async _transform(
chunk: Buffer,
_encoding: BufferEncoding,
done: (error?: Error) => void
): void {
): Promise<void> {
let offset = 0;
while (offset < chunk.length) {
if (this.#state === State.Prefix) {
if (this.#state.kind === 'prefix') {
const b = chunk[offset];
offset += 1;
@@ -38,35 +50,73 @@ export class DelimitedStream extends Transform {
const value = b & 0x7f;
// eslint-disable-next-line no-bitwise
this.#prefixValue |= value << (7 * this.#prefixSize);
this.#prefixSize += 1;
this.#state.value |= value << (7 * this.#state.size);
this.#state.size += 1;
// Check that we didn't go over 32bits. Node.js buffers can never
// be larger than 2gb anyway!
if (this.#prefixSize > 4) {
if (this.#state.size > 4) {
done(new Error('Delimiter encoding overflow'));
return;
}
if (isLast) {
this.#state = State.Data;
this.#state = {
kind: 'frame',
remaining: this.#state.value,
parts: [],
};
}
} else if (this.#state === State.Data) {
const toTake = Math.min(this.#prefixValue, chunk.length - offset);
} else if (
this.#state.kind === 'frame' ||
this.#state.kind === 'trailer'
) {
const toTake = Math.min(this.#state.remaining, chunk.length - offset);
const part = chunk.slice(offset, offset + toTake);
offset += toTake;
this.#prefixValue -= toTake;
this.#state.remaining -= toTake;
this.#parts.push(part);
this.#state.parts.push(part);
if (this.#prefixValue <= 0) {
this.#state = State.Prefix;
this.#prefixSize = 0;
this.#prefixValue = 0;
if (this.#state.remaining > 0) {
continue;
}
const whole = Buffer.concat(this.#parts);
this.#parts = [];
this.push(whole);
if (this.#state.kind === 'frame') {
const frame = Buffer.concat(this.#state.parts);
const trailerSize = this.getTrailerSize(frame);
if (trailerSize === 0) {
this.#state = {
kind: 'prefix',
size: 0,
value: 0,
};
// eslint-disable-next-line no-await-in-loop
await this.pushFrame(frame, EMPTY_TRAILER);
} else {
this.#state = {
kind: 'trailer',
frame,
remaining: trailerSize,
parts: [],
};
}
} else if (this.#state.kind === 'trailer') {
const oldState = this.#state;
const trailer = Buffer.concat(this.#state.parts);
this.#state = {
kind: 'prefix',
size: 0,
value: 0,
};
// eslint-disable-next-line no-await-in-loop
await this.pushFrame(oldState.frame, trailer);
} else {
throw missingCaseError(this.#state);
}
} else {
throw missingCaseError(this.#state);
@@ -76,16 +126,31 @@ export class DelimitedStream extends Transform {
}
override _flush(done: (error?: Error) => void): void {
if (this.#state !== State.Prefix) {
done(new Error('Unfinished data'));
if (this.#state.kind === 'frame') {
done(new Error('Unfinished frame'));
return;
}
if (this.#state.kind === 'trailer') {
done(new Error('Unfinished trailer'));
return;
}
if (this.#state.kind !== 'prefix') {
throw missingCaseError(this.#state);
}
if (this.#prefixSize !== 0) {
if (this.#state.size !== 0) {
done(new Error('Unfinished prefix'));
return;
}
done();
}
protected getTrailerSize(_frame: Buffer): number {
return 0;
}
protected async pushFrame(frame: Buffer, _trailer: Buffer): Promise<void> {
this.push(frame);
}
}