Improve performance of first send to large group.

This commit is contained in:
Cody Henthorne
2026-02-03 15:26:27 -05:00
committed by GitHub
parent 9369cafd38
commit 8353ad4962
3 changed files with 110 additions and 5 deletions

View File

@@ -296,6 +296,18 @@ class InternalConversationSettingsFragment : ComposeFragment(), InternalConversa
SignalDatabase.senderKeyShared.deleteAllFor(group.distributionId) SignalDatabase.senderKeyShared.deleteAllFor(group.distributionId)
} }
override fun clearSenderKeyAndArchiveSessions(recipientId: RecipientId) {
clearSenderKey(recipientId)
val group = SignalDatabase.groups.getGroup(recipientId).orNull()
if (group == null) {
Log.w(TAG, "Couldn't find group for recipientId: $recipientId")
return
}
group.members.forEach { archiveSessions(it) }
}
class InternalViewModel( class InternalViewModel(
val recipientId: RecipientId val recipientId: RecipientId
) : ViewModel(), RecipientForeverObserver { ) : ViewModel(), RecipientForeverObserver {

View File

@@ -33,6 +33,7 @@ private enum class Dialog {
NONE, NONE,
DISABLE_PROFILE_SHARING, DISABLE_PROFILE_SHARING,
CLEAR_SENDER_KEY, CLEAR_SENDER_KEY,
CLEAR_SENDER_KEY_AND_ARCHIVE_SESSIONS,
DELETE_SESSIONS, DELETE_SESSIONS,
ARCHIVE_SESSIONS, ARCHIVE_SESSIONS,
DELETE_AVATAR, DELETE_AVATAR,
@@ -207,6 +208,16 @@ fun InternalConversationSettingsScreen(
} }
) )
} }
item {
Rows.TextRow(
text = "Clear sender key and archive sessions",
label = "Resets any sender key state and archives all sessions for group members, will force creating new sessions and re-distributing sender key material.",
onClick = {
dialog = Dialog.CLEAR_SENDER_KEY_AND_ARCHIVE_SESSIONS
}
)
}
} else { } else {
item { item {
Rows.TextRow( Rows.TextRow(
@@ -375,6 +386,9 @@ private fun rememberOnConfirm(
Dialog.CLEAR_SENDER_KEY -> { Dialog.CLEAR_SENDER_KEY -> {
{ callbacks.clearSenderKey(state.recipientId) } { callbacks.clearSenderKey(state.recipientId) }
} }
Dialog.CLEAR_SENDER_KEY_AND_ARCHIVE_SESSIONS -> {
{ callbacks.clearSenderKeyAndArchiveSessions(state.recipientId) }
}
} }
} }
} }
@@ -473,6 +487,7 @@ interface InternalConversationSettingsScreenCallbacks {
fun splitAndCreateThreads(recipientId: RecipientId) = Unit fun splitAndCreateThreads(recipientId: RecipientId) = Unit
fun splitWithoutCreatingThreads(recipientId: RecipientId) = Unit fun splitWithoutCreatingThreads(recipientId: RecipientId) = Unit
fun clearSenderKey(recipientId: RecipientId) = Unit fun clearSenderKey(recipientId: RecipientId) = Unit
fun clearSenderKeyAndArchiveSessions(recipientId: RecipientId) = Unit
object Empty : InternalConversationSettingsScreenCallbacks object Empty : InternalConversationSettingsScreenCallbacks
} }

View File

