sendToGroup: Move to libsignal typed API

This commit is contained in:
Scott Nonnenberg
2026-01-14 05:46:09 +10:00
committed by GitHub
parent 078cce6b10
commit a27a87a934
4 changed files with 250 additions and 36 deletions

View File

@@ -31,6 +31,8 @@ const SemverKeys = [
'desktop.pinnedMessages.send.prod',
'desktop.plaintextExport.beta',
'desktop.plaintextExport.prod',
'desktop.retireAccessKeyGroupSend.beta',
'desktop.retireAccessKeyGroupSend.prod',
] as const;
export type SemverKeyType = (typeof SemverKeys)[number];

View File

@@ -15,7 +15,10 @@ import EventListener from 'node:events';
import type { IncomingMessage } from 'node:http';
import { setTimeout as sleep } from 'node:timers/promises';
import type { UnauthUsernamesService } from '@signalapp/libsignal-client/dist/net';
import type {
UnauthMessagesService,
UnauthUsernamesService,
} from '@signalapp/libsignal-client/dist/net';
import { strictAssert } from '../util/assert.std.js';
import { explodePromise } from '../util/explodePromise.std.js';
@@ -431,7 +434,9 @@ export class SocketManager extends EventListener {
}).getResult();
}
public async getUnauthenticatedLibsignalApi(): Promise<UnauthUsernamesService> {
public async getUnauthenticatedLibsignalApi(): Promise<
UnauthUsernamesService & UnauthMessagesService
> {
const resource = await this.#getUnauthenticatedResource();
return resource.libsignalWebsocket;
}

View File

@@ -15,6 +15,7 @@ import { v4 as getGuid } from 'uuid';
import { z } from 'zod';
import type { Readable } from 'node:stream';
import qs from 'node:querystring';
import { LibSignalErrorBase, ErrorCode } from '@signalapp/libsignal-client';
import type {
KEMPublicKey,
PublicKey,
@@ -22,6 +23,7 @@ import type {
Pni,
} from '@signalapp/libsignal-client';
import { AccountAttributes } from '@signalapp/libsignal-client/dist/net.js';
import { GroupSendFullToken } from '@signalapp/libsignal-client/zkgroup.js';
import { assertDev, strictAssert } from '../util/assert.std.js';
import * as durations from '../util/durations/index.std.js';
@@ -58,6 +60,7 @@ import {
serviceIdSchema,
aciSchema,
untaggedPniSchema,
fromServiceIdObject,
} from '../types/ServiceId.std.js';
import type { BackupPresentationHeadersType } from '../types/backups.node.js';
import { HTTPError } from '../types/HTTPError.std.js';
@@ -295,6 +298,9 @@ export const multiRecipient200ResponseSchema = z.object({
export type MultiRecipient200ResponseType = z.infer<
typeof multiRecipient200ResponseSchema
>;
export type SendMultiResponseType = {
uuids404: Array<ServiceIdString>;
};
export const multiRecipient409ResponseSchema = z.array(
z.object({
@@ -642,11 +648,16 @@ async function _retry<R>(
try {
return await f();
} catch (e) {
const httpNoNetwork = e instanceof HTTPError && e.code === -1;
const libsignalNoNetwork =
e instanceof LibSignalErrorBase &&
(e.code === ErrorCode.IoError ||
e.code === ErrorCode.ChatServiceInactive);
if (
e instanceof HTTPError &&
e.code === -1 &&
count < limit &&
!abortSignal?.aborted
!abortSignal?.aborted &&
(httpNoNetwork || libsignalNoNetwork)
) {
return new Promise(resolve => {
setTimeout(() => {
@@ -3583,7 +3594,51 @@ function booleanToString(value: boolean | undefined): string {
return value ? 'true' : 'false';
}
export async function sendWithSenderKey(
export async function sendMulti(
payload: Uint8Array,
groupSendToken: GroupSendToken | null,
timestamp: number,
{
online = false,
urgent = true,
story = false,
}: {
online?: boolean;
story?: boolean;
urgent?: boolean;
}
): Promise<SendMultiResponseType> {
log.info(`send/${timestamp}/<multiple>/sendMulti`);
let auth: 'story' | GroupSendFullToken;
if (story) {
if (groupSendToken?.length) {
log.warn('sendMulti: story=true and groupSendToken was provided');
}
auth = 'story';
} else if (groupSendToken?.length) {
auth = new GroupSendFullToken(groupSendToken);
} else {
throw new Error('sendMulti: missing groupSendToken and story=false');
}
const result = await _retry(async () => {
const chat = await socketManager.getUnauthenticatedLibsignalApi();
return chat.sendMultiRecipientMessage({
payload,
timestamp,
auth,
onlineOnly: online,
urgent,
});
});
return {
uuids404: result.unregisteredIds.map(fromServiceIdObject),
};
}
export async function sendMultiLegacy(
data: Uint8Array,
accessKeys: Uint8Array | null,
groupSendToken: GroupSendToken | null,
@@ -3602,7 +3657,7 @@ export async function sendWithSenderKey(
const urgentParam = `&urgent=${booleanToString(urgent)}`;
const storyParam = `&story=${booleanToString(story)}`;
log.info(`send/${timestamp}/<multiple>/sendWithSenderKey`);
log.info(`send/${timestamp}/<multiple>/sendMultiLegacy`);
const response = await _ajax({
host: 'chatService',
call: 'multiRecipient',
@@ -3626,7 +3681,7 @@ export async function sendWithSenderKey(
}
log.error(
'invalid response from sendWithSenderKey',
'sendMultiLegacy: invalid response from server',
toLogFormat(parseResult.error)
);
return response as MultiRecipient200ResponseType;

View File

@@ -14,6 +14,10 @@ import {
SenderCertificate,
UnidentifiedSenderMessageContent,
} from '@signalapp/libsignal-client';
import type {
MismatchedDevicesError,
RateLimitedError,
} from '@signalapp/libsignal-client';
import {
signalProtocolStore,
GLOBAL_ZONE,
@@ -31,7 +35,7 @@ import * as Errors from '../types/errors.std.js';
import { DataWriter } from '../sql/Client.preload.js';
import { getValue } from '../RemoteConfig.dom.js';
import type { ServiceIdString } from '../types/ServiceId.std.js';
import { ServiceIdKind } from '../types/ServiceId.std.js';
import { fromServiceIdObject, ServiceIdKind } from '../types/ServiceId.std.js';
import * as Bytes from '../Bytes.std.js';
import { isRecord } from './isRecord.std.js';
@@ -70,12 +74,13 @@ import { SEALED_SENDER, ZERO_ACCESS_KEY } from '../types/SealedSender.std.js';
import { HTTPError } from '../types/HTTPError.std.js';
import { parseIntOrThrow } from './parseIntOrThrow.std.js';
import {
sendWithSenderKey,
getKeysForServiceId as doGetKeysForServiceId,
getKeysForServiceIdUnauth as doGetKeysForServiceIdUnauth,
multiRecipient200ResponseSchema,
multiRecipient409ResponseSchema,
multiRecipient410ResponseSchema,
sendMulti,
sendMultiLegacy,
} from '../textsecure/WebAPI.preload.js';
import { SignalService as Proto } from '../protobuf/index.std.js';
@@ -91,6 +96,7 @@ import type { GroupSendToken } from '../types/GroupSendEndorsements.std.js';
import { isAciString } from './isAciString.std.js';
import { safeParseStrict, safeParseUnknown } from './schemas.std.js';
import { itemStorage } from '../textsecure/Storage.preload.js';
import { isFeaturedEnabledNoRedux } from './isFeatureEnabled.dom.js';
const { differenceWith, omit } = lodash;
@@ -301,9 +307,14 @@ export async function sendToGroupViaSenderKey(
urgent,
} = options;
const isAccessKeySendRetired = isFeaturedEnabledNoRedux({
betaKey: 'desktop.retireAccessKeyGroupSend.beta',
prodKey: 'desktop.retireAccessKeyGroupSend.prod',
});
const logId = `sendToGroupViaSenderKey/${sendTarget.idForLogging()}`;
log.info(
`${logId}: Starting ${timestamp}, recursion count ${recursion.count}, reason: ${recursion.reason}...`
`${logId}: Starting ${timestamp}, recursion count ${recursion.count}, reason: ${recursion.reason}, accessKeyRetired: ${isAccessKeySendRetired}...`
);
if (recursion.count > MAX_RECURSION) {
@@ -537,10 +548,16 @@ export async function sendToGroupViaSenderKey(
groupSendToken = groupSendEndorsementState.buildToken(
new Set(senderKeyRecipients)
);
} else {
} else if (!isAccessKeySendRetired) {
accessKeys = getXorOfAccessKeys(devicesForSenderKey, { story });
}
if (isAccessKeySendRetired && !groupSendToken && !story) {
throw new Error(
'sendToGroup: missing groupSendToken and story=false. Failing over.'
);
}
try {
const messageBuffer = await encryptForSenderKey({
contentHint,
@@ -550,35 +567,50 @@ export async function sendToGroupViaSenderKey(
groupId,
});
const result = await sendWithSenderKey(
messageBuffer,
accessKeys,
groupSendToken,
timestamp,
{ online, story, urgent }
);
const parsed = safeParseStrict(multiRecipient200ResponseSchema, result);
if (parsed.success) {
const { uuids404 } = parsed.data;
if (uuids404 && uuids404.length > 0) {
if (isAccessKeySendRetired) {
const result = await sendMulti(messageBuffer, groupSendToken, timestamp, {
online,
story,
urgent,
});
if (result.uuids404.length > 0) {
await waitForAll({
tasks: uuids404.map(
tasks: result.uuids404.map(
serviceId => async () => markServiceIdUnregistered(serviceId)
),
});
}
senderKeyRecipientsWithDevices = omit(
senderKeyRecipientsWithDevices,
uuids404 || []
);
} else {
log.error(
`${logId}: Server returned unexpected 200 response ${JSON.stringify(
parsed.error.flatten()
)}`
const result = await sendMultiLegacy(
messageBuffer,
accessKeys,
groupSendToken,
timestamp,
{ online, story, urgent }
);
const parsed = safeParseStrict(multiRecipient200ResponseSchema, result);
if (parsed.success) {
const { uuids404 } = parsed.data;
if (uuids404 && uuids404.length > 0) {
await waitForAll({
tasks: uuids404.map(
serviceId => async () => markServiceIdUnregistered(serviceId)
),
});
}
senderKeyRecipientsWithDevices = omit(
senderKeyRecipientsWithDevices,
uuids404 || []
);
} else {
log.error(
`${logId}: Server returned unexpected 200 response ${JSON.stringify(
parsed.error.flatten()
)}`
);
}
}
if (shouldSaveProto(sendType)) {
@@ -597,6 +629,126 @@ export async function sendToGroupViaSenderKey(
);
}
} catch (error) {
if (error instanceof LibSignalErrorBase) {
if (error.code === ErrorCode.RequestUnauthorized) {
throw new HTTPError('libsignal threw RequestUnauthorized', {
code: 401,
headers: {},
});
}
if (error.code === ErrorCode.ChatServiceInactive) {
throw new HTTPError('libsignal threw ChatServiceInactive', {
code: -1,
headers: {},
});
}
if (error.code === ErrorCode.IoError) {
throw new HTTPError('libsignal threw IoError', {
code: -1,
headers: {},
});
}
if (error.code === ErrorCode.RateLimitedError) {
const rateLimitedError = error as unknown as RateLimitedError;
const { retryAfterSecs } = rateLimitedError;
throw new HTTPError(
`libsignal threw RateLimitedError with retryAfterSecs=${retryAfterSecs}`,
{
code: 429,
headers: {
'retry-after': retryAfterSecs?.toString(),
},
}
);
}
if (error.code === ErrorCode.MismatchedDevices) {
const mismatchedError = error as unknown as MismatchedDevicesError;
const { entries } = mismatchedError;
const staleDevices: Array<PartialDeviceType> = [];
log.warn(
`${logId}: libsignal threw MismatchedDevices, with ${entries?.length} entries`
);
await waitForAll({
maxConcurrency: 3,
tasks: entries.map(entry => async () => {
const uuid = fromServiceIdObject(entry.account);
const isEmpty =
entry.missingDevices.length === 0 &&
entry.extraDevices.length === 0 &&
entry.staleDevices.length === 0;
if (isEmpty) {
log.warn(
`${logId}/MismatchedDevices: Entry for ${uuid} was empty - fetching all keys`
);
await fetchKeysForServiceId(
uuid,
null,
groupSendEndorsementState
);
}
if (entry.missingDevices.length > 0) {
// Start new sessions; didn't have sessions before
await fetchKeysForServiceId(
uuid,
entry.missingDevices,
groupSendEndorsementState
);
}
// Clear unneeded sessions
await waitForAll({
tasks: entry.extraDevices.map(deviceId => async () => {
await signalProtocolStore.archiveSession(
new QualifiedAddress(ourAci, Address.create(uuid, deviceId))
);
}),
});
await waitForAll({
tasks: entry.staleDevices.map(device => async () => {
// Save all stale devices in one list for updating senderKeyInfo
staleDevices.push({ serviceId: uuid, id: device });
// Clear stale sessions
await signalProtocolStore.archiveSession(
new QualifiedAddress(ourAci, Address.create(uuid, device))
);
}),
});
if (entry.staleDevices.length > 0) {
// Start new sessions; previous session was stale
await fetchKeysForServiceId(
uuid,
entry.staleDevices,
groupSendEndorsementState
);
}
}),
});
// Update sende senderKeyInfo in one update
if (staleDevices.length > 0) {
const toUpdate = sendTarget.getSenderKeyInfo();
if (toUpdate) {
await sendTarget.saveSenderKeyInfo({
...toUpdate,
memberDevices: differenceWith(
toUpdate.memberDevices,
staleDevices,
partialDeviceComparator
),
});
}
}
return startOver('error: mismatched devices');
}
}
if (error.code === UNKNOWN_RECIPIENT) {
onFailedToSendWithEndorsements(error);
throw new UnknownRecipientError();
@@ -974,6 +1126,7 @@ async function handle409Response(
);
if (parsed.success) {
await waitForAll({
maxConcurrency: 10,
tasks: parsed.data.map(item => async () => {
const { uuid, devices } = item;
// Start new sessions with devices we didn't know about before
@@ -998,7 +1151,6 @@ async function handle409Response(
});
}
}),
maxConcurrency: 10,
});
} else {
log.error(
@@ -1356,11 +1508,11 @@ async function fetchKeysForServiceIds(
try {
await waitForAll({
maxConcurrency: 50,
tasks: serviceIds.map(
serviceId => async () =>
fetchKeysForServiceId(serviceId, null, groupSendEndorsementState)
),
maxConcurrency: 50,
});
} catch (error) {
log.error(