From faf0b630c1f3dc6a21eb66a2709ff036c8b56f20 Mon Sep 17 00:00:00 2001 From: Greyson Parrelli Date: Fri, 30 May 2025 13:25:47 -0400 Subject: [PATCH] Move profile fetches to ProfileApi. --- .../securesms/dependencies/AppDependencies.kt | 2 +- .../ApplicationDependencyProvider.java | 7 +- .../dependencies/NetworkDependenciesModule.kt | 2 +- .../securesms/jobs/RetrieveProfileJob.kt | 136 ++++++++--------- .../MockApplicationDependencyProvider.kt | 2 +- .../signalservice/api/profiles/ProfileApi.kt | 118 ++++++++++++++- .../api/profiles/ProfileRepository.kt | 139 ++++++++++++++++++ 7 files changed, 324 insertions(+), 82 deletions(-) create mode 100644 libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileRepository.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt index 6fdc716ab4..03d5b5bc6a 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/AppDependencies.kt @@ -445,7 +445,7 @@ object AppDependencies { fun provideMessageApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): MessageApi fun provideProvisioningApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): ProvisioningApi fun provideCertificateApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket): CertificateApi - fun provideProfileApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, pushServiceSocket: PushServiceSocket): ProfileApi + fun provideProfileApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, pushServiceSocket: PushServiceSocket, clientZkProfileOperations: ClientZkProfileOperations): ProfileApi fun provideRemoteConfigApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket): RemoteConfigApi fun provideDonationsApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): DonationsApi } diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java index b210760c0b..0339967a06 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/ApplicationDependencyProvider.java @@ -8,7 +8,6 @@ import android.os.HandlerThread; import androidx.annotation.NonNull; import androidx.annotation.VisibleForTesting; -import org.jetbrains.annotations.NotNull; import org.signal.billing.BillingFactory; import org.signal.core.util.ThreadUtil; import org.signal.core.util.billing.BillingApi; @@ -549,7 +548,7 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider { } @Override - public @NonNull ProvisioningApi provideProvisioningApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, SignalWebSocket.@NotNull UnauthenticatedWebSocket unauthWebSocket) { + public @NonNull ProvisioningApi provideProvisioningApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket) { return new ProvisioningApi(authWebSocket, unauthWebSocket); } @@ -559,8 +558,8 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider { } @Override - public @NonNull ProfileApi provideProfileApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull PushServiceSocket pushServiceSocket) { - return new ProfileApi(authWebSocket, pushServiceSocket); + public @NonNull ProfileApi provideProfileApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket, @NonNull PushServiceSocket pushServiceSocket, @NonNull ClientZkProfileOperations clientZkProfileOperations) { + return new ProfileApi(authWebSocket, unauthWebSocket, pushServiceSocket, clientZkProfileOperations); } @Override diff --git a/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt b/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt index 8753e831c4..74cff95d79 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/dependencies/NetworkDependenciesModule.kt @@ -207,7 +207,7 @@ class NetworkDependenciesModule( } val profileApi: ProfileApi by lazy { - provider.provideProfileApi(authWebSocket, pushServiceSocket) + provider.provideProfileApi(authWebSocket, unauthWebSocket, pushServiceSocket, groupsV2Operations.profileOperations) } val remoteConfigApi: RemoteConfigApi by lazy { diff --git a/app/src/main/java/org/thoughtcrime/securesms/jobs/RetrieveProfileJob.kt b/app/src/main/java/org/thoughtcrime/securesms/jobs/RetrieveProfileJob.kt index ccb6511cf4..41a7b18588 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/jobs/RetrieveProfileJob.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/jobs/RetrieveProfileJob.kt @@ -1,20 +1,20 @@ package org.thoughtcrime.securesms.jobs import androidx.annotation.WorkerThread -import io.reactivex.rxjava3.core.Observable -import io.reactivex.rxjava3.schedulers.Schedulers +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext import org.signal.core.util.Base64.decode import org.signal.core.util.Stopwatch import org.signal.core.util.concurrent.SignalExecutors -import org.signal.core.util.concurrent.safeBlockingGet import org.signal.core.util.logging.Log import org.signal.libsignal.protocol.IdentityKey import org.signal.libsignal.protocol.InvalidKeyException -import org.signal.libsignal.protocol.util.Pair import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential import org.signal.libsignal.zkgroup.profiles.ProfileKey import org.thoughtcrime.securesms.badges.Badges import org.thoughtcrime.securesms.crypto.ProfileKeyUtil +import org.thoughtcrime.securesms.crypto.SealedSenderAccessUtil import org.thoughtcrime.securesms.database.GroupTable import org.thoughtcrime.securesms.database.RecipientTable import org.thoughtcrime.securesms.database.RecipientTable.Companion.maskCapabilitiesToLong @@ -27,6 +27,7 @@ import org.thoughtcrime.securesms.jobmanager.Job import org.thoughtcrime.securesms.jobmanager.JsonJobData import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint import org.thoughtcrime.securesms.keyvalue.SignalStore +import org.thoughtcrime.securesms.net.SignalNetwork import org.thoughtcrime.securesms.notifications.v2.ConversationId.Companion.forConversation import org.thoughtcrime.securesms.profiles.ProfileName import org.thoughtcrime.securesms.recipients.Recipient @@ -38,13 +39,14 @@ import org.thoughtcrime.securesms.util.ProfileUtil import org.thoughtcrime.securesms.util.Util import org.whispersystems.signalservice.api.crypto.InvalidCiphertextException import org.whispersystems.signalservice.api.crypto.ProfileCipher -import org.whispersystems.signalservice.api.profiles.ProfileAndCredential +import org.whispersystems.signalservice.api.profiles.ProfileRepository +import org.whispersystems.signalservice.api.profiles.ProfileRepository.IdProfilePair +import org.whispersystems.signalservice.api.profiles.ProfileRepository.ProfileFetchRequest +import org.whispersystems.signalservice.api.profiles.ProfileRepository.ProfileFetchResult +import org.whispersystems.signalservice.api.profiles.ProfileRepository.SignalServiceProfileWithCredential import org.whispersystems.signalservice.api.profiles.SignalServiceProfile -import org.whispersystems.signalservice.api.services.ProfileService.ProfileResponseProcessor import org.whispersystems.signalservice.api.util.ExpiringProfileCredentialUtil -import org.whispersystems.signalservice.internal.ServiceResponse import java.io.IOException -import java.util.Optional import java.util.concurrent.TimeUnit import kotlin.time.Duration.Companion.minutes @@ -108,53 +110,45 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val Log.i(TAG, "Debouncing: Fetching ${recipientsToFetch.size} of ${recipients.size} recipients (${recipients.size - recipientsToFetch.size} were fetched recently)") } - val requests: List>>> = recipientsToFetch - .map { ProfileUtil.retrieveProfile(context, it, getRequestType(it)).toObservable() } + val fetchingRecipientIds = recipientsToFetch.map { it.id }.toSet() + val recipientsById: Map = recipients.associateBy { it.id } + + val requests: List> = recipientsToFetch + .map { recipient -> + ProfileFetchRequest( + id = recipient.id, + serviceId = recipient.requireServiceId(), + profileKey = recipient.profileKey?.let { ProfileKey(it) }, + sealedSenderAccess = SealedSenderAccessUtil.getSealedSenderAccessFor(recipient), + fetchExpiringCredential = !ExpiringProfileCredentialUtil.isValid(recipient.expiringProfileKeyCredential) + ) + } + stopwatch.split("requests") - val fetchingRecipientIds = recipientsToFetch.map { it.id }.toSet() - - val operationState = Observable.mergeDelayError(requests, 16, 1) - .observeOn(Schedulers.io(), true) - .scan(OperationState()) { state: OperationState, pair: Pair> -> - val recipient = pair.first() - val processor = ProfileResponseProcessor(pair.second()) - - if (processor.hasResult()) { - state.profiles.add(processor.getResult(recipient)) - } else if (processor.notFound()) { - Log.w(TAG, "Failed to find a profile for ${recipient.id}") - if (recipient.isRegistered) { - state.unregistered.add(recipient.id) - } - } else if (processor.genericIoError()) { - state.retries.add(recipient.id) - } else { - Log.w(TAG, "Failed to retrieve profile for ${recipient.id}", processor.error) - } - state + val response: ProfileFetchResult = runBlocking { + withContext(Dispatchers.IO) { + ProfileRepository(SignalNetwork.profile).fetchProfiles(requests) } - .lastOrError() - .safeBlockingGet() + } stopwatch.split("responses") val localRecords = SignalDatabase.recipients.getExistingRecords(fetchingRecipientIds) Log.d(TAG, "Fetched ${localRecords.size} existing records.") stopwatch.split("disk-fetch") - val successIds: Set = fetchingRecipientIds - operationState.retries - val newlyRegisteredIds: Set = operationState.profiles - .map { it.first() } + val successIds: Set = recipientIds - response.retryableFailures + val newlyRegisteredIds: Set = response.successes + .mapNotNull { recipientsById[it.id] } .filterNot { it.isRegistered } .map { it.id } .toSet() - val updatedProfiles = operationState.profiles - .filter { recipientProfileAndCredentialPair: Pair -> - val recipientToUpdate = recipientProfileAndCredentialPair.first() - val localRecipientRecord = localRecords[recipientToUpdate.id] ?: return@filter true - val remoteProfile = recipientProfileAndCredentialPair.second().profile - val remoteCredential = recipientProfileAndCredentialPair.second().expiringProfileKeyCredential + val updatedProfiles = response.successes + .filter { idProfilePair: IdProfilePair -> + val recipientToUpdate: Recipient = recipientsById[idProfilePair.id]!! + val localRecipientRecord: RecipientRecord = localRecords[recipientToUpdate.id] ?: return@filter true + val (remoteProfile, remoteCredential) = idProfilePair.profileWithCredential return@filter try { isUpdated(localRecipientRecord, remoteProfile, remoteCredential) @@ -169,11 +163,11 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val .toList() stopwatch.split("filter") - Log.d(TAG, "Committing updates to " + updatedProfiles.size + " of " + operationState.profiles.size + " retrieved profiles.") - updatedProfiles.chunked(150).forEach { list: List> -> + Log.d(TAG, "Committing updates to " + updatedProfiles.size + " of " + response.successes.size + " retrieved profiles.") + updatedProfiles.chunked(150).forEach { list: List> -> SignalDatabase.runInTransaction { - for (profile in list) { - process(profile.first(), profile.second()) + for (idProfilePair in list) { + process(recipientsById[idProfilePair.id]!!, idProfilePair.profileWithCredential) } } } @@ -186,29 +180,29 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val Log.i(TAG, "Marking " + newlyRegisteredIds.size + " users as registered.") SignalDatabase.recipients.bulkUpdatedRegisteredStatus(newlyRegisteredIds, emptySet()) } - if (operationState.unregistered.isNotEmpty()) { - Log.i(TAG, "Marking " + operationState.unregistered.size + " users as unregistered.") - for (recipientId in operationState.unregistered) { + if (response.unregistered.isNotEmpty()) { + Log.i(TAG, "Marking ${response.unregistered.size} users as unregistered.") + for (recipientId in response.unregistered) { SignalDatabase.recipients.markUnregistered(recipientId) } } stopwatch.split("registered-update") - for (profile in operationState.profiles) { - setIdentityKey(profile.first(), profile.second().profile.identityKey) + for (idProfilePair in response.successes) { + setIdentityKey(recipientsById[idProfilePair.id]!!, idProfilePair.profileWithCredential.profile.identityKey) } stopwatch.split("identityKeys") - val keyCount = operationState.profiles.map { it.first() }.mapNotNull { it.profileKey }.count() + val keyCount = response.successes.mapNotNull { recipientsById[it.id] }.mapNotNull { it.profileKey }.count() - Log.d(TAG, "Started with ${recipients.size} recipient(s). Found ${operationState.profiles.size} profile(s), and had keys for $keyCount of them. Will retry ${operationState.retries.size}.") + Log.d(TAG, "Started with ${recipients.size} recipient(s). Of those, ${recipientsToFetch.size} were outside the cache period. Found ${response.successes.size} profile(s), and had keys for $keyCount of them. Will retry ${response.retryableFailures.size}.") stopwatch.stop(TAG) recipientIds.clear() - recipientIds.addAll(operationState.retries) + recipientIds.addAll(response.retryableFailures) if (recipientIds.isNotEmpty()) { - throw RetryLaterException() + throw RetryLaterException(response.retryAfter?.inWholeMilliseconds ?: defaultBackoff()) } } @@ -216,10 +210,18 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val return e is RetryLaterException } + override fun getNextRunAttemptBackoff(pastAttemptCount: Int, exception: Exception): Long { + return if (exception is RetryLaterException) { + exception.backoff + } else { + super.getNextRunAttemptBackoff(pastAttemptCount, exception) + } + } + override fun onFailure() {} @Throws(InvalidCiphertextException::class, IOException::class) - private fun isUpdated(localRecipientRecord: RecipientRecord, remoteProfile: SignalServiceProfile, remoteExpiringProfileKeyCredential: Optional): Boolean { + private fun isUpdated(localRecipientRecord: RecipientRecord, remoteProfile: SignalServiceProfile, remoteExpiringProfileKeyCredential: ExpiringProfileKeyCredential?): Boolean { if (localRecipientRecord.signalProfileAvatar != remoteProfile.avatar) { return true } @@ -256,7 +258,7 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val return true } - if (remoteExpiringProfileKeyCredential.isPresent && localRecipientRecord.expiringProfileKeyCredential != remoteExpiringProfileKeyCredential.get()) { + if (remoteExpiringProfileKeyCredential != null && localRecipientRecord.expiringProfileKeyCredential != remoteExpiringProfileKeyCredential) { return true } @@ -271,8 +273,8 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val return false } - private fun process(recipient: Recipient, profileAndCredential: ProfileAndCredential) { - val profile = profileAndCredential.profile + private fun process(recipient: Recipient, profileAndCredential: SignalServiceProfileWithCredential) { + val (profile, expiringCredential) = profileAndCredential val recipientProfileKey = ProfileKeyUtil.profileKeyOrNull(recipient.profileKey) val wroteNewProfileName = setProfileName(recipient, profile.name) @@ -284,8 +286,7 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val setPhoneNumberSharingMode(recipient, profile.phoneNumberSharing) if (recipientProfileKey != null) { - profileAndCredential.expiringProfileKeyCredential - .ifPresent { profileKeyCredential: ExpiringProfileKeyCredential -> setExpiringProfileKeyCredential(recipient, recipientProfileKey, profileKeyCredential) } + expiringCredential?. let { credential -> setExpiringProfileKeyCredential(recipient, recipientProfileKey, credential) } } if (recipient.hasNonUsernameDisplayName(context) || wroteNewProfileName) { @@ -517,19 +518,6 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val } } - private fun getRequestType(recipient: Recipient): SignalServiceProfile.RequestType { - return if (ExpiringProfileCredentialUtil.isValid(recipient.expiringProfileKeyCredential)) SignalServiceProfile.RequestType.PROFILE else SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL - } - - /** - * Collective state as responses are processed as they come in. - */ - private class OperationState { - val retries: MutableSet = HashSet() - val unregistered: MutableSet = HashSet() - val profiles: MutableList> = ArrayList() - } - class Factory : Job.Factory { override fun create(parameters: Parameters, serializedData: ByteArray?): RetrieveProfileJob { val data = JsonJobData.deserialize(serializedData) diff --git a/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt b/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt index cec0b8db73..c6bc9e160f 100644 --- a/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt +++ b/app/src/test/java/org/thoughtcrime/securesms/dependencies/MockApplicationDependencyProvider.kt @@ -289,7 +289,7 @@ class MockApplicationDependencyProvider : AppDependencies.Provider { return mockk(relaxed = true) } - override fun provideProfileApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, pushServiceSocket: PushServiceSocket): ProfileApi { + override fun provideProfileApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, pushServiceSocket: PushServiceSocket, clientZkProfileOperations: ClientZkProfileOperations): ProfileApi { return mockk(relaxed = true) } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileApi.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileApi.kt index 85709459d0..da534a1660 100644 --- a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileApi.kt +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileApi.kt @@ -5,21 +5,31 @@ package org.whispersystems.signalservice.api.profiles +import org.signal.core.util.logging.Log +import org.signal.libsignal.zkgroup.VerificationFailedException +import org.signal.libsignal.zkgroup.profiles.ClientZkProfileOperations +import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential import org.signal.libsignal.zkgroup.profiles.ProfileKey +import org.signal.libsignal.zkgroup.profiles.ProfileKeyCredentialRequestContext import org.whispersystems.signalservice.api.NetworkResult import org.whispersystems.signalservice.api.crypto.ProfileCipher import org.whispersystems.signalservice.api.crypto.ProfileCipherOutputStream +import org.whispersystems.signalservice.api.crypto.SealedSenderAccess import org.whispersystems.signalservice.api.push.ServiceId import org.whispersystems.signalservice.api.services.ProfileService import org.whispersystems.signalservice.api.websocket.SignalWebSocket +import org.whispersystems.signalservice.internal.get import org.whispersystems.signalservice.internal.push.PaymentAddress import org.whispersystems.signalservice.internal.push.ProfileAvatarData import org.whispersystems.signalservice.internal.push.ProfileAvatarUploadAttributes import org.whispersystems.signalservice.internal.push.PushServiceSocket import org.whispersystems.signalservice.internal.push.http.ProfileCipherOutputStreamFactory import org.whispersystems.signalservice.internal.put +import org.whispersystems.signalservice.internal.util.Hex import org.whispersystems.signalservice.internal.util.JsonUtil import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessage +import org.whispersystems.signalservice.internal.websocket.WebsocketResponse +import java.security.SecureRandom /** * Endpoints to interact with profiles. Currently contains only setting profile but will @@ -27,9 +37,15 @@ import org.whispersystems.signalservice.internal.websocket.WebSocketRequestMessa */ class ProfileApi( private val authWebSocket: SignalWebSocket.AuthenticatedWebSocket, - private val pushServiceSocket: PushServiceSocket + private val unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, + private val pushServiceSocket: PushServiceSocket, + private val clientZkProfileOperations: ClientZkProfileOperations ) { + companion object { + private val TAG = Log.tag(ProfileApi::class) + } + /** * Set/update the user's profile on the service, including uploading an avatar if one is provided. * @@ -89,4 +105,104 @@ class ProfileApi( } } } + + /** + * Retrieve the users profile at the requested version, along with a profile credential. Will use sealed sender if provided, falling back to an authenticated + * request when appropriate. + * + * GET /v1/profile/:aci/:version/:zkProfileRequest + * - 200: Success + * - 404: Recipient is not a registered Signal user + * - 429: Rate-limited + */ + fun getVersionedProfileAndCredential(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult> { + val profileVersion = profileKey.getProfileKeyVersion(aci.libSignalAci).serialize() + val profileRequestContext = clientZkProfileOperations.createProfileKeyCredentialRequestContext(SecureRandom(), aci.libSignalAci, profileKey) + val serializedProfileRequest = Hex.toStringCondensed(profileRequestContext.request.serialize()) + + val request = WebSocketRequestMessage.get("/v1/profile/$aci/$profileVersion/$serializedProfileRequest?credentialType=expiringProfileKey") + val converter = ProfileAndCredentialResponseConverter(clientZkProfileOperations, profileRequestContext) + + return if (sealedSenderAccess == null) { + NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } + } else { + NetworkResult.fromWebSocket(converter) { unauthWebSocket.request(request, sealedSenderAccess) } + .fallback( + predicate = { it is NetworkResult.StatusCodeError && it.code == 401 }, + fallback = { NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } } + ) + } + } + + /** + * Retrieve the users profile at the requested version. Will use sealed sender if provided, falling back to an authenticated request when appropriate. + * + * GET /v1/profile/:serviceId/:version + * - 200: Success + * - 404: Recipient is not a registered Signal user + * - 429: Rate-limited + */ + fun getVersionedProfile(aci: ServiceId.ACI, profileKey: ProfileKey, sealedSenderAccess: SealedSenderAccess?): NetworkResult { + val profileKeyIdentifier = profileKey.getProfileKeyVersion(aci.libSignalAci) + val profileVersion = profileKeyIdentifier.serialize() + + val request = WebSocketRequestMessage.get("/v1/profile/$aci/$profileVersion") + val converter = NetworkResult.DefaultWebSocketConverter(SignalServiceProfile::class) + + return if (sealedSenderAccess == null) { + NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } + } else { + NetworkResult.fromWebSocket(converter) { unauthWebSocket.request(request, sealedSenderAccess) } + .fallback( + predicate = { it is NetworkResult.StatusCodeError && it.code == 401 }, + fallback = { NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } } + ) + } + } + + /** + * Get the user's unversioned profile. Will use sealed sender if provided, falling back to an authenticated request when appropriate. + * + * GET /v1/profile/:serviceId + * - 200: Success + * - 404: Recipient is not a registered Signal user + * - 429: Rate-limited + */ + fun getUnversionedProfile(serviceId: ServiceId, sealedSenderAccess: SealedSenderAccess?): NetworkResult { + val request = WebSocketRequestMessage.get("/v1/profile/$serviceId") + val converter = NetworkResult.DefaultWebSocketConverter(SignalServiceProfile::class) + + return if (sealedSenderAccess == null) { + NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } + } else { + NetworkResult.fromWebSocket(converter) { unauthWebSocket.request(request, sealedSenderAccess) } + .fallback( + predicate = { it is NetworkResult.StatusCodeError && it.code == 401 }, + fallback = { NetworkResult.fromWebSocket(converter) { authWebSocket.request(request) } } + ) + } + } + + private class ProfileAndCredentialResponseConverter( + private val clientZkProfileOperations: ClientZkProfileOperations, + private val requestContext: ProfileKeyCredentialRequestContext + ) : NetworkResult.WebSocketResponseConverter> { + + override fun convert(response: WebsocketResponse): NetworkResult> { + if (response.status != 200) { + return response.toStatusCodeError() + } + + return try { + response + .toSuccess(SignalServiceProfile::class) + .map { + val credential = clientZkProfileOperations.receiveExpiringProfileKeyCredential(requestContext, it.expiringProfileKeyCredentialResponse) + it to credential + } + } catch (e: VerificationFailedException) { + NetworkResult.ApplicationError(e) + } + } + } } diff --git a/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileRepository.kt b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileRepository.kt new file mode 100644 index 0000000000..0a34cee038 --- /dev/null +++ b/libsignal-service/src/main/java/org/whispersystems/signalservice/api/profiles/ProfileRepository.kt @@ -0,0 +1,139 @@ +/* + * Copyright 2025 Signal Messenger, LLC + * SPDX-License-Identifier: AGPL-3.0-only + */ + +package org.whispersystems.signalservice.api.profiles + +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import org.signal.core.util.logging.Log +import org.signal.libsignal.zkgroup.VerificationFailedException +import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential +import org.signal.libsignal.zkgroup.profiles.ProfileKey +import org.whispersystems.signalservice.api.NetworkResult +import org.whispersystems.signalservice.api.crypto.SealedSenderAccess +import org.whispersystems.signalservice.api.push.ServiceId +import org.whispersystems.signalservice.api.push.exceptions.RateLimitException +import kotlin.time.Duration + +/** + * Collection of high-level profile operations. + */ +class ProfileRepository(private val profileApi: ProfileApi) { + + companion object { + private val TAG = Log.tag(ProfileRepository::class) + } + + /** + * Fetches all profiles in parallel, returning an overall result. + * If we hit a rate limit exception or an unexpected runtime exception, execution halts immediately. + */ + suspend fun fetchProfiles(requests: List>): ProfileFetchResult = coroutineScope { + val successes: MutableList> = mutableListOf() + val unregistered: MutableList = mutableListOf() + val retryableFailures: MutableSet = requests.map { it.id }.toMutableSet() + var retryAfter: Duration? = null + + val mutex = Mutex() + + val tasks: List> = requests.map { request -> + async { + val response: NetworkResult = if (request.serviceId is ServiceId.ACI && request.profileKey != null && request.fetchExpiringCredential) { + profileApi + .getVersionedProfileAndCredential(request.serviceId, request.profileKey, request.sealedSenderAccess) + .map { SignalServiceProfileWithCredential(it.first, it.second) } // to handle nullability conversion + } else if (request.serviceId is ServiceId.ACI && request.profileKey != null) { + profileApi + .getVersionedProfile(request.serviceId, request.profileKey, request.sealedSenderAccess) + .map { SignalServiceProfileWithCredential(it, null) } + } else { + profileApi + .getUnversionedProfile(request.serviceId, request.sealedSenderAccess) + .map { SignalServiceProfileWithCredential(it, null) } + } + + when (response) { + is NetworkResult.Success -> mutex.withLock { + successes += IdProfilePair(request.id, response.result) + retryableFailures -= request.id + } + is NetworkResult.StatusCodeError -> { + when (response.code) { + 404 -> mutex.withLock { + unregistered += request.id + retryableFailures -= request.id + } + 429 -> { + mutex.withLock { + retryAfter = response.retryAfter() + } + throw RateLimitException(response.code, "Hit rate limit exception! Stopping immediately. Retry-After: ${response.retryAfter()}") + } + else -> { + throw response.exception + } + } + } + is NetworkResult.NetworkError -> Unit + is NetworkResult.ApplicationError -> { + mutex.withLock { + retryableFailures -= request.id + } + if (response.throwable is VerificationFailedException) { + Log.w(TAG, "Failed to verify ZK profile operation for ${request.id}. Continuing with other lookups.") + } else { + throw response.throwable + } + } + } + } + } + + try { + tasks.awaitAll() + } catch (e: Exception) { + Log.w(TAG, "Hit an exception that caused us to end early.", e) + } + + return@coroutineScope ProfileFetchResult( + successes = successes, + unregistered = unregistered.toSet(), + retryableFailures = retryableFailures, + retryAfter = retryAfter + ) + } + + data class ProfileFetchResult( + val successes: List>, + val unregistered: Set, + val retryableFailures: Set, + val retryAfter: Duration? + ) + + /** + * @param id A user-defined identifier that will be used to identify entities in the result. + */ + data class ProfileFetchRequest( + val id: Id, + val serviceId: ServiceId, + val profileKey: ProfileKey?, + val sealedSenderAccess: SealedSenderAccess?, + val fetchExpiringCredential: Boolean + ) + + data class IdProfilePair( + val id: Id, + val profileWithCredential: SignalServiceProfileWithCredential + ) + + data class SignalServiceProfileWithCredential( + val profile: SignalServiceProfile, + val credential: ExpiringProfileKeyCredential? + ) +}