Improve prekey fetch performance.

This commit is contained in:
Cody Henthorne
2026-05-12 16:50:19 -04:00
committed by Michelle Tang
parent 64cdff4638
commit 04d2b3b0fe
13 changed files with 415 additions and 130 deletions
@@ -147,7 +147,7 @@ class ChangeNumberRepository(
Log.i(TAG, "Submitting prekeys with PNI identity key: ${pniIdentityKeyPair.publicKey.fingerprint}")
retryChangeLocalNumberNetworkOperation {
SignalNetwork.keys.setPreKeys(
SignalNetwork.keys.setPreKeysSync(
PreKeyUpload(
serviceIdType = ServiceIdType.PNI,
signedPreKey = signedPreKey,
@@ -28,6 +28,7 @@ import org.signal.libsignal.zkgroup.profiles.ProfileKey
import org.thoughtcrime.securesms.MainActivity
import org.thoughtcrime.securesms.attachments.Attachment
import org.thoughtcrime.securesms.attachments.UriAttachment
import org.thoughtcrime.securesms.components.SignalProgressDialog
import org.thoughtcrime.securesms.database.AttachmentTable
import org.thoughtcrime.securesms.database.MessageType
import org.thoughtcrime.securesms.database.SignalDatabase
@@ -297,15 +298,38 @@ class InternalConversationSettingsFragment : ComposeFragment(), InternalConversa
}
override fun clearSenderKeyAndArchiveSessions(recipientId: RecipientId) {
clearSenderKey(recipientId)
lifecycleScope.launch {
val dialog = withContext(Dispatchers.Main) {
SignalProgressDialog.show(requireContext(), "Clearing...", cancelable = false, indeterminate = true)
}
val group = SignalDatabase.groups.getGroup(recipientId).orNull()
if (group == null) {
Log.w(TAG, "Couldn't find group for recipientId: $recipientId")
return
withContext(Dispatchers.Default) {
clearSenderKey(recipientId)
val group = SignalDatabase.groups.getGroup(recipientId).orNull()
if (group == null) {
Log.w(TAG, "Couldn't find group for recipientId: $recipientId")
return@withContext
}
group.members.forEach { memberId ->
archiveSessions(memberId)
val member = Recipient.resolved(memberId)
if (member.hasAci) {
AppDependencies.protocolStore.aci().identities().delete(member.requireAci().toString())
}
if (member.hasPni) {
AppDependencies.protocolStore.aci().identities().delete(member.requirePni().toString())
}
}
}
withContext(Dispatchers.Main) {
dialog.dismiss()
}
}
group.members.forEach { archiveSessions(it) }
}
class InternalViewModel(
@@ -212,7 +212,7 @@ fun InternalConversationSettingsScreen(
item {
Rows.TextRow(
text = "Clear sender key and archive sessions",
label = "Resets any sender key state and archives all sessions for group members, will force creating new sessions and re-distributing sender key material.",
label = "Resets any sender key state, archives all sessions, and removes identity keys for group members, will force creating new sessions and re-distributing sender key material.",
onClick = {
dialog = Dialog.CLEAR_SENDER_KEY_AND_ARCHIVE_SESSIONS
}
@@ -49,6 +49,16 @@ class IdentityTable internal constructor(context: Context?, databaseHelper: Sign
companion object {
private val TAG = Log.tag(IdentityTable::class.java)
/**
* When set, [saveIdentity] will skip its per-recipient `markNeedsSync` + `scheduleSyncForDataChange`
* side effects and instead deposit the affected [RecipientId] into the set. The caller is then
* responsible for performing a single bulk follow-up (storage-id rotation, cache invalidate,
* storage-sync schedule).
*/
@JvmField
val SUPPRESS_RECIPIENT_REFRESH: ThreadLocal<MutableSet<RecipientId>> = ThreadLocal()
const val TABLE_NAME = "identities"
private const val ID = "_id"
const val ADDRESS = "address"
@@ -125,8 +135,14 @@ class IdentityTable internal constructor(context: Context?, databaseHelper: Sign
nonBlockingApproval: Boolean
) {
saveIdentityInternal(addressName, recipientId, identityKey, verifiedStatus, firstUse, timestamp, nonBlockingApproval)
recipients.markNeedsSync(recipientId)
StorageSyncHelper.scheduleSyncForDataChange()
val deferred = SUPPRESS_RECIPIENT_REFRESH.get()
if (deferred != null) {
deferred += recipientId
} else {
recipients.markNeedsSync(recipientId)
StorageSyncHelper.scheduleSyncForDataChange()
}
}
fun setApproval(addressName: String, recipientId: RecipientId, nonBlockingApproval: Boolean) {
@@ -10,13 +10,27 @@ import androidx.annotation.VisibleForTesting;
import org.jetbrains.annotations.NotNull;
import org.signal.billing.BillingFactory;
import org.signal.core.models.ServiceId.ACI;
import org.signal.core.models.ServiceId.PNI;
import org.signal.core.util.ThreadUtil;
import org.signal.core.util.billing.BillingApi;
import org.signal.core.util.concurrent.DeadlockDetector;
import org.signal.core.util.concurrent.SignalExecutors;
import org.signal.libsignal.net.Network;
import org.signal.libsignal.protocol.SignalProtocolAddress;
import org.signal.libsignal.zkgroup.profiles.ClientZkProfileOperations;
import org.signal.libsignal.zkgroup.receipts.ClientZkReceiptOperations;
import org.signal.network.api.ArchiveApi;
import org.signal.network.api.CallingApi;
import org.signal.network.api.CdsApi;
import org.signal.network.api.CertificateApi;
import org.signal.network.api.LinkDeviceApi;
import org.signal.network.api.PaymentsApi;
import org.signal.network.api.ProvisioningApi;
import org.signal.network.api.RateLimitChallengeApi;
import org.signal.network.api.RemoteConfigApi;
import org.signal.network.api.SvrBApi;
import org.signal.network.api.UsernameApi;
import org.thoughtcrime.securesms.BuildConfig;
import org.thoughtcrime.securesms.components.TypingStatusRepository;
import org.thoughtcrime.securesms.components.TypingStatusSender;
@@ -79,6 +93,7 @@ import org.thoughtcrime.securesms.util.ByteUnit;
import org.thoughtcrime.securesms.util.EarlyMessageCache;
import org.thoughtcrime.securesms.util.Environment;
import org.thoughtcrime.securesms.util.FrameRateTracker;
import org.thoughtcrime.securesms.util.PreKeyBatcher;
import org.thoughtcrime.securesms.util.RemoteConfig;
import org.thoughtcrime.securesms.util.TextSecurePreferences;
import org.thoughtcrime.securesms.video.exo.GiphyMp4Cache;
@@ -89,30 +104,18 @@ import org.whispersystems.signalservice.api.SignalServiceDataStore;
import org.whispersystems.signalservice.api.SignalServiceMessageReceiver;
import org.whispersystems.signalservice.api.SignalServiceMessageSender;
import org.whispersystems.signalservice.api.account.AccountApi;
import org.signal.network.api.ArchiveApi;
import org.whispersystems.signalservice.api.attachment.AttachmentApi;
import org.signal.network.api.CallingApi;
import org.signal.network.api.CdsApi;
import org.signal.network.api.CertificateApi;
import org.whispersystems.signalservice.api.donations.DonationsApi;
import org.whispersystems.signalservice.api.groupsv2.ClientZkOperations;
import org.whispersystems.signalservice.api.groupsv2.GroupsV2Operations;
import org.whispersystems.signalservice.api.keys.KeysApi;
import org.signal.network.api.LinkDeviceApi;
import org.whispersystems.signalservice.api.keys.PreKeyRepository;
import org.whispersystems.signalservice.api.message.MessageApi;
import org.signal.network.api.PaymentsApi;
import org.whispersystems.signalservice.api.profiles.ProfileApi;
import org.signal.network.api.ProvisioningApi;
import org.signal.core.models.ServiceId.ACI;
import org.signal.core.models.ServiceId.PNI;
import org.signal.network.api.RateLimitChallengeApi;
import org.whispersystems.signalservice.api.registration.RegistrationApi;
import org.signal.network.api.RemoteConfigApi;
import org.whispersystems.signalservice.api.services.DonationsService;
import org.whispersystems.signalservice.api.services.ProfileService;
import org.whispersystems.signalservice.api.storage.StorageServiceApi;
import org.signal.network.api.SvrBApi;
import org.signal.network.api.UsernameApi;
import org.whispersystems.signalservice.api.util.CredentialsProvider;
import org.whispersystems.signalservice.api.util.SleepTimer;
import org.whispersystems.signalservice.api.util.UptimeSleepTimer;
@@ -179,7 +182,15 @@ public class ApplicationDependencyProvider implements AppDependencies.Provider {
RemoteConfig.maxIncrementalMacsPerEnvelope(),
RemoteConfig::useMessageSendRestFallback,
RemoteConfig.useBinaryId(),
BuildConfig.USE_STRING_ID);
BuildConfig.USE_STRING_ID,
new PreKeyRepository(
keysApi,
protocolStore.aci(),
new SignalProtocolAddress(pushServiceSocket.getCredentialsProvider().getAci().getLibSignalServiceId(),
pushServiceSocket.getCredentialsProvider().getDeviceId()),
PreKeyBatcher.INSTANCE
)
);
}
@Override
@@ -167,7 +167,7 @@ class PreKeysSyncJob private constructor(
return
}
val availablePreKeyCounts = SignalNetwork.keys.getAvailablePreKeyCounts(serviceIdType).successOrThrow()
val availablePreKeyCounts = SignalNetwork.keys.getAvailablePreKeyCountsSync(serviceIdType).successOrThrow()
val signedPreKeyToUpload: SignedPreKeyRecord? = signedPreKeyUploadIfNeeded(serviceIdType, protocolStore, metadataStore, forceRotation)
@@ -191,7 +191,7 @@ class PreKeysSyncJob private constructor(
if (signedPreKeyToUpload != null || oneTimeEcPreKeysToUpload != null || lastResortKyberPreKeyToUpload != null || oneTimeKyberPreKeysToUpload != null) {
log(serviceIdType, "Something to upload. SignedPreKey: ${signedPreKeyToUpload != null}, OneTimeEcPreKeys: ${oneTimeEcPreKeysToUpload != null}, LastResortKyberPreKey: ${lastResortKyberPreKeyToUpload != null}, OneTimeKyberPreKeys: ${oneTimeKyberPreKeysToUpload != null}")
SignalNetwork.keys.setPreKeys(
SignalNetwork.keys.setPreKeysSync(
PreKeyUpload(
serviceIdType = serviceIdType,
signedPreKey = signedPreKeyToUpload,
@@ -260,7 +260,7 @@ class PreKeysSyncJob private constructor(
@Throws(IOException::class)
private fun checkPreKeyConsistency(serviceIdType: ServiceIdType, protocolStore: SignalServiceAccountDataStore, metadataStore: PreKeyMetadataStore): Boolean {
val result: NetworkResult<Unit> = try {
SignalNetwork.keys.checkRepeatedUseKeys(
SignalNetwork.keys.checkRepeatedUseKeysSync(
serviceIdType = serviceIdType,
identityKey = protocolStore.identityKeyPair.publicKey,
signedPreKeyId = metadataStore.activeSignedPreKeyId,
@@ -84,7 +84,7 @@ public class PniAccountInitializationMigrationJob extends MigrationJob {
SignedPreKeyRecord signedPreKey = PreKeyUtil.generateAndStoreSignedPreKey(protocolStore, metadataStore);
List<PreKeyRecord> oneTimePreKeys = PreKeyUtil.generateAndStoreOneTimeEcPreKeys(protocolStore, metadataStore);
NetworkResultUtil.toPreKeysLegacy(SignalNetwork.keys().setPreKeys(new PreKeyUpload(ServiceIdType.PNI, signedPreKey, oneTimePreKeys, null, null)));
NetworkResultUtil.toPreKeysLegacy(SignalNetwork.keys().setPreKeysSync(new PreKeyUpload(ServiceIdType.PNI, signedPreKey, oneTimePreKeys, null, null)));
metadataStore.setActiveSignedPreKeyId(signedPreKey.getId());
metadataStore.setSignedPreKeyRegistered(true);
} else {
@@ -15,6 +15,7 @@ import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.database.model.RecipientRecord;
import org.thoughtcrime.securesms.util.livedata.LiveDataUtil;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -7,7 +7,9 @@ import androidx.annotation.AnyThread;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting;
import androidx.annotation.WorkerThread;
import org.jetbrains.annotations.NotNull;
import org.signal.core.util.ThreadUtil;
import org.signal.core.util.concurrent.SignalExecutors;
import org.signal.core.util.logging.Log;
@@ -15,6 +17,7 @@ import org.thoughtcrime.securesms.database.RecipientTable;
import org.thoughtcrime.securesms.database.RecipientTable.MissingRecipientException;
import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.database.ThreadTable;
import org.thoughtcrime.securesms.database.model.RecipientRecord;
import org.thoughtcrime.securesms.database.model.ThreadRecord;
import org.thoughtcrime.securesms.keyvalue.SignalStore;
import org.signal.core.util.CursorUtil;
@@ -27,9 +30,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public final class LiveRecipientCache {
@@ -102,6 +107,29 @@ public final class LiveRecipientCache {
}
}
/**
* Resolves and updates entries for each recipient already in the cache.
*/
@WorkerThread
public void refresh(@NonNull Collection<RecipientId> recipientIds) {
Set<RecipientId> cachedIds;
synchronized (recipients) {
cachedIds = recipientIds.stream().filter(recipients::containsKey).collect(Collectors.toSet());
}
if (!cachedIds.isEmpty()) {
Set<Recipient> recipients = SignalDatabase
.recipients()
.getExistingRecords(cachedIds)
.values()
.stream()
.map(record -> RecipientCreator.forRecord(context, record))
.collect(Collectors.toSet());
addToCache(recipients);
}
}
/**
* Adds a recipient to the cache if we don't have an entry. This will also update a cache entry
* if the provided recipient is resolved, or if the existing cache entry is unresolved.
@@ -0,0 +1,40 @@
/*
* Copyright 2026 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.util
import org.thoughtcrime.securesms.database.IdentityTable
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.dependencies.AppDependencies
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.storage.StorageSyncHelper.scheduleSyncForDataChange
import org.whispersystems.signalservice.api.keys.PreKeyRepository
/**
* Helper to batch recipient updates and storage sync when doing a large prekey fetch.
*
* See [PreKeyRepository.BatchHelper] for additional details.
*/
object PreKeyBatcher : PreKeyRepository.BatchHelper {
override fun batch(block: Runnable) {
val affected: MutableSet<RecipientId> = HashSet()
try {
IdentityTable.SUPPRESS_RECIPIENT_REFRESH.set(affected)
block.run()
if (!affected.isEmpty()) {
SignalDatabase.recipients.markNeedsSyncWithoutRefresh(affected)
}
} finally {
IdentityTable.SUPPRESS_RECIPIENT_REFRESH.remove()
}
if (!affected.isEmpty()) {
AppDependencies.recipientCache.refresh(affected)
scheduleSyncForDataChange()
}
}
}