Introduce extra caching for group message processing.

This commit is contained in:
Clark
2023-05-17 13:54:37 -04:00
committed by Greyson Parrelli
parent 44ab1643fa
commit 2d6b16b2ce
15 changed files with 149 additions and 33 deletions

View File

@@ -44,6 +44,7 @@ import org.thoughtcrime.securesms.groups.BadGroupIdException
import org.thoughtcrime.securesms.groups.GroupId
import org.thoughtcrime.securesms.groups.GroupId.Push
import org.thoughtcrime.securesms.groups.GroupMigrationMembershipChange
import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache
import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor
import org.thoughtcrime.securesms.jobs.RequestGroupV2InfoJob
import org.thoughtcrime.securesms.keyvalue.SignalStore
@@ -669,7 +670,7 @@ class GroupTable(context: Context?, databaseHelper: SignalDatabase?) : DatabaseT
fun create(groupMasterKey: GroupMasterKey, groupState: DecryptedGroup, force: Boolean = false): GroupId.V2? {
val groupId = GroupId.v2(groupMasterKey)
if (!force && getGroupV1ByExpectedV2(groupId).isPresent) {
if (!force && GroupsV1MigratedCache.hasV1Group(groupId)) {
throw MissedGroupMigrationInsertException(groupId)
} else if (force) {
Log.w(TAG, "Forcing the creation of a group even though we already have a V1 ID!")
@@ -688,7 +689,7 @@ class GroupTable(context: Context?, databaseHelper: SignalDatabase?) : DatabaseT
*/
fun fixMissingMasterKey(groupMasterKey: GroupMasterKey) {
val groupId = GroupId.v2(groupMasterKey)
if (getGroupV1ByExpectedV2(groupId).isPresent) {
if (GroupsV1MigratedCache.hasV1Group(groupId)) {
Log.w(TAG, "There already exists a V1 group that should be migrated into this group. But if the recipient already exists, there's not much we can do here.")
}

View File

@@ -77,6 +77,7 @@ import org.thoughtcrime.securesms.groups.BadGroupIdException
import org.thoughtcrime.securesms.groups.GroupId
import org.thoughtcrime.securesms.groups.GroupId.V1
import org.thoughtcrime.securesms.groups.GroupId.V2
import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache
import org.thoughtcrime.securesms.groups.v2.ProfileKeySet
import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor
import org.thoughtcrime.securesms.jobs.RequestGroupV2InfoJob
@@ -576,7 +577,7 @@ open class RecipientTable(context: Context, databaseHelper: SignalDatabase) : Da
return existing.get()
} else if (groupId.isV1 && groups.groupExists(groupId.requireV1().deriveV2MigrationGroupId())) {
throw LegacyGroupInsertException(groupId)
} else if (groupId.isV2 && groups.getGroupV1ByExpectedV2(groupId.requireV2()).isPresent) {
} else if (groupId.isV2 && GroupsV1MigratedCache.hasV1Group(groupId.requireV2())) {
throw MissedGroupMigrationInsertException(groupId)
} else {
val values = ContentValues().apply {
@@ -641,10 +642,10 @@ open class RecipientTable(context: Context, databaseHelper: SignalDatabase) : Da
}
if (groupId.isV2) {
val v1 = groups.getGroupV1ByExpectedV2(groupId.requireV2())
if (v1.isPresent) {
val v1 = GroupsV1MigratedCache.getV1GroupByV2Id(groupId.requireV2())
if (v1 != null) {
db.setTransactionSuccessful()
return v1.get().recipientId
return v1.recipientId
}
}

View File

@@ -12,6 +12,7 @@ import org.signal.libsignal.zkgroup.InvalidInputException;
import org.signal.libsignal.zkgroup.groups.GroupIdentifier;
import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
import org.signal.libsignal.zkgroup.groups.GroupSecretParams;
import org.thoughtcrime.securesms.util.LRUCache;
import org.thoughtcrime.securesms.util.Util;
import java.io.IOException;
@@ -29,6 +30,8 @@ public abstract class GroupId implements DatabaseId {
private final String encodedId;
private static final LRUCache<GroupMasterKey, GroupIdentifier> groupIdentifierCache = new LRUCache<>(1000);
private GroupId(@NonNull String prefix, @NonNull byte[] bytes) {
this.encodedId = prefix + Hex.toStringCondensed(bytes);
}
@@ -80,9 +83,23 @@ public abstract class GroupId implements DatabaseId {
}
public static GroupId.V2 v2(@NonNull GroupMasterKey masterKey) {
return v2(GroupSecretParams.deriveFromMasterKey(masterKey)
.getPublicParams()
.getGroupIdentifier());
return v2(getIdentifierForMasterKey(masterKey));
}
public static GroupIdentifier getIdentifierForMasterKey(@NonNull GroupMasterKey masterKey) {
GroupIdentifier cachedIdentifier;
synchronized (groupIdentifierCache) {
cachedIdentifier = groupIdentifierCache.get(masterKey);
}
if (cachedIdentifier == null) {
cachedIdentifier = GroupSecretParams.deriveFromMasterKey(masterKey)
.getPublicParams()
.getGroupIdentifier();
synchronized (groupIdentifierCache) {
groupIdentifierCache.put(masterKey, cachedIdentifier);
}
}
return cachedIdentifier;
}
public static GroupId.Push push(ByteString bytes) throws BadGroupIdException {

View File

@@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -187,12 +188,15 @@ public final class GroupManager {
@Nullable byte[] signedGroupChange)
throws GroupChangeBusyException, IOException, GroupNotAMemberException
{
return updateGroupFromServer(context, groupMasterKey, null, revision, timestamp, signedGroupChange);
try (GroupManagerV2.GroupUpdater updater = new GroupManagerV2(context).updater(groupMasterKey)) {
return updater.updateLocalToServerRevision(revision, timestamp, null, signedGroupChange);
}
}
@WorkerThread
public static GroupsV2StateProcessor.GroupUpdateResult updateGroupFromServer(@NonNull Context context,
@NonNull GroupMasterKey groupMasterKey,
@NonNull Optional<GroupRecord> groupRecord,
@Nullable GroupSecretParams groupSecretParams,
int revision,
long timestamp,
@@ -200,7 +204,7 @@ public final class GroupManager {
throws GroupChangeBusyException, IOException, GroupNotAMemberException
{
try (GroupManagerV2.GroupUpdater updater = new GroupManagerV2(context).updater(groupMasterKey)) {
return updater.updateLocalToServerRevision(revision, timestamp, groupSecretParams, signedGroupChange);
return updater.updateLocalToServerRevision(revision, timestamp, groupRecord, groupSecretParams, signedGroupChange);
}
}

View File

@@ -804,6 +804,14 @@ final class GroupManagerV2 {
.updateLocalGroupToRevision(revision, timestamp, getDecryptedGroupChange(signedGroupChange));
}
@WorkerThread
GroupsV2StateProcessor.GroupUpdateResult updateLocalToServerRevision(int revision, long timestamp, @NonNull Optional<GroupRecord> localRecord, @Nullable GroupSecretParams groupSecretParams, @Nullable byte[] signedGroupChange)
throws IOException, GroupNotAMemberException
{
return new GroupsV2StateProcessor(context).forGroup(serviceIds, groupMasterKey, groupSecretParams)
.updateLocalGroupToRevision(revision, timestamp, localRecord, getDecryptedGroupChange(signedGroupChange));
}
@WorkerThread
void forceSanityUpdateFromServer(long timestamp)
throws IOException, GroupNotAMemberException
@@ -929,11 +937,11 @@ final class GroupManagerV2 {
alreadyAMember = true;
}
Optional<GroupRecord> unmigratedV1Group = groupDatabase.getGroupV1ByExpectedV2(groupId);
GroupRecord unmigratedV1Group = GroupsV1MigratedCache.getV1GroupByV2Id(groupId);
if (unmigratedV1Group.isPresent()) {
if (unmigratedV1Group != null) {
Log.i(TAG, "Group link was for a migrated V1 group we know about! Migrating it and using that as the base.");
GroupsV1MigrationUtil.performLocalMigration(context, unmigratedV1Group.get().getId().requireV1());
GroupsV1MigrationUtil.performLocalMigration(context, unmigratedV1Group.getId().requireV1());
}
DecryptedGroup decryptedGroup = createPlaceholderGroup(joinInfo, requestToJoin);

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.thoughtcrime.securesms.groups
import androidx.annotation.WorkerThread
import org.signal.core.util.orNull
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.GroupRecord
import org.thoughtcrime.securesms.util.LRUCache
/**
* Cache to keep track of groups we know do not need a migration run on. This is to save time looking for a gv1 group
* with the expected v2 id.
*/
object GroupsV1MigratedCache {
private const val MAX_CACHE = 1000
private val noV1GroupCache = LRUCache<GroupId.V2, Boolean>(MAX_CACHE)
@JvmStatic
@WorkerThread
fun hasV1Group(groupId: GroupId.V2): Boolean {
return getV1GroupByV2Id(groupId) != null
}
@JvmStatic
@WorkerThread
fun getV1GroupByV2Id(groupId: GroupId.V2): GroupRecord? {
synchronized(noV1GroupCache) {
if (noV1GroupCache.containsKey(groupId)) {
return null
}
}
val v1Group = SignalDatabase.groups.getGroupV1ByExpectedV2(groupId)
if (!v1Group.isPresent) {
synchronized(noV1GroupCache) {
noV1GroupCache.put(groupId, true)
}
}
return v1Group.orNull()
}
}

View File

@@ -266,8 +266,21 @@ public class GroupsV2StateProcessor {
@Nullable DecryptedGroupChange signedGroupChange)
throws IOException, GroupNotAMemberException
{
Optional<GroupRecord> localRecord = groupDatabase.getGroup(groupId);
return updateLocalGroupToRevision(revision, timestamp, groupDatabase.getGroup(groupId), signedGroupChange);
}
/**
* Using network where required, will attempt to bring the local copy of the group up to the revision specified.
*
* @param revision use {@link #LATEST} to get latest.
*/
@WorkerThread
public GroupUpdateResult updateLocalGroupToRevision(final int revision,
final long timestamp,
@NonNull Optional<GroupRecord> localRecord,
@Nullable DecryptedGroupChange signedGroupChange)
throws IOException, GroupNotAMemberException
{
if (localIsAtLeast(localRecord, revision)) {
return new GroupUpdateResult(GroupState.GROUP_CONSISTENT_OR_AHEAD, null);
}

View File

@@ -11,6 +11,7 @@ import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.groups.GroupChangeBusyException;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache;
import org.thoughtcrime.securesms.jobmanager.JsonJobData;
import org.thoughtcrime.securesms.jobmanager.Job;
import org.thoughtcrime.securesms.jobmanager.impl.ChangeNumberConstraint;
@@ -107,7 +108,7 @@ public final class PushProcessMessageJob extends BaseJob {
int localRevision = SignalDatabase.groups().getGroupV2Revision(groupId.requireV2());
if (signalServiceGroupContext.getRevision() > localRevision ||
SignalDatabase.groups().getGroupV1ByExpectedV2(groupId.requireV2()).isPresent())
GroupsV1MigratedCache.hasV1Group(groupId.requireV2()))
{
Log.i(TAG, "Adding network constraint to group-related job.");
builder.addConstraint(NetworkConstraint.KEY)

View File

@@ -6,6 +6,7 @@ import okio.ByteString.Companion.toByteString
import org.signal.core.util.logging.Log
import org.thoughtcrime.securesms.database.SignalDatabase.Companion.groups
import org.thoughtcrime.securesms.groups.GroupChangeBusyException
import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache
import org.thoughtcrime.securesms.jobmanager.Job
import org.thoughtcrime.securesms.jobmanager.impl.ChangeNumberConstraint
import org.thoughtcrime.securesms.jobmanager.impl.NetworkConstraint
@@ -126,7 +127,7 @@ class PushProcessMessageJobV2 private constructor(
if (groupId.isV2) {
val localRevision = groups.getGroupV2Revision(groupId.requireV2())
if (groupContext.revision > localRevision || groups.getGroupV1ByExpectedV2(groupId.requireV2()).isPresent) {
if (groupContext.revision > localRevision || GroupsV1MigratedCache.hasV1Group(groupId)) {
Log.i(TAG, "Adding network constraint to group-related job.")
builder.addConstraint(NetworkConstraint.KEY).setLifespan(TimeUnit.DAYS.toMillis(30))
}

View File

@@ -74,6 +74,7 @@ import org.thoughtcrime.securesms.groups.GroupChangeBusyException;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.groups.GroupManager;
import org.thoughtcrime.securesms.groups.GroupNotAMemberException;
import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache;
import org.thoughtcrime.securesms.groups.GroupsV1MigrationUtil;
import org.thoughtcrime.securesms.jobmanager.JobManager;
import org.thoughtcrime.securesms.jobs.AttachmentDownloadJob;
@@ -521,11 +522,11 @@ public class MessageContentProcessor {
private boolean handleGv2PreProcessing(@NonNull GroupId.V2 groupId, @NonNull SignalServiceContent content, @NonNull SignalServiceGroupV2 groupV2, @NonNull Recipient senderRecipient)
throws IOException, GroupChangeBusyException
{
GroupTable groupDatabase = SignalDatabase.groups();
Optional<GroupRecord> possibleGv1 = groupDatabase.getGroupV1ByExpectedV2(groupId);
GroupTable groupDatabase = SignalDatabase.groups();
GroupRecord possibleGv1 = GroupsV1MigratedCache.getV1GroupByV2Id(groupId);
if (possibleGv1.isPresent()) {
GroupsV1MigrationUtil.performLocalMigration(context, possibleGv1.get().getId().requireV1());
if (possibleGv1 != null) {
GroupsV1MigrationUtil.performLocalMigration(context, possibleGv1.getId().requireV1());
}
if (!updateGv2GroupFromServerOrP2PChange(content, groupV2)) {

View File

@@ -9,6 +9,7 @@ import org.signal.libsignal.protocol.ecc.ECPublicKey
import org.signal.libsignal.protocol.message.DecryptionErrorMessage
import org.signal.libsignal.zkgroup.groups.GroupSecretParams
import org.thoughtcrime.securesms.database.SignalDatabase
import org.thoughtcrime.securesms.database.model.GroupRecord
import org.thoughtcrime.securesms.database.model.MessageLogEntry
import org.thoughtcrime.securesms.database.model.MessageRecord
import org.thoughtcrime.securesms.database.model.PendingRetryReceiptModel
@@ -18,6 +19,7 @@ import org.thoughtcrime.securesms.groups.GroupChangeBusyException
import org.thoughtcrime.securesms.groups.GroupId
import org.thoughtcrime.securesms.groups.GroupManager
import org.thoughtcrime.securesms.groups.GroupNotAMemberException
import org.thoughtcrime.securesms.groups.GroupsV1MigratedCache
import org.thoughtcrime.securesms.groups.GroupsV1MigrationUtil
import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor
import org.thoughtcrime.securesms.jobs.NullMessageSendJob
@@ -50,6 +52,7 @@ import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Conten
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.Envelope
import org.whispersystems.signalservice.internal.push.SignalServiceProtos.TypingMessage
import java.io.IOException
import java.util.Optional
open class MessageContentProcessorV2(private val context: Context) {
@@ -224,22 +227,21 @@ open class MessageContentProcessorV2(private val context: Context) {
senderRecipient: Recipient,
groupSecretParams: GroupSecretParams? = null
): Gv2PreProcessResult {
val possibleV1OrV2Group = SignalDatabase.groups.getGroupV1OrV2ByExpectedV2(groupId)
val needsV1Migration = possibleV1OrV2Group.isPresent && possibleV1OrV2Group.get().isV1Group
if (needsV1Migration) {
GroupsV1MigrationUtil.performLocalMigration(context, possibleV1OrV2Group.get().id.requireV1())
val v1Group = GroupsV1MigratedCache.getV1GroupByV2Id(groupId)
if (v1Group != null) {
GroupsV1MigrationUtil.performLocalMigration(context, v1Group.id.requireV1())
}
val groupUpdateResult = updateGv2GroupFromServerOrP2PChange(context, timestamp, groupV2, groupSecretParams)
val preUpdateGroupRecord = SignalDatabase.groups.getGroup(groupId)
val groupUpdateResult = updateGv2GroupFromServerOrP2PChange(context, timestamp, groupV2, preUpdateGroupRecord, groupSecretParams)
if (groupUpdateResult == null) {
log(timestamp, "Ignoring GV2 message for group we are not currently in $groupId")
return Gv2PreProcessResult.IGNORE
}
val groupRecord = if (groupUpdateResult.groupState == GroupsV2StateProcessor.GroupState.GROUP_UPDATED || needsV1Migration) {
SignalDatabase.groups.getGroup(groupId)
val groupRecord = if (groupUpdateResult.groupState == GroupsV2StateProcessor.GroupState.GROUP_CONSISTENT_OR_AHEAD) {
preUpdateGroupRecord
} else {
possibleV1OrV2Group
SignalDatabase.groups.getGroup(groupId)
}
if (groupRecord.isPresent && !groupRecord.get().members.contains(senderRecipient.id)) {
@@ -270,12 +272,13 @@ open class MessageContentProcessorV2(private val context: Context) {
context: Context,
timestamp: Long,
groupV2: SignalServiceProtos.GroupContextV2,
localRecord: Optional<GroupRecord>,
groupSecretParams: GroupSecretParams? = null
): GroupsV2StateProcessor.GroupUpdateResult? {
return try {
val signedGroupChange: ByteArray? = if (groupV2.hasSignedGroupChange) groupV2.signedGroupChange else null
val updatedTimestamp = if (signedGroupChange != null) timestamp else timestamp - 1
GroupManager.updateGroupFromServer(context, groupV2.groupMasterKey, groupSecretParams, groupV2.revision, updatedTimestamp, signedGroupChange)
GroupManager.updateGroupFromServer(context, groupV2.groupMasterKey, localRecord, groupSecretParams, groupV2.revision, updatedTimestamp, signedGroupChange)
} catch (e: GroupNotAMemberException) {
warn(timestamp, "Ignoring message for a group we're not in")
null

View File

@@ -591,7 +591,7 @@ object SyncMessageProcessor {
val dataMessage: DataMessage = sent.message
val groupId: GroupId.V2? = dataMessage.groupV2.groupId
if (MessageContentProcessorV2.updateGv2GroupFromServerOrP2PChange(context, envelope.timestamp, dataMessage.groupV2) == null) {
if (MessageContentProcessorV2.updateGv2GroupFromServerOrP2PChange(context, envelope.timestamp, dataMessage.groupV2, SignalDatabase.groups.getGroup(GroupId.v2(dataMessage.groupV2.groupMasterKey))) == null) {
log(envelope.timestamp, "Ignoring GV2 message for group we are not currently in $groupId")
}
}

View File

@@ -336,7 +336,7 @@ public class Recipient {
*/
@WorkerThread
public static @NonNull Recipient externalPossiblyMigratedGroup(@NonNull GroupId groupId) {
return Recipient.resolved(SignalDatabase.recipients().getOrInsertFromPossiblyMigratedGroupId(groupId));
return Recipient.resolved(RecipientId.from(groupId));
}
/**

View File

@@ -13,6 +13,7 @@ import com.annimon.stream.Stream;
import org.signal.core.util.DatabaseId;
import org.signal.core.util.LongSerializer;
import org.thoughtcrime.securesms.database.SignalDatabase;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.util.DelimiterUtil;
import org.thoughtcrime.securesms.util.Util;
import org.whispersystems.signalservice.api.push.ServiceId;
@@ -69,6 +70,16 @@ public class RecipientId implements Parcelable, Comparable<RecipientId>, Databas
return from(null, identifier);
}
public static @NonNull RecipientId from(@NonNull GroupId groupId) {
RecipientId recipientId = RecipientIdCache.INSTANCE.get(groupId);
if (recipientId == null) {
recipientId = SignalDatabase.recipients().getOrInsertFromPossiblyMigratedGroupId(groupId);
if (groupId.isV2()) {
RecipientIdCache.INSTANCE.put(groupId, recipientId);
}
}
return recipientId;
}
/**
* Used for when you have a string that could be either a UUID or an e164. This was primarily
* created for interacting with protocol stores.

View File

@@ -4,6 +4,7 @@ import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.groups.GroupId;
import org.whispersystems.signalservice.api.push.ServiceId;
import java.util.LinkedHashMap;
@@ -50,6 +51,14 @@ final class RecipientIdCache {
put(recipientId, e164.orElse(null), serviceId.orElse(null));
}
synchronized @Nullable RecipientId get(@NonNull GroupId groupId) {
return ids.get(groupId);
}
synchronized void put(@NonNull GroupId groupId, @NonNull RecipientId recipientId) {
ids.put(groupId, recipientId);
}
synchronized @Nullable RecipientId get(@Nullable ServiceId serviceId, @Nullable String e164) {
if (serviceId != null && e164 != null) {
RecipientId recipientIdByAci = ids.get(serviceId);