Move profile fetches to ProfileApi.

This commit is contained in:
Greyson Parrelli
2025-05-30 13:25:47 -04:00
committed by Cody Henthorne
parent b2f1867787
commit faf0b630c1
7 changed files with 324 additions and 82 deletions

View File

@@ -445,7 +445,7 @@ object AppDependencies {
fun provideMessageApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): MessageApi
fun provideProvisioningApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): ProvisioningApi
fun provideCertificateApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket): CertificateApi
fun provideProfileApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, pushServiceSocket: PushServiceSocket): ProfileApi
fun provideProfileApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket, pushServiceSocket: PushServiceSocket, clientZkProfileOperations: ClientZkProfileOperations): ProfileApi
fun provideRemoteConfigApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket): RemoteConfigApi
fun provideDonationsApi(authWebSocket: SignalWebSocket.AuthenticatedWebSocket, unauthWebSocket: SignalWebSocket.UnauthenticatedWebSocket): DonationsApi
}

View File

@@ -8,7 +8,6 @@ import android.os.HandlerThread;
import androidx.annotation.NonNull;
import androidx.annotation.VisibleForTesting;
import org.jetbrains.annotations.NotNull;
import org.signal.billing.BillingFactory;
import org.signal.core.util.ThreadUtil;
import org.signal.core.util.billing.BillingApi;
@@ -549,7 +548,7 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
}
@Override
public @NonNull ProvisioningApi provideProvisioningApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, SignalWebSocket.@NotNull UnauthenticatedWebSocket unauthWebSocket) {
public @NonNull ProvisioningApi provideProvisioningApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket) {
return new ProvisioningApi(authWebSocket, unauthWebSocket);
}
@@ -559,8 +558,8 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
}
@Override
public @NonNull ProfileApi provideProfileApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull PushServiceSocket pushServiceSocket) {
return new ProfileApi(authWebSocket, pushServiceSocket);
public @NonNull ProfileApi provideProfileApi(@NonNull SignalWebSocket.AuthenticatedWebSocket authWebSocket, @NonNull SignalWebSocket.UnauthenticatedWebSocket unauthWebSocket, @NonNull PushServiceSocket pushServiceSocket, @NonNull ClientZkProfileOperations clientZkProfileOperations) {
return new ProfileApi(authWebSocket, unauthWebSocket, pushServiceSocket, clientZkProfileOperations);
}
@Override

View File

@@ -207,7 +207,7 @@ class NetworkDependenciesModule(
}
val profileApi: ProfileApi by lazy {
provider.provideProfileApi(authWebSocket, pushServiceSocket)
provider.provideProfileApi(authWebSocket, unauthWebSocket, pushServiceSocket, groupsV2Operations.profileOperations)
}
val remoteConfigApi: RemoteConfigApi by lazy {

View File

@@ -1,20 +1,20 @@
package org.thoughtcrime.securesms.jobs
import androidx.annotation.WorkerThread
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.signal.core.util.Base64.decode
import org.signal.core.util.Stopwatch
import org.signal.core.util.concurrent.SignalExecutors
import org.signal.core.util.concurrent.safeBlockingGet
import org.signal.core.util.logging.Log
import org.signal.libsignal.protocol.IdentityKey
import org.signal.libsignal.protocol.InvalidKeyException
import org.signal.libsignal.protocol.util.Pair
import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential
import org.signal.libsignal.zkgroup.profiles.ProfileKey
import org.thoughtcrime.securesms.badges.Badges
import org.thoughtcrime.securesms.crypto.ProfileKeyUtil
import org.thoughtcrime.securesms.crypto.SealedSenderAccessUtil
import org.thoughtcrime.securesms.database.GroupTable
import org.thoughtcrime.securesms.database.RecipientTable
import org.thoughtcrime.securesms.database.RecipientTable.Companion.maskCapabilitiesToLong
@@ -27,6 +27,7 @@ import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.JsonJobData
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
import org.thoughtcrime.securesms.keyvalue.SignalStore
import org.thoughtcrime.securesms.net.SignalNetwork
import org.thoughtcrime.securesms.notifications.v2.ConversationId.Companion.forConversation
import org.thoughtcrime.securesms.profiles.ProfileName
import org.thoughtcrime.securesms.recipients.Recipient
@@ -38,13 +39,14 @@ import org.thoughtcrime.securesms.util.ProfileUtil
import org.thoughtcrime.securesms.util.Util
import org.whispersystems.signalservice.api.crypto.InvalidCiphertextException
import org.whispersystems.signalservice.api.crypto.ProfileCipher
import org.whispersystems.signalservice.api.profiles.ProfileAndCredential
import org.whispersystems.signalservice.api.profiles.ProfileRepository
import org.whispersystems.signalservice.api.profiles.ProfileRepository.IdProfilePair
import org.whispersystems.signalservice.api.profiles.ProfileRepository.ProfileFetchRequest
import org.whispersystems.signalservice.api.profiles.ProfileRepository.ProfileFetchResult
import org.whispersystems.signalservice.api.profiles.ProfileRepository.SignalServiceProfileWithCredential
import org.whispersystems.signalservice.api.profiles.SignalServiceProfile
import org.whispersystems.signalservice.api.services.ProfileService.ProfileResponseProcessor
import org.whispersystems.signalservice.api.util.ExpiringProfileCredentialUtil
import org.whispersystems.signalservice.internal.ServiceResponse
import java.io.IOException
import java.util.Optional
import java.util.concurrent.TimeUnit
import kotlin.time.Duration.Companion.minutes
@@ -108,53 +110,45 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val
Log.i(TAG, "Debouncing: Fetching ${recipientsToFetch.size} of ${recipients.size} recipients (${recipients.size - recipientsToFetch.size} were fetched recently)")
}
val requests: List<Observable<Pair<Recipient, ServiceResponse<ProfileAndCredential>>>> = recipientsToFetch
.map { ProfileUtil.retrieveProfile(context, it, getRequestType(it)).toObservable() }
val fetchingRecipientIds = recipientsToFetch.map { it.id }.toSet()
val recipientsById: Map<RecipientId, Recipient> = recipients.associateBy { it.id }
val requests: List<ProfileFetchRequest<RecipientId>> = recipientsToFetch
.map { recipient ->
ProfileFetchRequest(
id = recipient.id,
serviceId = recipient.requireServiceId(),
profileKey = recipient.profileKey?.let { ProfileKey(it) },
sealedSenderAccess = SealedSenderAccessUtil.getSealedSenderAccessFor(recipient),
fetchExpiringCredential = !ExpiringProfileCredentialUtil.isValid(recipient.expiringProfileKeyCredential)
)
}
stopwatch.split("requests")
val fetchingRecipientIds = recipientsToFetch.map { it.id }.toSet()
val operationState = Observable.mergeDelayError(requests, 16, 1)
.observeOn(Schedulers.io(), true)
.scan(OperationState()) { state: OperationState, pair: Pair<Recipient, ServiceResponse<ProfileAndCredential>> ->
val recipient = pair.first()
val processor = ProfileResponseProcessor(pair.second())
if (processor.hasResult()) {
state.profiles.add(processor.getResult(recipient))
} else if (processor.notFound()) {
Log.w(TAG, "Failed to find a profile for ${recipient.id}")
if (recipient.isRegistered) {
state.unregistered.add(recipient.id)
}
} else if (processor.genericIoError()) {
state.retries.add(recipient.id)
} else {
Log.w(TAG, "Failed to retrieve profile for ${recipient.id}", processor.error)
}
state
val response: ProfileFetchResult<RecipientId> = runBlocking {
withContext(Dispatchers.IO) {
ProfileRepository(SignalNetwork.profile).fetchProfiles(requests)
}
.lastOrError()
.safeBlockingGet()
}
stopwatch.split("responses")
val localRecords = SignalDatabase.recipients.getExistingRecords(fetchingRecipientIds)
Log.d(TAG, "Fetched ${localRecords.size} existing records.")
stopwatch.split("disk-fetch")
val successIds: Set<RecipientId> = fetchingRecipientIds - operationState.retries
val newlyRegisteredIds: Set<RecipientId> = operationState.profiles
.map { it.first() }
val successIds: Set<RecipientId> = recipientIds - response.retryableFailures
val newlyRegisteredIds: Set<RecipientId> = response.successes
.mapNotNull { recipientsById[it.id] }
.filterNot { it.isRegistered }
.map { it.id }
.toSet()
val updatedProfiles = operationState.profiles
.filter { recipientProfileAndCredentialPair: Pair<Recipient, ProfileAndCredential> ->
val recipientToUpdate = recipientProfileAndCredentialPair.first()
val localRecipientRecord = localRecords[recipientToUpdate.id] ?: return@filter true
val remoteProfile = recipientProfileAndCredentialPair.second().profile
val remoteCredential = recipientProfileAndCredentialPair.second().expiringProfileKeyCredential
val updatedProfiles = response.successes
.filter { idProfilePair: IdProfilePair<RecipientId> ->
val recipientToUpdate: Recipient = recipientsById[idProfilePair.id]!!
val localRecipientRecord: RecipientRecord = localRecords[recipientToUpdate.id] ?: return@filter true
val (remoteProfile, remoteCredential) = idProfilePair.profileWithCredential
return@filter try {
isUpdated(localRecipientRecord, remoteProfile, remoteCredential)
@@ -169,11 +163,11 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val
.toList()
stopwatch.split("filter")
Log.d(TAG, "Committing updates to " + updatedProfiles.size + " of " + operationState.profiles.size + " retrieved profiles.")
updatedProfiles.chunked(150).forEach { list: List<Pair<Recipient, ProfileAndCredential>> ->
Log.d(TAG, "Committing updates to " + updatedProfiles.size + " of " + response.successes.size + " retrieved profiles.")
updatedProfiles.chunked(150).forEach { list: List<IdProfilePair<RecipientId>> ->
SignalDatabase.runInTransaction {
for (profile in list) {
process(profile.first(), profile.second())
for (idProfilePair in list) {
process(recipientsById[idProfilePair.id]!!, idProfilePair.profileWithCredential)
}
}
}
@@ -186,29 +180,29 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val
Log.i(TAG, "Marking " + newlyRegisteredIds.size + " users as registered.")
SignalDatabase.recipients.bulkUpdatedRegisteredStatus(newlyRegisteredIds, emptySet())
}
if (operationState.unregistered.isNotEmpty()) {
Log.i(TAG, "Marking " + operationState.unregistered.size + " users as unregistered.")
for (recipientId in operationState.unregistered) {
if (response.unregistered.isNotEmpty()) {
Log.i(TAG, "Marking ${response.unregistered.size} users as unregistered.")
for (recipientId in response.unregistered) {
SignalDatabase.recipients.markUnregistered(recipientId)
}
}
stopwatch.split("registered-update")
for (profile in operationState.profiles) {
setIdentityKey(profile.first(), profile.second().profile.identityKey)
for (idProfilePair in response.successes) {
setIdentityKey(recipientsById[idProfilePair.id]!!, idProfilePair.profileWithCredential.profile.identityKey)
}
stopwatch.split("identityKeys")
val keyCount = operationState.profiles.map { it.first() }.mapNotNull { it.profileKey }.count()
val keyCount = response.successes.mapNotNull { recipientsById[it.id] }.mapNotNull { it.profileKey }.count()
Log.d(TAG, "Started with ${recipients.size} recipient(s). Found ${operationState.profiles.size} profile(s), and had keys for $keyCount of them. Will retry ${operationState.retries.size}.")
Log.d(TAG, "Started with ${recipients.size} recipient(s). Of those, ${recipientsToFetch.size} were outside the cache period. Found ${response.successes.size} profile(s), and had keys for $keyCount of them. Will retry ${response.retryableFailures.size}.")
stopwatch.stop(TAG)
recipientIds.clear()
recipientIds.addAll(operationState.retries)
recipientIds.addAll(response.retryableFailures)
if (recipientIds.isNotEmpty()) {
throw RetryLaterException()
throw RetryLaterException(response.retryAfter?.inWholeMilliseconds ?: defaultBackoff())
}
}
@@ -216,10 +210,18 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val
return e is RetryLaterException
}
override fun getNextRunAttemptBackoff(pastAttemptCount: Int, exception: Exception): Long {
return if (exception is RetryLaterException) {
exception.backoff
} else {
super.getNextRunAttemptBackoff(pastAttemptCount, exception)
}
}
override fun onFailure() {}
@Throws(InvalidCiphertextException::class, IOException::class)
private fun isUpdated(localRecipientRecord: RecipientRecord, remoteProfile: SignalServiceProfile, remoteExpiringProfileKeyCredential: Optional<ExpiringProfileKeyCredential>): Boolean {
private fun isUpdated(localRecipientRecord: RecipientRecord, remoteProfile: SignalServiceProfile, remoteExpiringProfileKeyCredential: ExpiringProfileKeyCredential?): Boolean {
if (localRecipientRecord.signalProfileAvatar != remoteProfile.avatar) {
return true
}
@@ -256,7 +258,7 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val
return true
}
if (remoteExpiringProfileKeyCredential.isPresent && localRecipientRecord.expiringProfileKeyCredential != remoteExpiringProfileKeyCredential.get()) {
if (remoteExpiringProfileKeyCredential != null && localRecipientRecord.expiringProfileKeyCredential != remoteExpiringProfileKeyCredential) {
return true
}
@@ -271,8 +273,8 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val
return false
}
private fun process(recipient: Recipient, profileAndCredential: ProfileAndCredential) {
val profile = profileAndCredential.profile
private fun process(recipient: Recipient, profileAndCredential: SignalServiceProfileWithCredential) {
val (profile, expiringCredential) = profileAndCredential
val recipientProfileKey = ProfileKeyUtil.profileKeyOrNull(recipient.profileKey)
val wroteNewProfileName = setProfileName(recipient, profile.name)
@@ -284,8 +286,7 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val
setPhoneNumberSharingMode(recipient, profile.phoneNumberSharing)
if (recipientProfileKey != null) {
profileAndCredential.expiringProfileKeyCredential
.ifPresent { profileKeyCredential: ExpiringProfileKeyCredential -> setExpiringProfileKeyCredential(recipient, recipientProfileKey, profileKeyCredential) }
expiringCredential?. let { credential -> setExpiringProfileKeyCredential(recipient, recipientProfileKey, credential) }
}
if (recipient.hasNonUsernameDisplayName(context) || wroteNewProfileName) {
@@ -517,19 +518,6 @@ class RetrieveProfileJob private constructor(parameters: Parameters, private val
}
}
private fun getRequestType(recipient: Recipient): SignalServiceProfile.RequestType {
return if (ExpiringProfileCredentialUtil.isValid(recipient.expiringProfileKeyCredential)) SignalServiceProfile.RequestType.PROFILE else SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL
}
/**
* Collective state as responses are processed as they come in.
*/
private class OperationState {
val retries: MutableSet<RecipientId> = HashSet()
val unregistered: MutableSet<RecipientId> = HashSet()
val profiles: MutableList<Pair<Recipient, ProfileAndCredential>> = ArrayList()
}
class Factory : Job.Factory<RetrieveProfileJob?> {
override fun create(parameters: Parameters, serializedData: ByteArray?): RetrieveProfileJob {
val data = JsonJobData.deserialize(serializedData)