mirror of
https://github.com/signalapp/Signal-Desktop.git
synced 2026-04-02 08:13:37 +01:00
161 lines
4.1 KiB
TypeScript
161 lines
4.1 KiB
TypeScript
// Copyright 2023 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
import { Transform } from 'node:stream';
|
|
|
|
import { missingCaseError } from './missingCaseError.std.js';
|
|
|
|
type State =
|
|
| {
|
|
kind: 'prefix';
|
|
size: number;
|
|
value: number;
|
|
}
|
|
| {
|
|
kind: 'frame';
|
|
remaining: number;
|
|
parts: Array<Buffer<ArrayBuffer>>;
|
|
}
|
|
| {
|
|
kind: 'trailer';
|
|
frame: Buffer<ArrayBuffer>;
|
|
remaining: number;
|
|
parts: Array<Buffer<ArrayBuffer>>;
|
|
};
|
|
|
|
const EMPTY_TRAILER = Buffer.alloc(0);
|
|
|
|
export class DelimitedStream extends Transform {
|
|
#state: State = { kind: 'prefix', size: 0, value: 0 };
|
|
|
|
constructor() {
|
|
super({ readableObjectMode: true });
|
|
}
|
|
|
|
override async _transform(
|
|
chunk: Buffer<ArrayBuffer>,
|
|
_encoding: BufferEncoding,
|
|
done: (error?: Error) => void
|
|
): Promise<void> {
|
|
let offset = 0;
|
|
while (offset < chunk.length) {
|
|
if (this.#state.kind === 'prefix') {
|
|
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
|
const b = chunk[offset]!;
|
|
offset += 1;
|
|
|
|
// See: https://protobuf.dev/programming-guides/encoding/
|
|
// eslint-disable-next-line no-bitwise
|
|
const isLast = (b & 0x80) === 0;
|
|
// eslint-disable-next-line no-bitwise
|
|
const value = b & 0x7f;
|
|
|
|
// eslint-disable-next-line no-bitwise
|
|
this.#state.value |= value << (7 * this.#state.size);
|
|
this.#state.size += 1;
|
|
|
|
// Check that we didn't go over 32bits. Node.js buffers can never
|
|
// be larger than 2gb anyway!
|
|
if (this.#state.size > 4) {
|
|
done(new Error('Delimiter encoding overflow'));
|
|
return;
|
|
}
|
|
|
|
if (isLast) {
|
|
this.#state = {
|
|
kind: 'frame',
|
|
remaining: this.#state.value,
|
|
parts: [],
|
|
};
|
|
}
|
|
} else if (
|
|
this.#state.kind === 'frame' ||
|
|
this.#state.kind === 'trailer'
|
|
) {
|
|
const toTake = Math.min(this.#state.remaining, chunk.length - offset);
|
|
const part = chunk.slice(offset, offset + toTake);
|
|
offset += toTake;
|
|
this.#state.remaining -= toTake;
|
|
|
|
this.#state.parts.push(part);
|
|
|
|
if (this.#state.remaining > 0) {
|
|
continue;
|
|
}
|
|
|
|
if (this.#state.kind === 'frame') {
|
|
const frame = Buffer.concat(this.#state.parts);
|
|
const trailerSize = this.getTrailerSize(frame);
|
|
|
|
if (trailerSize === 0) {
|
|
this.#state = {
|
|
kind: 'prefix',
|
|
size: 0,
|
|
value: 0,
|
|
};
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
await this.pushFrame(frame, EMPTY_TRAILER);
|
|
} else {
|
|
this.#state = {
|
|
kind: 'trailer',
|
|
frame,
|
|
remaining: trailerSize,
|
|
parts: [],
|
|
};
|
|
}
|
|
} else if (this.#state.kind === 'trailer') {
|
|
const oldState = this.#state;
|
|
const trailer = Buffer.concat(this.#state.parts);
|
|
|
|
this.#state = {
|
|
kind: 'prefix',
|
|
size: 0,
|
|
value: 0,
|
|
};
|
|
|
|
// eslint-disable-next-line no-await-in-loop
|
|
await this.pushFrame(oldState.frame, trailer);
|
|
} else {
|
|
throw missingCaseError(this.#state);
|
|
}
|
|
} else {
|
|
throw missingCaseError(this.#state);
|
|
}
|
|
}
|
|
done();
|
|
}
|
|
|
|
override _flush(done: (error?: Error) => void): void {
|
|
if (this.#state.kind === 'frame') {
|
|
done(new Error('Unfinished frame'));
|
|
return;
|
|
}
|
|
if (this.#state.kind === 'trailer') {
|
|
done(new Error('Unfinished trailer'));
|
|
return;
|
|
}
|
|
if (this.#state.kind !== 'prefix') {
|
|
throw missingCaseError(this.#state);
|
|
}
|
|
|
|
if (this.#state.size !== 0) {
|
|
done(new Error('Unfinished prefix'));
|
|
return;
|
|
}
|
|
|
|
done();
|
|
}
|
|
|
|
protected getTrailerSize(_frame: Buffer<ArrayBuffer>): number {
|
|
return 0;
|
|
}
|
|
|
|
protected async pushFrame(
|
|
frame: Buffer<ArrayBuffer>,
|
|
_trailer: Buffer<ArrayBuffer>
|
|
): Promise<void> {
|
|
this.push(frame);
|
|
}
|
|
}
|