@@ -5,7 +5,10 @@
*/ */
package org.whispersystems.signalservice.api; package org.whispersystems.signalservice.api;
import org.signal.core.models.ServiceId;
import org.signal.core.models.ServiceId.PNI;
import org.signal.core.util.Base64; import org.signal.core.util.Base64;
import org.signal.core.util.UuidUtil;
import org.signal.libsignal.metadata.certificate.SenderCertificate; import org.signal.libsignal.metadata.certificate.SenderCertificate;
import org.signal.libsignal.protocol.IdentityKey; import org.signal.libsignal.protocol.IdentityKey;
import org.signal.libsignal.protocol.IdentityKeyPair; import org.signal.libsignal.protocol.IdentityKeyPair;
@@ -68,8 +71,6 @@ import org.whispersystems.signalservice.api.messages.multidevice.ViewOnceOpenMes
import org.whispersystems.signalservice.api.messages.multidevice.ViewedMessage; import org.whispersystems.signalservice.api.messages.multidevice.ViewedMessage;
import org.whispersystems.signalservice.api.messages.shared.SharedContact; import org.whispersystems.signalservice.api.messages.shared.SharedContact;
import org.whispersystems.signalservice.api.push.DistributionId; import org.whispersystems.signalservice.api.push.DistributionId;
import org.signal.core.models.ServiceId;
import org.signal.core.models.ServiceId.PNI;
import org.whispersystems.signalservice.api.push.SignalServiceAddress; import org.whispersystems.signalservice.api.push.SignalServiceAddress;
import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException; import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException;
import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException; import org.whispersystems.signalservice.api.push.exceptions.NonSuccessfulResponseCodeException;
@@ -84,7 +85,6 @@ import org.whispersystems.signalservice.api.util.CredentialsProvider;
import org.whispersystems.signalservice.api.util.Preconditions; import org.whispersystems.signalservice.api.util.Preconditions;
import org.whispersystems.signalservice.api.util.Uint64RangeException; import org.whispersystems.signalservice.api.util.Uint64RangeException;
import org.whispersystems.signalservice.api.util.Uint64Util; import org.whispersystems.signalservice.api.util.Uint64Util;
import org.signal.core.util.UuidUtil;
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
import org.whispersystems.signalservice.internal.crypto.AttachmentDigest; import org.whispersystems.signalservice.internal.crypto.AttachmentDigest;
import org.whispersystems.signalservice.internal.crypto.PaddingInputStream; import org.whispersystems.signalservice.internal.crypto.PaddingInputStream;
@@ -718,7 +718,7 @@ public class SignalServiceMessageSender {
boolean urgent = false; boolean urgent = false;
if (!aciStore.isMultiDevice()) { if (!aciStore.isMultiDevice()) {
Log.w(TAG, "We do not have any linked devices. Skipping."); Log.d(TAG, "We do not have any linked devices. Skipping.");
return SendMessageResult.success(localAddress, Collections.emptyList(), false, false, 0, Optional.empty()); return SendMessageResult.success(localAddress, Collections.emptyList(), false, false, 0, Optional.empty());
} }
@@ -2113,7 +2113,10 @@ public class SignalServiceMessageSender {
Log.d(TAG, "[" + timestamp + "] Sending to " + recipients.size() + " recipients."); Log.d(TAG, "[" + timestamp + "] Sending to " + recipients.size() + " recipients.");
enforceMaxEnvelopeContentSize(content); enforceMaxEnvelopeContentSize(content);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
eagerlyFetchMissingPreKeys(recipients, sealedSenderAccesses, story);
List<Observable<SendMessageResult>> singleResults = new LinkedList<>(); List<Observable<SendMessageResult>> singleResults = new LinkedList<>();
Iterator<SignalServiceAddress> recipientIterator = recipients.iterator(); Iterator<SignalServiceAddress> recipientIterator = recipients.iterator();
Iterator<SealedSenderAccess> sealedSenderAccessIterator = sealedSenderAccesses.iterator(); Iterator<SealedSenderAccess> sealedSenderAccessIterator = sealedSenderAccesses.iterator();
@@ -2819,6 +2822,81 @@ public class SignalServiceMessageSender {
} }
} }
private void eagerlyFetchMissingPreKeys(List<SignalServiceAddress> recipients, List<SealedSenderAccess> sealedSenderAccesses, boolean story) {
long start = System.currentTimeMillis();
Iterator<SignalServiceAddress> recipientIterator = recipients.iterator();
Iterator<SealedSenderAccess> sealedSenderAccessIterator = sealedSenderAccesses.iterator();
List<Observable<Boolean>> eagerFetches = new LinkedList<>();
while (recipientIterator.hasNext()) {
SignalServiceAddress recipient = recipientIterator.next();
SealedSenderAccess sealedSenderAccess = sealedSenderAccessIterator.next();
SignalProtocolAddress signalProtocolAddress = new SignalProtocolAddress(recipient.getIdentifier(), SignalServiceAddress.DEFAULT_DEVICE_ID);
if (!aciStore.containsSession(signalProtocolAddress)) {
Observable<Boolean> thing = Single.fromCallable(() -> {
eagerlyFetchMissingPreKeys(recipient, sealedSenderAccess, story);
return true;
})
.subscribeOn(scheduler)
.toObservable();
eagerFetches.add(thing);
}
}
if (eagerFetches.isEmpty()) {
return;
}
Log.i(TAG, "[eagerPrefetch] Attempting to fetch prekeys for " + eagerFetches.size() + " recipients");
try {
//noinspection ResultOfMethodCallIgnored
Observable.mergeDelayError(eagerFetches, Integer.MAX_VALUE, 1)
.observeOn(scheduler)
.lastOrError()
.blockingGet();
} catch (RuntimeException e) {
Log.w(TAG, "[eagerPrefetch] Unexpectedly failed eager fetching prekeys", e);
return;
}
Log.i(TAG, "[eagerPrefetch] Completed in " + (System.currentTimeMillis() - start) + "ms");
}
private void eagerlyFetchMissingPreKeys(SignalServiceAddress recipient, SealedSenderAccess sealedSenderAccess, boolean story) {
SignalProtocolAddress signalProtocolAddress = new SignalProtocolAddress(recipient.getIdentifier(), SignalServiceAddress.DEFAULT_DEVICE_ID);
try {
List<PreKeyBundle> preKeys = getPreKeys(recipient, sealedSenderAccess, SignalServiceAddress.DEFAULT_DEVICE_ID, story);
for (PreKeyBundle preKey : preKeys) {
Log.d(TAG, "[eagerFetch] Initializing prekey session for " + signalProtocolAddress);
try {
SignalProtocolAddress preKeyAddress = new SignalProtocolAddress(recipient.getIdentifier(), preKey.getDeviceId());
SignalSessionBuilder sessionBuilder = new SignalSessionBuilder(sessionLock, new SessionBuilder(aciStore, preKeyAddress));
sessionBuilder.process(preKey);
} catch (org.signal.libsignal.protocol.UntrustedIdentityException e) {
Log.i(TAG, "[eagerPrefetch] Untrusted identity for recipient");
return;
}
}
if (eventListener.isPresent()) {
eventListener.get().onSecurityEvent(recipient);
}
} catch (IOException e) {
Log.i(TAG, "[eagerPrefetch] Network issue encountered");
} catch (InvalidKeyException e) {
Log.i(TAG, "[eagerPrefetch] Invalid pre-key");
return;
}
}
private List<PreKeyBundle> getPreKeys(SignalServiceAddress recipient, @Nullable SealedSenderAccess sealedSenderAccess, int deviceId, boolean story) throws IOException { private List<PreKeyBundle> getPreKeys(SignalServiceAddress recipient, @Nullable SealedSenderAccess sealedSenderAccess, int deviceId, boolean story) throws IOException {
try { try {
// If it's only unrestricted because it's a story send, then we know it'll fail // If it's only unrestricted because it's a story send, then we know it'll fail