Switch provisioning to libsignal

This commit is contained in:
Fedor Indutny
2026-01-30 10:36:41 -08:00
committed by GitHub
parent a59c298aa1
commit 134246fb7d
18 changed files with 151 additions and 1765 deletions

View File

@@ -2,6 +2,8 @@
// SPDX-License-Identifier: AGPL-3.0-only
import pTimeout, { TimeoutError as PTimeoutError } from 'p-timeout';
import type { LibSignalError } from '@signalapp/libsignal-client';
import type { ProvisioningConnection } from '@signalapp/libsignal-client/dist/net/Chat.js';
import { createLogger } from '../logging/log.std.js';
import * as Errors from '../types/errors.std.js';
@@ -25,13 +27,8 @@ import {
import ProvisioningCipher, {
type ProvisionDecryptResult,
} from './ProvisioningCipher.node.js';
import {
type IWebSocketResource,
type IncomingWebSocketRequest,
ServerRequestType,
} from './WebsocketResources.preload.js';
import { ConnectTimeoutError } from './Errors.std.js';
import type { getProvisioningResource } from './WebAPI.preload.js';
import type { getProvisioningConnection } from './WebAPI.preload.js';
const log = createLogger('Provisioner');
@@ -45,7 +42,7 @@ export enum EventKind {
}
export type ProvisionerOptionsType = Readonly<{
server: { getProvisioningResource: typeof getProvisioningResource };
server: { getProvisioningConnection: typeof getProvisioningConnection };
}>;
export type EnvelopeType = ProvisionDecryptResult;
@@ -106,10 +103,12 @@ const QR_CODE_TIMEOUTS = [10 * SECOND, 20 * SECOND, 30 * SECOND, 60 * SECOND];
export class Provisioner {
readonly #subscribers = new Set<SubscriberType>();
readonly #server: { getProvisioningResource: typeof getProvisioningResource };
readonly #server: {
getProvisioningConnection: typeof getProvisioningConnection;
};
readonly #retryBackOff = new BackOff(FIBONACCI_TIMEOUTS);
#sockets: Array<IWebSocketResource> = [];
#sockets: Array<ProvisioningConnection> = [];
#abortController: AbortController | undefined;
#attemptCount = 0;
#isRunning = false;
@@ -326,60 +325,43 @@ export class Provisioner {
const timeoutAt = Date.now() + timeout;
const resource = await this.#server.getProvisioningResource(
const connection = await this.#server.getProvisioningConnection(
{
handleRequest: (request: IncomingWebSocketRequest) => {
const { requestType, body } = request;
if (!body) {
log.warn('connect: no request body');
request.respond(400, 'Missing body');
onReceivedAddress: (address, ack) => {
if (state !== SocketState.WaitingForUuid) {
log.error('onReceivedAddress: duplicate uuid');
drop(connection.disconnect());
return;
}
try {
if (requestType === ServerRequestType.ProvisioningAddress) {
strictAssert(
state === SocketState.WaitingForUuid,
'Provisioner.connect: duplicate uuid'
);
const proto = Proto.ProvisioningAddress.decode(body);
strictAssert(
proto.address,
'Provisioner.connect: expected a UUID'
);
state = SocketState.WaitingForEnvelope;
uuidPromise.resolve(proto.address);
request.respond(200, 'OK');
} else if (requestType === ServerRequestType.ProvisioningMessage) {
strictAssert(
state === SocketState.WaitingForEnvelope,
'Provisioner.connect: duplicate envelope or not ready'
);
const ciphertext = Proto.ProvisionEnvelope.decode(body);
const envelope = cipher.decrypt(ciphertext);
state = SocketState.Done;
this.#notify({
kind: EventKind.Envelope,
envelope,
isLinkAndSync:
isLinkAndSyncEnabled() &&
Bytes.isNotEmpty(envelope.ephemeralBackupKey),
});
} else {
log.warn('connect: unsupported request type', requestType);
request.respond(404, 'Unsupported');
}
} catch (error) {
log.error('connect: error', Errors.toLogFormat(error));
resource.close();
}
state = SocketState.WaitingForEnvelope;
uuidPromise.resolve(address);
ack.send(200);
},
handleDisconnect() {
// No-op
onReceivedEnvelope: (body, ack) => {
if (state !== SocketState.WaitingForEnvelope) {
log.error('onReceivedEnvelope: duplicate envelope or not ready');
drop(connection.disconnect());
return;
}
const ciphertext = Proto.ProvisionEnvelope.decode(body);
const envelope = cipher.decrypt(ciphertext);
state = SocketState.Done;
this.#notify({
kind: EventKind.Envelope,
envelope,
isLinkAndSync:
isLinkAndSyncEnabled() &&
Bytes.isNotEmpty(envelope.ephemeralBackupKey),
});
ack.send(200);
},
onConnectionInterrupted: (cause: LibSignalError | null) => {
signal.removeEventListener('abort', onAbort);
this.#handleClose(connection, state, cause);
},
},
timeout
@@ -392,16 +374,11 @@ export class Provisioner {
// Setup listeners on the socket
const onAbort = () => {
resource.close();
drop(connection.disconnect());
uuidPromise.reject(new Error('aborted'));
};
signal.addEventListener('abort', onAbort);
resource.addEventListener('close', ({ code, reason }) => {
signal.removeEventListener('abort', onAbort);
this.#handleClose(resource, state, code, reason);
});
// But only register it once we get the uuid from server back.
const uuid = await pTimeout(
@@ -420,28 +397,28 @@ export class Provisioner {
this.#notify({ kind: EventKind.URL, url });
this.#sockets.push(resource);
this.#sockets.push(connection);
while (this.#sockets.length > MAX_OPEN_SOCKETS) {
log.info('closing extra socket');
this.#sockets.shift()?.close();
drop(this.#sockets.shift()?.disconnect());
}
}
#handleClose(
resource: IWebSocketResource,
connection: ProvisioningConnection,
state: SocketState,
code: number,
reason: string
cause: LibSignalError | null
): void {
const index = this.#sockets.indexOf(resource);
const reason = cause && Errors.toLogFormat(cause);
const index = this.#sockets.indexOf(connection);
if (index === -1) {
log.info(`ignoring socket closed, code=${code}, reason=${reason}`);
log.info(`ignoring socket closed, reason=${reason}`);
return;
}
const logId = `Provisioner.#handleClose(${index})`;
log.info(`${logId}: closed, code=${code}, reason=${reason}`);
const logId = `Provisioner.#handleClose(${index}): reason=${reason}`;
log.info(`${logId}: closed`);
// Is URL from the socket displayed as a QR code?
const isActive = index === this.#sockets.length - 1;
@@ -460,9 +437,7 @@ export class Provisioner {
state === SocketState.WaitingForUuid
? EventKind.ConnectError
: EventKind.EnvelopeError,
error: new Error(
`Socket ${index} closed, code=${code}, reason=${reason}`
),
error: new Error(`Socket ${index} closed, reason=${reason}`),
});
}
}

View File

@@ -9,13 +9,14 @@ import {
import URL from 'node:url';
import type { RequestInit, Response } from 'node-fetch';
import { Headers } from 'node-fetch';
import type { connection as WebSocket } from 'websocket';
import qs from 'node:querystring';
import EventListener from 'node:events';
import type { IncomingMessage } from 'node:http';
import { setTimeout as sleep } from 'node:timers/promises';
import type { UnauthenticatedChatConnection } from '@signalapp/libsignal-client/dist/net/Chat.js';
import type {
UnauthenticatedChatConnection,
ProvisioningConnection,
ProvisioningConnectionListener,
} from '@signalapp/libsignal-client/dist/net/Chat.js';
import { strictAssert } from '../util/assert.std.js';
import { explodePromise } from '../util/explodePromise.std.js';
@@ -26,8 +27,6 @@ import {
} from '../util/BackOff.std.js';
import * as durations from '../util/durations/index.std.js';
import { drop } from '../util/drop.std.js';
import type { ProxyAgent } from '../util/createProxyAgent.node.js';
import { createProxyAgent } from '../util/createProxyAgent.node.js';
import { type SocketInfo, SocketStatus } from '../types/SocketStatus.std.js';
import { HTTPError } from '../types/HTTPError.std.js';
import * as Errors from '../types/errors.std.js';
@@ -39,17 +38,14 @@ import type {
ChatKind,
IChatConnection,
IncomingWebSocketRequest,
IWebSocketResource,
WebSocketResourceOptions,
} from './WebsocketResources.preload.js';
import WebSocketResource, {
connectAuthenticatedLibsignal,
connectUnauthenticatedLibsignal,
import {
connectAuthenticated,
connectUnauthenticated,
ServerRequestType,
} from './WebsocketResources.preload.js';
import { ConnectTimeoutError } from './Errors.std.js';
import type { IRequestHandler, WebAPICredentials } from './Types.d.ts';
import { connect as connectWebSocket } from './WebSocket.preload.js';
import type { ServerAlert } from '../types/ServerAlert.std.js';
import { getUserLanguages } from '../util/userLanguages.std.js';
@@ -66,13 +62,6 @@ export const AUTHENTICATED_CHANNEL_NAME = 'authenticated';
export const NORMAL_DISCONNECT_CODE = 3000;
export type SocketManagerOptions = Readonly<{
url: string;
certificateAuthority: string;
version: string;
proxyUrl?: string;
}>;
type SocketStatusUpdate = { status: SocketStatus };
export type SocketStatuses = Record<
@@ -84,9 +73,9 @@ export type SocketExpirationReason = 'remote' | 'build';
// This class manages two websocket resources:
//
// - Authenticated IWebSocketResource which uses supplied WebAPICredentials and
// - Authenticated IChatConnection which uses supplied WebAPICredentials and
// automatically reconnects on closed socket (using back off)
// - Unauthenticated IWebSocketResource that is created on the first outgoing
// - Unauthenticated IChatConnection that is created on the first outgoing
// unauthenticated request and is periodically rotated (5 minutes since first
// activity on the socket).
//
@@ -95,7 +84,7 @@ export type SocketExpirationReason = 'remote' | 'build';
// least one such request handler becomes available.
//
// Incoming requests on unauthenticated resource are not currently supported.
// IWebSocketResource is responsible for their immediate termination.
// IChatConnection is responsible for their immediate termination.
export class SocketManager extends EventListener {
#backOff = new BackOff(FIBONACCI_TIMEOUTS, {
jitter: JITTER,
@@ -105,7 +94,6 @@ export class SocketManager extends EventListener {
#unauthenticated?: AbortableProcess<IChatConnection<'unauth'>>;
#unauthenticatedExpirationTimer?: NodeJS.Timeout;
#credentials?: WebAPICredentials;
#lazyProxyAgent?: Promise<ProxyAgent>;
#authenticatedStatus: SocketInfo = {
status: SocketStatus.CLOSED,
};
@@ -121,10 +109,7 @@ export class SocketManager extends EventListener {
#reconnectController: AbortController | undefined;
#envelopeCount = 0;
constructor(
private readonly libsignalNet: Net.Net,
private readonly options: SocketManagerOptions
) {
constructor(private readonly libsignalNet: Net.Net) {
super();
}
@@ -202,7 +187,7 @@ export class SocketManager extends EventListener {
window.SignalContext.getResolvedMessagesLocale()
);
const process = connectAuthenticatedLibsignal({
const process = connectAuthenticated({
libsignalNet: this.libsignalNet,
name: AUTHENTICATED_CHANNEL_NAME,
credentials: this.#credentials,
@@ -367,7 +352,7 @@ export class SocketManager extends EventListener {
}
// Either returns currently connecting/active authenticated
// IWebSocketResource or connects a fresh one.
// IChatConnection or connects a fresh one.
public async getAuthenticatedResource(): Promise<IChatConnection<'auth'>> {
if (!this.#authenticated) {
strictAssert(this.#credentials !== undefined, 'Missing credentials');
@@ -378,11 +363,11 @@ export class SocketManager extends EventListener {
return this.#authenticated.getResult();
}
// Creates new IWebSocketResource for AccountManager's provisioning
public async getProvisioningResource(
handler: IRequestHandler,
timeout?: number
): Promise<IWebSocketResource> {
// Creates new ProvisioningConnection for AccountManager's provisioning
public async getProvisioningConnection(
listener: ProvisioningConnectionListener,
timeout: number
): Promise<ProvisioningConnection> {
if (this.#expirationReason != null) {
throw new Error(
`${this.#expirationReason} expired, ` +
@@ -390,48 +375,22 @@ export class SocketManager extends EventListener {
);
}
return this.#connectResource({
name: 'provisioning',
path: '/v1/websocket/provisioning/',
proxyAgent: await this.#getProxyAgent(),
resourceOptions: {
name: 'provisioning',
handleRequest: (req: IncomingWebSocketRequest): void => {
handler.handleRequest(req);
},
keepalive: { path: '/v1/keepalive/provisioning' },
},
extraHeaders: {
'x-signal-websocket-timeout': 'true',
},
timeout,
}).getResult();
const abortController = new AbortController();
const timer = setTimeout(() => {
abortController.abort();
}, timeout);
try {
return await this.libsignalNet.connectProvisioning(listener, {
abortSignal: abortController.signal,
});
} finally {
clearTimeout(timer);
}
}
// Creates new WebSocket for Art Creator provisioning
public async connectExternalSocket({
url,
extraHeaders,
}: {
url: string;
extraHeaders?: Record<string, string>;
}): Promise<WebSocket> {
const proxyAgent = await this.#getProxyAgent();
return connectWebSocket({
name: 'art-creator-provisioning',
url,
version: this.options.version,
proxyAgent,
extraHeaders,
createResource(socket: WebSocket): WebSocket {
return socket;
},
}).getResult();
}
public async getUnauthenticatedLibsignalApi(): Promise<UnauthenticatedChatConnection> {
public async getUnauthenticatedApi(): Promise<UnauthenticatedChatConnection> {
const resource = await this.#getUnauthenticatedResource();
return resource.libsignalWebsocket;
}
@@ -652,7 +611,7 @@ export class SocketManager extends EventListener {
);
const process: AbortableProcess<IChatConnection<'unauth'>> =
connectUnauthenticatedLibsignal({
connectUnauthenticated({
libsignalNet: this.libsignalNet,
name: UNAUTHENTICATED_CHANNEL_NAME,
userLanguages,
@@ -728,58 +687,6 @@ export class SocketManager extends EventListener {
return this.#unauthenticated.getResult();
}
#connectResource({
name,
path,
proxyAgent,
resourceOptions,
query = {},
extraHeaders = {},
onUpgradeResponse,
timeout,
}: {
name: string;
path: string;
proxyAgent: ProxyAgent | undefined;
resourceOptions: WebSocketResourceOptions;
query?: Record<string, string>;
extraHeaders?: Record<string, string>;
onUpgradeResponse?: (response: IncomingMessage) => void;
timeout?: number;
}): AbortableProcess<IWebSocketResource> {
const queryWithDefaults = {
agent: 'OWD',
version: this.options.version,
...query,
};
const url = `${this.options.url}${path}?${qs.encode(queryWithDefaults)}`;
const { version } = this.options;
const start = performance.now();
const webSocketResourceConnection = connectWebSocket({
name,
url,
version,
certificateAuthority: this.options.certificateAuthority,
proxyAgent,
timeout,
extraHeaders,
onUpgradeResponse,
createResource(socket: WebSocket): WebSocketResource {
const duration = (performance.now() - start).toFixed(1);
log.info(
`WebSocketResource(${resourceOptions.name}) connected in ${duration}ms`
);
return new WebSocketResource(socket, resourceOptions);
},
});
return webSocketResourceConnection;
}
async #checkResource<Chat extends ChatKind>(
process?: AbortableProcess<IChatConnection<Chat>>
): Promise<void> {
@@ -928,14 +835,6 @@ export class SocketManager extends EventListener {
);
}
async #getProxyAgent(): Promise<ProxyAgent | undefined> {
if (this.options.proxyUrl && !this.#lazyProxyAgent) {
// Cache the promise so that we don't import concurrently.
this.#lazyProxyAgent = createProxyAgent(this.options.proxyUrl);
}
return this.#lazyProxyAgent;
}
// EventEmitter types
public override on(type: 'authError', callback: () => void): this;

View File

@@ -23,6 +23,10 @@ import type {
Pni,
} from '@signalapp/libsignal-client';
import { AccountAttributes } from '@signalapp/libsignal-client/dist/net.js';
import type {
ProvisioningConnection,
ProvisioningConnectionListener,
} from '@signalapp/libsignal-client/dist/net.js';
import { GroupSendFullToken } from '@signalapp/libsignal-client/zkgroup.js';
import type {
Request as KTRequest,
@@ -93,7 +97,6 @@ import { createLogger } from '../logging/log.std.js';
import { maybeParseUrl, urlPathFromComponents } from '../util/url.std.js';
import { HOUR, MINUTE, SECOND } from '../util/durations/index.std.js';
import { safeParseNumber } from '../util/numbers.std.js';
import type { IWebSocketResource } from './WebsocketResources.preload.js';
import { getLibsignalNet } from './preconnect.preload.js';
import type { GroupSendToken } from '../types/GroupSendEndorsements.std.js';
import {
@@ -1738,12 +1741,7 @@ const PARSE_RANGE_HEADER = /\/(\d+)$/;
const PARSE_GROUP_LOG_RANGE_HEADER =
/^versions\s+(\d{1,10})-(\d{1,10})\/(\d{1,10})/;
const socketManager = new SocketManager(libsignalNet, {
url: chatServiceUrl,
certificateAuthority,
version,
proxyUrl,
});
const socketManager = new SocketManager(libsignalNet);
socketManager.on('statusChange', () => {
window.Whisper.events.emit('socketStatusChange');
@@ -2478,7 +2476,7 @@ export async function getAccountForUsername({
hash,
}: GetAccountForUsernameOptionsType): Promise<GetAccountForUsernameResultType> {
const aci = await _retry(async () => {
const chat = await socketManager.getUnauthenticatedLibsignalApi();
const chat = await socketManager.getUnauthenticatedApi();
return chat.lookUpUsernameHash({ hash });
});
@@ -2490,7 +2488,7 @@ export async function keyTransparencySearch(
abortSignal?: AbortSignal
): Promise<void> {
return _retry(async () => {
const chat = await socketManager.getUnauthenticatedLibsignalApi();
const chat = await socketManager.getUnauthenticatedApi();
if (abortSignal?.aborted) {
throw new Error('Aborted');
}
@@ -2506,7 +2504,7 @@ export async function keyTransparencyMonitor(
abortSignal?: AbortSignal
): Promise<void> {
return _retry(async () => {
const chat = await socketManager.getUnauthenticatedLibsignalApi();
const chat = await socketManager.getUnauthenticatedApi();
if (abortSignal?.aborted) {
throw new Error('Aborted');
}
@@ -2731,7 +2729,7 @@ export async function resolveUsernameLink({
uuid,
}: ResolveUsernameByLinkOptionsType): Promise<ResolveUsernameLinkResultType> {
return _retry(async () => {
const chat = await socketManager.getUnauthenticatedLibsignalApi();
const chat = await socketManager.getUnauthenticatedApi();
return chat.lookUpUsernameLink({ uuid, entropy });
});
}
@@ -3679,7 +3677,7 @@ export async function sendMulti(
}
const result = await _retry(async () => {
const chat = await socketManager.getUnauthenticatedLibsignalApi();
const chat = await socketManager.getUnauthenticatedApi();
return chat.sendMultiRecipientMessage({
payload,
timestamp,
@@ -4819,11 +4817,11 @@ export async function getHasSubscription(
return data.subscription.active;
}
export function getProvisioningResource(
handler: IRequestHandler,
timeout?: number
): Promise<IWebSocketResource> {
return socketManager.getProvisioningResource(handler, timeout);
export function getProvisioningConnection(
listener: ProvisioningConnectionListener,
timeout: number
): Promise<ProvisioningConnection> {
return socketManager.getProvisioningConnection(listener, timeout);
}
export async function cdsLookup({

View File

@@ -1,148 +0,0 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import ws from 'websocket';
import type { connection as WebSocket } from 'websocket';
import type { IncomingMessage } from 'node:http';
import { AbortableProcess } from '../util/AbortableProcess.std.js';
import { strictAssert } from '../util/assert.std.js';
import { explodePromise } from '../util/explodePromise.std.js';
import { getUserAgent } from '../util/getUserAgent.node.js';
import * as durations from '../util/durations/index.std.js';
import type { ProxyAgent } from '../util/createProxyAgent.node.js';
import { createHTTPSAgent } from '../util/createHTTPSAgent.node.js';
import { HTTPError } from '../types/HTTPError.std.js';
import { createLogger } from '../logging/log.std.js';
import * as Timers from '../Timers.preload.js';
import { ConnectTimeoutError } from './Errors.std.js';
import { handleStatusCode, translateError } from './Utils.dom.js';
const { client: WebSocketClient } = ws;
const log = createLogger('WebSocket');
const TEN_SECONDS = 10 * durations.SECOND;
const WEBSOCKET_CONNECT_TIMEOUT = TEN_SECONDS;
const KEEPALIVE_INTERVAL_MS = TEN_SECONDS;
export type IResource = {
close(code: number, reason: string): void;
};
export type ConnectOptionsType<Resource extends IResource> = Readonly<{
name: string;
url: string;
certificateAuthority?: string;
version: string;
proxyAgent?: ProxyAgent;
timeout?: number;
extraHeaders?: Record<string, string>;
onUpgradeResponse?: (response: IncomingMessage) => void;
createResource(socket: WebSocket): Resource;
}>;
export function connect<Resource extends IResource>({
name,
url,
certificateAuthority,
version,
proxyAgent,
extraHeaders = {},
timeout = WEBSOCKET_CONNECT_TIMEOUT,
onUpgradeResponse,
createResource,
}: ConnectOptionsType<Resource>): AbortableProcess<Resource> {
const fixedScheme = url
.replace('https://', 'wss://')
.replace('http://', 'ws://');
const headers = {
...extraHeaders,
'User-Agent': getUserAgent(version),
};
const client = new WebSocketClient({
tlsOptions: {
ca: certificateAuthority,
agent: proxyAgent ?? createHTTPSAgent(),
},
maxReceivedFrameSize: 0x210000,
});
client.connect(fixedScheme, undefined, undefined, headers);
const { stack } = new Error();
const { promise, resolve, reject } = explodePromise<Resource>();
const timer = Timers.setTimeout(() => {
reject(new ConnectTimeoutError('Connection timed out'));
client.abort();
}, timeout);
let resource: Resource | undefined;
client.on('connect', socket => {
Timers.clearTimeout(timer);
socket.socket.setKeepAlive(true, KEEPALIVE_INTERVAL_MS);
resource = createResource(socket);
resolve(resource);
});
client.on('upgradeResponse', response => {
onUpgradeResponse?.(response);
});
client.on('httpResponse', async response => {
Timers.clearTimeout(timer);
const statusCode = response.statusCode || -1;
await handleStatusCode(statusCode);
const error = new HTTPError('connectResource: invalid websocket response', {
code: statusCode || -1,
headers: {},
stack,
});
const translatedError = translateError(error);
strictAssert(
translatedError,
'`httpResponse` event cannot be emitted with 200 status code'
);
reject(translatedError);
});
client.on('connectFailed', originalErr => {
Timers.clearTimeout(timer);
const err = new HTTPError('connectResource: connectFailed', {
code: -1,
headers: {},
stack,
cause: originalErr,
});
reject(err);
});
return new AbortableProcess<Resource>(
`WebSocket.connect(${name})`,
{
abort() {
if (resource) {
log.warn(`closing socket ${name}`);
resource.close(3000, 'aborted');
} else {
log.warn(`aborting connection ${name}`);
Timers.clearTimeout(timer);
client.abort();
}
},
},
promise
);
}

View File

@@ -26,16 +26,12 @@
/* eslint-disable @typescript-eslint/no-namespace */
/* eslint-disable @typescript-eslint/brace-style */
import type { connection as WebSocket, IMessage } from 'websocket';
import Long from 'long';
import pTimeout from 'p-timeout';
import { Response } from 'node-fetch';
import net from 'node:net';
import { z } from 'zod';
import type { LibSignalError, Net } from '@signalapp/libsignal-client';
import { ErrorCode } from '@signalapp/libsignal-client';
import { Buffer } from 'node:buffer';
import type {
AuthenticatedChatConnection,
ChatServerMessageAck,
@@ -47,15 +43,11 @@ import type { EventHandler } from './EventTarget.std.js';
import EventTarget from './EventTarget.std.js';
import * as durations from '../util/durations/index.std.js';
import { dropNull } from '../util/dropNull.std.js';
import { drop } from '../util/drop.std.js';
import { isOlderThan } from '../util/timestamp.std.js';
import { strictAssert } from '../util/assert.std.js';
import * as Errors from '../types/errors.std.js';
import { SignalService as Proto } from '../protobuf/index.std.js';
import { createLogger } from '../logging/log.std.js';
import * as Timers from '../Timers.preload.js';
import type { IResource } from './WebSocket.preload.js';
import { AbortableProcess } from '../util/AbortableProcess.std.js';
import type { WebAPICredentials } from './Types.d.ts';
@@ -66,10 +58,6 @@ import type { ServerAlert } from '../types/ServerAlert.std.js';
const log = createLogger('WebsocketResources');
const THIRTY_SECONDS = 30 * durations.SECOND;
const MAX_MESSAGE_SIZE = 512 * 1024;
const AGGREGATED_STATS_KEY = 'websocketStats';
export enum IpVersion {
@@ -166,20 +154,12 @@ export enum ServerRequestType {
Unknown = 'unknown',
}
export type IncomingWebSocketRequest = {
readonly requestType: ServerRequestType;
readonly body: Uint8Array | undefined;
readonly timestamp: number | undefined;
respond(status: number, message: string): void;
};
export class IncomingWebSocketRequestLibsignal implements IncomingWebSocketRequest {
export class IncomingWebSocketRequest {
constructor(
readonly requestType: ServerRequestType,
readonly body: Uint8Array | undefined,
readonly timestamp: number | undefined,
private readonly ack: ChatServerMessageAck | undefined
private readonly ack: Pick<ChatServerMessageAck, 'send'> | undefined
) {}
respond(status: number, _message: string): void {
@@ -187,68 +167,6 @@ export class IncomingWebSocketRequestLibsignal implements IncomingWebSocketReque
}
}
export class IncomingWebSocketRequestLegacy implements IncomingWebSocketRequest {
readonly #id: Long;
public readonly requestType: ServerRequestType;
public readonly body: Uint8Array | undefined;
public readonly timestamp: number | undefined;
constructor(
request: Proto.IWebSocketRequestMessage,
private readonly sendBytes: (bytes: Buffer) => void
) {
strictAssert(request.id, 'request without id');
strictAssert(request.verb, 'request without verb');
strictAssert(request.path, 'request without path');
this.#id = request.id;
this.requestType = resolveType(request.path, request.verb);
this.body = dropNull(request.body);
this.timestamp = resolveTimestamp(request.headers || []);
}
public respond(status: number, message: string): void {
const bytes = Proto.WebSocketMessage.encode({
type: Proto.WebSocketMessage.Type.RESPONSE,
response: { id: this.#id, message, status },
}).finish();
this.sendBytes(Buffer.from(bytes));
}
}
function resolveType(path: string, verb: string): ServerRequestType {
if (path === ServerRequestType.ApiMessage) {
return ServerRequestType.ApiMessage;
}
if (path === ServerRequestType.ApiEmptyQueue && verb === 'PUT') {
return ServerRequestType.ApiEmptyQueue;
}
if (path === ServerRequestType.ProvisioningAddress && verb === 'PUT') {
return ServerRequestType.ProvisioningAddress;
}
if (path === ServerRequestType.ProvisioningMessage && verb === 'PUT') {
return ServerRequestType.ProvisioningMessage;
}
return ServerRequestType.Unknown;
}
function resolveTimestamp(headers: ReadonlyArray<string>): number | undefined {
// The 'X-Signal-Timestamp' is usually the last item, so start there.
let it = headers.length;
// eslint-disable-next-line no-plusplus
while (--it >= 0) {
const match = headers[it].match(/^X-Signal-Timestamp:\s*(\d+)\s*$/i);
if (match && match.length === 2) {
return Number(match[1]);
}
}
return undefined;
}
export type SendRequestOptions = Readonly<{
verb: string;
path: string;
@@ -281,12 +199,12 @@ export class CloseEvent extends Event {
export type ChatKind = 'auth' | 'unauth';
type LibsignalChatConnection<Kind extends ChatKind> = Kind extends 'auth'
type ChatConnection<Kind extends ChatKind> = Kind extends 'auth'
? AuthenticatedChatConnection
: UnauthenticatedChatConnection;
// eslint-disable-next-line no-restricted-syntax
export interface IWebSocketResource extends IResource {
export interface IWebSocketResource {
sendRequest(options: SendRequestOptions): Promise<Response>;
addEventListener(name: 'close', handler: (ev: CloseEvent) => void): void;
@@ -301,16 +219,16 @@ export interface IWebSocketResource extends IResource {
}
export type IChatConnection<Chat extends ChatKind> = IWebSocketResource & {
get libsignalWebsocket(): LibsignalChatConnection<Chat>;
get libsignalWebsocket(): ChatConnection<Chat>;
};
type LibsignalWebSocketResourceHolder<Chat extends ChatKind> = {
resource: LibsignalWebSocketResource<Chat> | undefined;
type WebSocketResourceHandler<Chat extends ChatKind> = {
resource: WebSocketResource<Chat> | undefined;
};
const UNEXPECTED_DISCONNECT_CODE = 3001;
export function connectUnauthenticatedLibsignal({
export function connectUnauthenticated({
libsignalNet,
name,
userLanguages,
@@ -320,9 +238,9 @@ export function connectUnauthenticatedLibsignal({
name: string;
userLanguages: ReadonlyArray<string>;
keepalive: KeepAliveOptionsType;
}): AbortableProcess<LibsignalWebSocketResource<'unauth'>> {
const logId = `LibsignalWebSocketResource(${name})`;
const listener: LibsignalWebSocketResourceHolder<'unauth'> &
}): AbortableProcess<WebSocketResource<'unauth'>> {
const logId = `WebSocketResource(${name})`;
const listener: WebSocketResourceHandler<'unauth'> &
ConnectionEventsListener = {
resource: undefined,
onConnectionInterrupted(cause: LibSignalError | null): void {
@@ -334,7 +252,7 @@ export function connectUnauthenticatedLibsignal({
this.resource = undefined;
},
};
return connectLibsignal(
return connect(
abortSignal =>
libsignalNet.connectUnauthenticatedChat(listener, {
abortSignal,
@@ -346,7 +264,7 @@ export function connectUnauthenticatedLibsignal({
);
}
export function connectAuthenticatedLibsignal({
export function connectAuthenticated({
libsignalNet,
name,
credentials,
@@ -364,10 +282,9 @@ export function connectAuthenticatedLibsignal({
receiveStories: boolean;
userLanguages: ReadonlyArray<string>;
keepalive: KeepAliveOptionsType;
}): AbortableProcess<LibsignalWebSocketResource<'auth'>> {
const logId = `LibsignalWebSocketResource(${name})`;
const listener: LibsignalWebSocketResourceHolder<'auth'> &
ChatServiceListener = {
}): AbortableProcess<WebSocketResource<'auth'>> {
const logId = `WebSocketResource(${name})`;
const listener: WebSocketResourceHandler<'auth'> & ChatServiceListener = {
resource: undefined,
onIncomingMessage(
envelope: Uint8Array,
@@ -375,7 +292,7 @@ export function connectAuthenticatedLibsignal({
ack: ChatServerMessageAck
): void {
// Handle incoming messages even if we've disconnected.
const request = new IncomingWebSocketRequestLibsignal(
const request = new IncomingWebSocketRequest(
ServerRequestType.ApiMessage,
envelope,
timestamp,
@@ -388,7 +305,7 @@ export function connectAuthenticatedLibsignal({
logDisconnectedListenerWarn(logId, 'onQueueEmpty');
return;
}
const request = new IncomingWebSocketRequestLibsignal(
const request = new IncomingWebSocketRequest(
ServerRequestType.ApiEmptyQueue,
undefined,
undefined,
@@ -408,7 +325,7 @@ export function connectAuthenticatedLibsignal({
onReceivedAlerts(alerts.map(parseServerAlertsFromHeader).flat());
},
};
return connectLibsignal(
return connect(
(abortSignal: AbortSignal) =>
libsignalNet.connectAuthenticatedChat(
credentials.username,
@@ -427,21 +344,19 @@ function logDisconnectedListenerWarn(logId: string, method: string): void {
log.warn(`${logId} received ${method}, but listener already disconnected`);
}
function connectLibsignal<Chat extends ChatKind>(
makeConnection: (
abortSignal: AbortSignal
) => Promise<LibsignalChatConnection<Chat>>,
resourceHolder: LibsignalWebSocketResourceHolder<Chat>,
function connect<Chat extends ChatKind>(
makeConnection: (abortSignal: AbortSignal) => Promise<ChatConnection<Chat>>,
resourceHolder: WebSocketResourceHandler<Chat>,
logId: string,
keepalive: KeepAliveOptionsType
): AbortableProcess<LibsignalWebSocketResource<Chat>> {
): AbortableProcess<WebSocketResource<Chat>> {
const abortController = new AbortController();
const connectAsync = async () => {
try {
const service = await makeConnection(abortController.signal);
log.info(`${logId} connected`);
const connectionInfo = service.connectionInfo();
const resource = new LibsignalWebSocketResource(
const resource = new WebSocketResource(
service,
IpVersion[connectionInfo.ipVersion],
connectionInfo.localPort,
@@ -461,7 +376,7 @@ function connectLibsignal<Chat extends ChatKind>(
throw error;
}
};
return new AbortableProcess<LibsignalWebSocketResource<Chat>>(
return new AbortableProcess<WebSocketResource<Chat>>(
`${logId}.connect`,
{
abort() {
@@ -477,7 +392,7 @@ function connectLibsignal<Chat extends ChatKind>(
);
}
export class LibsignalWebSocketResource<Chat extends ChatKind>
export class WebSocketResource<Chat extends ChatKind>
extends EventTarget
implements IChatConnection<Chat>
{
@@ -494,7 +409,7 @@ export class LibsignalWebSocketResource<Chat extends ChatKind>
#keepalive: KeepAlive;
constructor(
private readonly chatService: LibsignalChatConnection<Chat>,
private readonly chatService: ChatConnection<Chat>,
private readonly socketIpVersion: IpVersion,
private readonly localPortNumber: number,
private readonly logId: string,
@@ -583,7 +498,7 @@ export class LibsignalWebSocketResource<Chat extends ChatKind>
return response;
}
get libsignalWebsocket(): LibsignalChatConnection<Chat> {
get libsignalWebsocket(): ChatConnection<Chat> {
return this.chatService;
}
@@ -605,338 +520,6 @@ export class LibsignalWebSocketResource<Chat extends ChatKind>
}
}
export default class WebSocketResource
extends EventTarget
implements IWebSocketResource
{
#outgoingId = Long.fromNumber(1, true);
#closed = false;
readonly #outgoingMap = new Map<
string,
(result: SendRequestResult) => void
>();
readonly #boundOnMessage: (message: IMessage) => void;
#activeRequests = new Set<IncomingWebSocketRequest | string>();
#shuttingDown = false;
#shutdownTimer?: Timers.Timeout;
readonly #logId: string;
readonly #localSocketPort: number | undefined;
readonly #socketIpVersion: IpVersion | undefined;
// Public for tests
public readonly keepalive?: KeepAlive;
constructor(
private readonly socket: WebSocket,
private readonly options: WebSocketResourceOptions
) {
super();
this.#logId = `WebSocketResource(${options.name})`;
this.#localSocketPort = socket.socket.localPort;
if (!socket.socket.localAddress) {
this.#socketIpVersion = undefined;
}
if (socket.socket.localAddress == null) {
this.#socketIpVersion = undefined;
} else if (net.isIPv4(socket.socket.localAddress)) {
this.#socketIpVersion = IpVersion.IPv4;
} else if (net.isIPv6(socket.socket.localAddress)) {
this.#socketIpVersion = IpVersion.IPv6;
} else {
this.#socketIpVersion = undefined;
}
this.#boundOnMessage = this.#onMessage.bind(this);
socket.on('message', this.#boundOnMessage);
if (options.keepalive) {
const keepalive = new KeepAlive(
this,
options.name,
options.keepalive ?? {}
);
this.keepalive = keepalive;
keepalive.reset();
socket.on('close', () => this.keepalive?.stop());
socket.on('error', (error: Error) => {
log.warn(`${this.#logId}: WebSocket error`, Errors.toLogFormat(error));
});
}
socket.on('close', (code, reason) => {
this.#closed = true;
log.warn(`${this.#logId}: Socket closed`);
this.dispatchEvent(new CloseEvent(code, reason || 'normal'));
});
this.addEventListener('close', () => this.#onClose());
}
public ipVersion(): IpVersion | undefined {
return this.#socketIpVersion;
}
public localPort(): number | undefined {
return this.#localSocketPort;
}
public override addEventListener(
name: 'close',
handler: (ev: CloseEvent) => void
): void;
public override addEventListener(name: string, handler: EventHandler): void {
return super.addEventListener(name, handler);
}
public async sendRequest(options: SendRequestOptions): Promise<Response> {
const id = this.#outgoingId;
const idString = id.toString();
strictAssert(
!this.#outgoingMap.has(idString),
'Duplicate outgoing request'
);
// Note that this automatically wraps
this.#outgoingId = this.#outgoingId.add(1);
const bytes = Proto.WebSocketMessage.encode({
type: Proto.WebSocketMessage.Type.REQUEST,
request: {
verb: options.verb,
path: options.path,
body: options.body,
headers: options.headers
? options.headers
.map(([key, value]) => {
return `${key}:${value}`;
})
.slice()
: undefined,
id,
},
}).finish();
strictAssert(
bytes.length <= MAX_MESSAGE_SIZE,
'WebSocket request byte size exceeded'
);
strictAssert(!this.#shuttingDown, 'Cannot send request, shutting down');
this.#addActive(idString);
const promise = new Promise<SendRequestResult>((resolve, reject) => {
let timer = options.timeout
? Timers.setTimeout(() => {
this.#removeActive(idString);
this.close(UNEXPECTED_DISCONNECT_CODE, 'Request timed out');
reject(new Error(`Request timed out; id: [${idString}]`));
}, options.timeout)
: undefined;
this.#outgoingMap.set(idString, result => {
if (timer !== undefined) {
Timers.clearTimeout(timer);
timer = undefined;
}
this.keepalive?.reset();
this.#removeActive(idString);
resolve(result);
});
});
this.socket.sendBytes(Buffer.from(bytes));
const requestResult = await promise;
return WebSocketResource.intoResponse(requestResult);
}
public forceKeepAlive(timeout?: number): void {
if (!this.keepalive) {
return;
}
drop(this.keepalive.send(timeout));
}
public close(code = NORMAL_DISCONNECT_CODE, reason?: string): void {
if (this.#closed) {
log.info(`${this.#logId}.close: Already closed! ${code}/${reason}`);
return;
}
log.info(`${this.#logId}.close(${code})`);
if (this.keepalive) {
this.keepalive.stop();
}
this.socket.close(code, reason);
this.socket.removeListener('message', this.#boundOnMessage);
// On linux the socket can wait a long time to emit its close event if we've
// lost the internet connection. On the order of minutes. This speeds that
// process up.
Timers.setTimeout(() => {
if (this.#closed) {
return;
}
log.warn(`${this.#logId}.close: Dispatching our own socket close event`);
this.dispatchEvent(new CloseEvent(code, reason || 'normal'));
}, 5 * durations.SECOND);
}
public shutdown(): void {
if (this.#closed) {
return;
}
if (this.#activeRequests.size === 0) {
log.info(`${this.#logId}.shutdown: no active requests, closing`);
this.close(NORMAL_DISCONNECT_CODE, 'Shutdown');
return;
}
this.#shuttingDown = true;
log.info(`${this.#logId}.shutdown: shutting down`);
this.#shutdownTimer = Timers.setTimeout(() => {
if (this.#closed) {
return;
}
log.warn(`${this.#logId}.shutdown: Failed to shutdown gracefully`);
this.close(NORMAL_DISCONNECT_CODE, 'Shutdown');
}, THIRTY_SECONDS);
}
#onMessage({ type, binaryData }: IMessage): void {
if (type !== 'binary' || !binaryData) {
throw new Error(`Unsupported websocket message type: ${type}`);
}
const message = Proto.WebSocketMessage.decode(binaryData);
if (
message.type === Proto.WebSocketMessage.Type.REQUEST &&
message.request
) {
const handleRequest =
this.options.handleRequest ||
(request => request.respond(404, 'Not found'));
const incomingRequest = new IncomingWebSocketRequestLegacy(
message.request,
(bytes: Buffer): void => {
this.#removeActive(incomingRequest);
strictAssert(
bytes.length <= MAX_MESSAGE_SIZE,
'WebSocket response byte size exceeded'
);
this.socket.sendBytes(bytes);
}
);
if (this.#shuttingDown) {
incomingRequest.respond(-1, 'Shutting down');
return;
}
this.#addActive(incomingRequest);
handleRequest(incomingRequest);
} else if (
message.type === Proto.WebSocketMessage.Type.RESPONSE &&
message.response
) {
const { response } = message;
strictAssert(response.id, 'response without id');
const responseIdString = response.id.toString();
const resolve = this.#outgoingMap.get(responseIdString);
this.#outgoingMap.delete(responseIdString);
if (!resolve) {
throw new Error(`Received response for unknown request ${response.id}`);
}
resolve({
status: response.status ?? -1,
message: response.message ?? '',
response: dropNull(response.body),
headers: response.headers ?? [],
});
}
}
#onClose(): void {
const outgoing = new Map(this.#outgoingMap);
this.#outgoingMap.clear();
for (const resolve of outgoing.values()) {
resolve({
status: -1,
message: 'Connection closed',
response: undefined,
headers: [],
});
}
}
#addActive(request: IncomingWebSocketRequest | string): void {
this.#activeRequests.add(request);
}
#removeActive(request: IncomingWebSocketRequest | string): void {
if (!this.#activeRequests.has(request)) {
log.warn(`${this.#logId}.removeActive: removing unknown request`);
return;
}
this.#activeRequests.delete(request);
if (this.#activeRequests.size !== 0) {
return;
}
if (!this.#shuttingDown) {
return;
}
if (this.#shutdownTimer) {
Timers.clearTimeout(this.#shutdownTimer);
this.#shutdownTimer = undefined;
}
log.info(`${this.#logId}.removeActive: shutdown complete`);
this.close(NORMAL_DISCONNECT_CODE, 'Shutdown');
}
private static intoResponse(sendRequestResult: SendRequestResult): Response {
const {
status,
message: statusText,
response,
headers: flatResponseHeaders,
} = sendRequestResult;
const headers: Array<[string, string]> = flatResponseHeaders.map(header => {
const [key, value] = header.split(':', 2);
strictAssert(value !== undefined, 'Invalid header!');
return [key, value];
});
return new Response(response, {
status,
statusText,
headers,
});
}
}
export type KeepAliveOptionsType = {
path?: string;
};

View File

@@ -1,99 +0,0 @@
// Copyright 2022 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { Cds2Client } from '@signalapp/libsignal-client';
import { strictAssert } from '../../util/assert.std.js';
import { SignalService as Proto } from '../../protobuf/index.std.js';
import { CDSSocketBase, CDSSocketState } from './CDSSocketBase.node.js';
import type { CDSSocketBaseOptionsType } from './CDSSocketBase.node.js';
export type CDSISocketOptionsType = Readonly<{
mrenclave: Uint8Array;
}> &
CDSSocketBaseOptionsType;
export class CDSISocket extends CDSSocketBase<CDSISocketOptionsType> {
#privCdsClient: Cds2Client | undefined;
public override async handshake(): Promise<void> {
strictAssert(
this.state === CDSSocketState.Open,
'CDSI handshake called twice'
);
this.state = CDSSocketState.Handshake;
{
const { done, value: attestationMessage } =
await this.socketIterator.next();
strictAssert(!done, 'CDSI socket closed before handshake');
const earliestValidTimestamp = new Date();
strictAssert(
this.#privCdsClient === undefined,
'CDSI handshake called twice'
);
this.#privCdsClient = Cds2Client.new(
this.options.mrenclave,
attestationMessage,
earliestValidTimestamp
);
}
this.socket.sendBytes(Buffer.from(this.#cdsClient.initialRequest()));
{
const { done, value: message } = await this.socketIterator.next();
strictAssert(!done, 'CDSI socket expected handshake data');
this.#cdsClient.completeHandshake(message);
}
this.state = CDSSocketState.Established;
}
protected override async sendRequest(
_version: number,
request: Uint8Array
): Promise<void> {
this.socket.sendBytes(
Buffer.from(this.#cdsClient.establishedSend(request))
);
const { done, value: ciphertext } = await this.socketIterator.next();
strictAssert(!done, 'CDSISocket.sendRequest(): expected token message');
const message = await this.decryptResponse(ciphertext);
this.logger.info('CDSISocket.sendRequest(): processing token message');
const { token } = Proto.CDSClientResponse.decode(message);
strictAssert(token, 'CDSISocket.sendRequest(): expected token');
this.socket.sendBytes(
Buffer.from(
this.#cdsClient.establishedSend(
Proto.CDSClientRequest.encode({
tokenAck: true,
}).finish()
)
)
);
}
protected override async decryptResponse(
ciphertext: Uint8Array
): Promise<Uint8Array> {
return this.#cdsClient.establishedRecv(ciphertext);
}
//
// Private
//
get #cdsClient(): Cds2Client {
strictAssert(this.#privCdsClient, 'CDSISocket did not start handshake');
return this.#privCdsClient;
}
}

View File

@@ -1,262 +0,0 @@
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
import { EventEmitter } from 'node:events';
import { Readable } from 'node:stream';
import lodash from 'lodash';
import type { connection as WebSocket } from 'websocket';
import Long from 'long';
import type { LoggerType } from '../../types/Logging.std.js';
import { strictAssert } from '../../util/assert.std.js';
import { isUntaggedPniString, toTaggedPni } from '../../types/ServiceId.std.js';
import { isAciString } from '../../util/isAciString.std.js';
import * as Bytes from '../../Bytes.std.js';
import { UUID_BYTE_SIZE } from '../../types/Crypto.std.js';
import { uuidToBytes, bytesToUuid } from '../../util/uuidToBytes.std.js';
import { SignalService as Proto } from '../../protobuf/index.std.js';
import type {
CDSRequestOptionsType,
CDSResponseEntryType,
CDSResponseType,
} from './Types.d.ts';
import { RateLimitedError } from './RateLimitedError.std.js';
const { noop } = lodash;
export type CDSSocketBaseOptionsType = Readonly<{
logger: LoggerType;
socket: WebSocket;
}>;
export enum CDSSocketState {
Open = 'Open',
Handshake = 'Handshake',
Established = 'Established',
Closed = 'Closed',
}
const MAX_E164_COUNT = 5000;
const E164_BYTE_SIZE = 8;
const TRIPLE_BYTE_SIZE = UUID_BYTE_SIZE * 2 + E164_BYTE_SIZE;
export abstract class CDSSocketBase<
Options extends CDSSocketBaseOptionsType = CDSSocketBaseOptionsType,
> extends EventEmitter {
protected state = CDSSocketState.Open;
protected readonly socket: WebSocket;
protected readonly logger: LoggerType;
protected readonly socketIterator: AsyncIterator<Uint8Array>;
constructor(protected readonly options: Options) {
super();
// For easier access
this.logger = options.logger;
this.socket = options.socket;
this.socketIterator = this.#iterateSocket();
}
public async close(code: number, reason: string): Promise<void> {
return this.socket.close(code, reason);
}
public async request({
e164s,
acisAndAccessKeys,
returnAcisWithoutUaks = false,
}: CDSRequestOptionsType): Promise<CDSResponseType> {
const log = this.logger;
strictAssert(
e164s.length < MAX_E164_COUNT,
'CDSSocket does not support paging. Use this for one-off requests'
);
strictAssert(
this.state === CDSSocketState.Established,
'CDS Connection not established'
);
const version = 2;
const aciUakPairs = acisAndAccessKeys.map(({ aci, accessKey }) =>
Bytes.concatenate([uuidToBytes(aci), Bytes.fromBase64(accessKey)])
);
const request = Proto.CDSClientRequest.encode({
newE164s: Bytes.concatenate(
e164s.map(e164 => {
// Long.fromString handles numbers with or without a leading '+'
return new Uint8Array(Long.fromString(e164).toBytesBE());
})
),
aciUakPairs: Bytes.concatenate(aciUakPairs),
returnAcisWithoutUaks,
}).finish();
log.info(`CDSSocket.request(): sending version=${version} request`);
await this.sendRequest(version, request);
const resultMap: Map<string, CDSResponseEntryType> = new Map();
// eslint-disable-next-line no-constant-condition
while (true) {
// eslint-disable-next-line no-await-in-loop
const { done, value: ciphertext } = await this.socketIterator.next();
if (done) {
this.state = CDSSocketState.Closed;
break;
}
// eslint-disable-next-line no-await-in-loop
const message = await this.decryptResponse(ciphertext);
log.info('CDSSocket.request(): processing response message');
const response = Proto.CDSClientResponse.decode(message);
decodeSingleResponse(resultMap, response);
}
log.info('CDSSocket.request(): done');
return { debugPermitsUsed: 0, entries: resultMap };
}
// Abstract methods
public abstract handshake(): Promise<void>;
protected abstract sendRequest(
version: number,
data: Uint8Array
): Promise<void>;
protected abstract decryptResponse(
ciphertext: Uint8Array
): Promise<Uint8Array>;
// EventEmitter types
public override on(
type: 'close',
callback: (code: number, reason?: string) => void
): this;
public override on(type: 'error', callback: (error: Error) => void): this;
public override on(
type: string | symbol,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
listener: (...args: Array<any>) => void
): this {
return super.on(type, listener);
}
public override emit(type: 'close', code: number, reason?: string): boolean;
public override emit(type: 'error', error: Error): boolean;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
public override emit(type: string | symbol, ...args: Array<any>): boolean {
return super.emit(type, ...args);
}
//
// Private
//
#iterateSocket(): AsyncIterator<Uint8Array> {
const stream = new Readable({ read: noop, objectMode: true });
this.socket.on('message', ({ type, binaryData }) => {
strictAssert(type === 'binary', 'Invalid CDS socket packet');
strictAssert(binaryData, 'Invalid CDS socket packet');
stream.push(binaryData);
});
this.socket.on('close', (code, reason) => {
if (code === 1000) {
stream.push(null);
} else if (code === 4008) {
try {
const payload = JSON.parse(reason);
stream.destroy(new RateLimitedError(payload));
} catch (error) {
stream.destroy(
new Error(
`Socket closed with code ${code} and reason ${reason}, ` +
'but rate limiting response cannot be parsed'
)
);
}
} else {
stream.destroy(
new Error(`Socket closed with code ${code} and reason ${reason}`)
);
}
});
this.socket.on('error', (error: Error) => stream.destroy(error));
return stream[Symbol.asyncIterator]();
}
}
function decodeSingleResponse(
resultMap: Map<string, CDSResponseEntryType>,
response: Proto.CDSClientResponse
): void {
if (!response.e164PniAciTriples) {
return;
}
for (
let i = 0;
i < response.e164PniAciTriples.length;
i += TRIPLE_BYTE_SIZE
) {
const tripleBytes = response.e164PniAciTriples.subarray(
i,
i + TRIPLE_BYTE_SIZE
);
strictAssert(
tripleBytes.length === TRIPLE_BYTE_SIZE,
'Invalid size of CDS response triple'
);
let offset = 0;
const e164Bytes = tripleBytes.subarray(offset, offset + E164_BYTE_SIZE);
offset += E164_BYTE_SIZE;
const pniBytes = tripleBytes.subarray(offset, offset + UUID_BYTE_SIZE);
offset += UUID_BYTE_SIZE;
const aciBytes = tripleBytes.subarray(offset, offset + UUID_BYTE_SIZE);
offset += UUID_BYTE_SIZE;
const e164Long = Long.fromBytesBE(Array.from(e164Bytes));
if (e164Long.isZero()) {
continue;
}
const e164 = `+${e164Long.toString()}`;
const pni = bytesToUuid(pniBytes);
const aci = bytesToUuid(aciBytes);
strictAssert(
aci === undefined || isAciString(aci),
'CDSI response has invalid ACI'
);
strictAssert(
pni === undefined || isUntaggedPniString(pni),
'CDSI response has invalid PNI'
);
resultMap.set(e164, {
pni: pni === undefined ? undefined : toTaggedPni(pni),
aci,
});
}
}