diff --git a/app/attachment_channel.ts b/app/attachment_channel.ts index 104dacfd83..5be5093e65 100644 --- a/app/attachment_channel.ts +++ b/app/attachment_channel.ts @@ -4,7 +4,7 @@ import { ipcMain, protocol } from 'electron'; import { createReadStream } from 'node:fs'; import { join, normalize } from 'node:path'; -import { Readable, PassThrough, type Writable } from 'node:stream'; +import { PassThrough, type Writable } from 'node:stream'; import { pipeline } from 'node:stream/promises'; import { randomBytes } from 'node:crypto'; import { once } from 'node:events'; @@ -41,6 +41,7 @@ import { SECOND } from '../ts/util/durations'; import { drop } from '../ts/util/drop'; import { strictAssert } from '../ts/util/assert'; import { ValidatingPassThrough } from '../ts/util/ValidatingPassThrough'; +import { toWebStream } from '../ts/util/toWebStream'; import { decryptAttachmentV2ToSink } from '../ts/AttachmentCrypto'; let initialized = false; @@ -514,7 +515,7 @@ function handleRangeRequest({ const create200Response = (): Response => { const plaintext = rangeFinder.get(0, context); - return new Response(Readable.toWeb(plaintext) as ReadableStream, { + return new Response(toWebStream(plaintext), { status: 200, headers, }); @@ -547,7 +548,7 @@ function handleRangeRequest({ } const stream = rangeFinder.get(start, context); - return new Response(Readable.toWeb(stream) as ReadableStream, { + return new Response(toWebStream(stream), { status: 206, headers, }); diff --git a/ts/test-node/util/toWebStream_test.ts b/ts/test-node/util/toWebStream_test.ts new file mode 100644 index 0000000000..032b2c1c8a --- /dev/null +++ b/ts/test-node/util/toWebStream_test.ts @@ -0,0 +1,95 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import { Readable } from 'node:stream'; +import { once } from 'node:events'; +import { toWebStream } from '../../util/toWebStream'; + +describe('toWebStream', () => { + it('only reads what it needs', async () => { + const CHUNK_SIZE = 16 * 1024; + let pushed = 0; + const readable = new Readable({ + read() { + pushed += 1; + this.push(Buffer.alloc(CHUNK_SIZE)); + }, + }); + + const reader = toWebStream(readable).getReader(); + const { value, done } = await reader.read(); + + // One to be read, one buffered + assert.strictEqual(pushed, 2); + assert.isFalse(done); + assert.strictEqual(value?.byteLength, 2 * CHUNK_SIZE); + }); + + it('closes controller on end', async () => { + const readable = Readable.from([ + Buffer.from('hello '), + Buffer.from('world'), + ]); + + const reader = toWebStream(readable).getReader(); + { + const { value, done } = await reader.read(); + assert.strictEqual(value?.toString(), 'hello '); + assert.isFalse(done); + } + { + const { value, done } = await reader.read(); + assert.strictEqual(value?.toString(), 'world'); + assert.isFalse(done); + } + { + const { value, done } = await reader.read(); + assert.isUndefined(value); + assert.isTrue(done); + } + }); + + it('handles premature close', async () => { + const readable = new Readable({ + read() { + // no-op + }, + }); + + const reader = toWebStream(readable).getReader(); + readable.destroy(); + await assert.isRejected(reader.read(), 'Premature close'); + }); + + it('handles error close', async () => { + const readable = new Readable({ + read() { + // no-op + }, + }); + + const reader = toWebStream(readable).getReader(); + readable.destroy(new Error('error msg')); + await assert.isRejected(reader.read(), 'error msg'); + }); + + it('can be wrapped and destroyed during data read', async () => { + const readable = new Readable({ + read() { + this.push(Buffer.from('hello')); + }, + }); + + const web = toWebStream(readable); + + // Some sort of mismatch between Node's expectation for ReadStream and + // what TS says ReadStream is in WebAPIs. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const node = Readable.fromWeb(web as any); + node.on('data', () => { + node.destroy(); + }); + await once(node, 'close'); + }); +}); diff --git a/ts/util/toWebStream.ts b/ts/util/toWebStream.ts new file mode 100644 index 0000000000..1b0d8a9746 --- /dev/null +++ b/ts/util/toWebStream.ts @@ -0,0 +1,44 @@ +// Copyright 2024 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { type Readable, finished } from 'node:stream'; +import { once } from 'node:events'; + +// Note: can be removed once https://github.com/nodejs/node/issues/54205 is +// resolved and ported to Electron. +export function toWebStream(readable: Readable): ReadableStream { + let controller: ReadableStreamDefaultController; + + const cleanup = finished(readable, err => { + if (err != null) { + return controller.error(err); + } + + controller.close(); + }); + + return new ReadableStream({ + start(newController) { + controller = newController; + }, + async pull() { + try { + await once(readable, 'readable'); + const chunk = readable.read(); + if (chunk != null) { + controller.enqueue(chunk); + } + } catch (error) { + cleanup(); + controller.error(error); + } + }, + cancel(reason) { + // If we got canceled - don't call controller.close/.error since it will + // throw. + cleanup(); + + readable.destroy(reason ? new Error(reason) : undefined); + }, + }); +}