Use destination service ID from the envelope when removing views from shared MRM data

This commit is contained in:
Chris Eager
2024-09-13 17:38:18 -05:00
committed by Chris Eager
parent 11691c3122
commit 374fe087bc
8 changed files with 68 additions and 52 deletions

View File

@@ -70,7 +70,7 @@ public class MessageSender {
if (clientPresent) {
messagesManager.insert(account.getUuid(), device.getId(), message.toBuilder().setEphemeral(true).build());
} else {
messagesManager.removeRecipientViewFromMrmData(account.getUuid(), device.getId(), message);
messagesManager.removeRecipientViewFromMrmData(device.getId(), message);
}
} else {
messagesManager.insert(account.getUuid(), device.getId(), message);

View File

@@ -48,7 +48,6 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfigurati
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
@@ -296,21 +295,25 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
.thenApplyAsync(serialized -> {
final List<RemovedMessage> removedMessages = new ArrayList<>(serialized.size());
final List<byte[]> sharedMrmKeysToUpdate = new ArrayList<>();
final Map<ServiceIdentifier, List<byte[]>> serviceIdentifierToMrmKeys = new HashMap<>();
for (final byte[] bytes : serialized) {
try {
final MessageProtos.Envelope envelope = MessageProtos.Envelope.parseFrom(bytes);
removedMessages.add(RemovedMessage.fromEnvelope(envelope));
if (envelope.hasSharedMrmKey()) {
sharedMrmKeysToUpdate.add(envelope.getSharedMrmKey().toByteArray());
serviceIdentifierToMrmKeys.computeIfAbsent(
ServiceIdentifier.valueOf(envelope.getDestinationServiceId()), ignored -> new ArrayList<>())
.add(envelope.getSharedMrmKey().toByteArray());
}
} catch (final InvalidProtocolBufferException e) {
logger.warn("Failed to parse envelope", e);
}
}
removeRecipientViewFromMrmData(sharedMrmKeysToUpdate, destinationUuid, destinationDevice);
serviceIdentifierToMrmKeys.forEach(
(serviceId, keysToUpdate) -> removeRecipientViewFromMrmData(keysToUpdate, serviceId, destinationDevice));
return removedMessages;
}, messageDeletionExecutorService).whenComplete((ignored, throwable) -> sample.stop(removeByGuidTimer));
@@ -472,7 +475,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
/**
* Makes a best-effort attempt at asynchronously updating (and removing when empty) the MRM data structure
*/
void removeRecipientViewFromMrmData(final List<byte[]> sharedMrmKeys, final UUID accountUuid, final byte deviceId) {
void removeRecipientViewFromMrmData(final List<byte[]> sharedMrmKeys, final ServiceIdentifier serviceIdentifier,
final byte deviceId) {
if (sharedMrmKeys.isEmpty()) {
return;
@@ -483,7 +487,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
.collectMultimap(SlotHash::getSlot)
.flatMapMany(slotsAndKeys -> Flux.fromIterable(slotsAndKeys.values()))
.flatMap(
keys -> removeRecipientViewFromMrmDataScript.execute(keys, new AciServiceIdentifier(accountUuid), deviceId),
keys -> removeRecipientViewFromMrmDataScript.execute(keys, serviceIdentifier, deviceId),
REMOVE_MRM_RECIPIENT_VIEW_CONCURRENCY)
.doOnNext(sharedMrmDataKeyRemovedCounter::increment)
.onErrorResume(e -> {
@@ -575,7 +579,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
return Mono.empty();
}
final List<byte[]> mrmKeys = new ArrayList<>(messagesToProcess.size());
final Map<ServiceIdentifier, List<byte[]>> serviceIdentifierToMrmKeys = new HashMap<>();
final List<String> processedMessages = new ArrayList<>(messagesToProcess.size());
for (byte[] serialized : messagesToProcess) {
try {
@@ -584,14 +588,17 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
processedMessages.add(message.getServerGuid());
if (message.hasSharedMrmKey()) {
mrmKeys.add(message.getSharedMrmKey().toByteArray());
serviceIdentifierToMrmKeys.computeIfAbsent(ServiceIdentifier.valueOf(message.getDestinationServiceId()),
ignored -> new ArrayList<>())
.add(message.getSharedMrmKey().toByteArray());
}
} catch (final InvalidProtocolBufferException e) {
logger.warn("Failed to parse envelope", e);
}
}
removeRecipientViewFromMrmData(mrmKeys, destinationUuid, deviceId);
serviceIdentifierToMrmKeys.forEach((serviceId, keysToUpdate) ->
removeRecipientViewFromMrmData(keysToUpdate, serviceId, deviceId));
return removeQueueScript.execute(destinationUuid, deviceId, processedMessages);
})

View File

@@ -10,7 +10,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import reactor.core.publisher.Mono;
@@ -30,8 +30,9 @@ class MessagesCacheRemoveRecipientViewFromMrmDataScript {
"lua/remove_recipient_view_from_mrm_data.lua", ScriptOutputType.INTEGER);
}
Mono<Long> execute(final Collection<byte[]> keysCollection, final AciServiceIdentifier serviceIdentifier,
Mono<Long> execute(final Collection<byte[]> keysCollection, final ServiceIdentifier serviceIdentifier,
final byte deviceId) {
final List<byte[]> keys = keysCollection instanceof List<byte[]>
? (List<byte[]>) keysCollection
: new ArrayList<>(keysCollection);

View File

@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.Pair;
import reactor.core.observability.micrometer.Micrometer;
@@ -214,10 +215,10 @@ public class MessagesManager {
/**
* Removes the recipient's view from shared MRM data if necessary
*/
public void removeRecipientViewFromMrmData(final UUID destinationUuid, final byte destinationDeviceId,
final Envelope message) {
public void removeRecipientViewFromMrmData(final byte destinationDeviceId, final Envelope message) {
if (message.hasSharedMrmKey()) {
messagesCache.removeRecipientViewFromMrmData(List.of(message.getSharedMrmKey().toByteArray()), destinationUuid,
messagesCache.removeRecipientViewFromMrmData(List.of(message.getSharedMrmKey().toByteArray()),
ServiceIdentifier.valueOf(message.getDestinationServiceId()),
destinationDeviceId);
}
}