mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 01:28:03 +01:00
Add dynamic configuration to optionally use shared MRM data
This commit is contained in:
@@ -5,9 +5,10 @@
|
||||
|
||||
package org.whispersystems.textsecuregcm.configuration.dynamic;
|
||||
|
||||
public record DynamicMessagesConfiguration(boolean storeSharedMrmData, boolean mrmViewExperimentEnabled) {
|
||||
public record DynamicMessagesConfiguration(boolean storeSharedMrmData, boolean fetchSharedMrmData,
|
||||
boolean useSharedMrmData) {
|
||||
|
||||
public DynamicMessagesConfiguration() {
|
||||
this(false, false);
|
||||
this(false, false, false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -957,7 +957,7 @@ public class MessageController {
|
||||
if (sharedMrmKey != null) {
|
||||
messageBuilder.setSharedMrmKey(ByteString.copyFrom(sharedMrmKey));
|
||||
}
|
||||
// mrm views phase 1: always set content
|
||||
// mrm views phase 2: always set content
|
||||
messageBuilder.setContent(ByteString.copyFrom(payload));
|
||||
|
||||
messageSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online);
|
||||
|
||||
@@ -137,6 +137,9 @@ public class MessagesCache {
|
||||
private final Counter staleEphemeralMessagesCounter = Metrics.counter(
|
||||
name(MessagesCache.class, "staleEphemeralMessages"));
|
||||
private final Counter mrmContentRetrievedCounter = Metrics.counter(name(MessagesCache.class, "mrmViewRetrieved"));
|
||||
private final Counter mrmRetrievalErrorCounter = Metrics.counter(name(MessagesCache.class, "mrmRetrievalError"));
|
||||
private final Counter mrmPhaseTwoMissingContentCounter = Metrics.counter(
|
||||
name(MessagesCache.class, "mrmPhaseTwoMissingContent"));
|
||||
private final Counter sharedMrmDataKeyRemovedCounter = Metrics.counter(
|
||||
name(MessagesCache.class, "sharedMrmKeyRemoved"));
|
||||
|
||||
@@ -349,19 +352,15 @@ public class MessagesCache {
|
||||
final Mono<MessageProtos.Envelope> messageMono;
|
||||
if (message.hasSharedMrmKey()) {
|
||||
|
||||
final Mono<?> experimentMono;
|
||||
if (isStaleEphemeralMessage(message, earliestAllowableEphemeralTimestamp)) {
|
||||
// skip fetching content for message that will be discarded
|
||||
experimentMono = Mono.empty();
|
||||
messageMono = Mono.just(message.toBuilder().clearSharedMrmKey().build());
|
||||
} else {
|
||||
experimentMono = maybeRunMrmViewExperiment(message, destinationUuid, destinationDevice);
|
||||
// mrm views phase 2: fetch shared MRM data -- internally depends on dynamic config that
|
||||
// enables fetching and using it (the stored messages still always have `content` set upstream)
|
||||
messageMono = getMessageWithSharedMrmData(message, destinationDevice);
|
||||
}
|
||||
|
||||
// mrm views phase 1: messageMono for sharedMrmKey is always Mono.just(), because messages always have content
|
||||
// To avoid races, wait for the experiment to run, but ignore any errors
|
||||
messageMono = experimentMono
|
||||
.onErrorComplete()
|
||||
.then(Mono.just(message.toBuilder().clearSharedMrmKey().build()));
|
||||
} else {
|
||||
messageMono = Mono.just(message);
|
||||
}
|
||||
@@ -378,14 +377,23 @@ public class MessagesCache {
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the fetch and compare logic for the MRM view experiment, if it is enabled.
|
||||
* Returns the given message with its shared MRM data.
|
||||
*
|
||||
* @see DynamicMessagesConfiguration#mrmViewExperimentEnabled()
|
||||
* @see DynamicMessagesConfiguration#fetchSharedMrmData()
|
||||
* @see DynamicMessagesConfiguration#useSharedMrmData()
|
||||
*/
|
||||
private Mono<?> maybeRunMrmViewExperiment(final MessageProtos.Envelope mrmMessage, final UUID destinationUuid,
|
||||
private Mono<MessageProtos.Envelope> getMessageWithSharedMrmData(final MessageProtos.Envelope mrmMessage,
|
||||
final byte destinationDevice) {
|
||||
if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration()
|
||||
.mrmViewExperimentEnabled()) {
|
||||
|
||||
assert mrmMessage.hasSharedMrmKey();
|
||||
|
||||
// mrm views phase 2: messages have content
|
||||
if (!mrmMessage.hasContent()) {
|
||||
mrmPhaseTwoMissingContentCounter.increment();
|
||||
}
|
||||
|
||||
if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration().fetchSharedMrmData()
|
||||
|| !mrmMessage.hasContent()) {
|
||||
|
||||
final Experiment experiment = new Experiment(MRM_VIEWS_EXPERIMENT_NAME);
|
||||
|
||||
@@ -394,7 +402,7 @@ public class MessagesCache {
|
||||
// the message might be addressed to the account's PNI, so use the service ID from the envelope
|
||||
ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice);
|
||||
|
||||
final Mono<MessageProtos.Envelope> mrmMessageMono = Mono.from(redisCluster.withBinaryClusterReactive(
|
||||
final Mono<MessageProtos.Envelope> messageFromRedisMono = Mono.from(redisCluster.withBinaryClusterReactive(
|
||||
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
|
||||
.collectList()
|
||||
.publishOn(messageDeliveryScheduler)))
|
||||
@@ -416,14 +424,25 @@ public class MessagesCache {
|
||||
sink.error(e);
|
||||
}
|
||||
})
|
||||
.onErrorResume(throwable -> {
|
||||
logger.warn("Failed to retrieve shared mrm data", throwable);
|
||||
mrmRetrievalErrorCounter.increment();
|
||||
return Mono.empty();
|
||||
})
|
||||
.share();
|
||||
|
||||
experiment.compareMonoResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), mrmMessageMono);
|
||||
if (mrmMessage.hasContent()) {
|
||||
experiment.compareMonoResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), messageFromRedisMono);
|
||||
}
|
||||
|
||||
return mrmMessageMono;
|
||||
} else {
|
||||
return Mono.empty();
|
||||
if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration().useSharedMrmData()
|
||||
|| !mrmMessage.hasContent()) {
|
||||
return messageFromRedisMono;
|
||||
}
|
||||
}
|
||||
|
||||
// if fetching or using shared data is disabled, fallback to just() with the existing message
|
||||
return Mono.just(mrmMessage.toBuilder().clearSharedMrmKey().build());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -497,13 +516,9 @@ public class MessagesCache {
|
||||
.concatMap(message -> {
|
||||
final Mono<MessageProtos.Envelope> messageMono;
|
||||
if (message.hasSharedMrmKey()) {
|
||||
final Mono<?> experimentMono = maybeRunMrmViewExperiment(message, accountUuid, destinationDevice);
|
||||
|
||||
// mrm views phase 1: messageMono for sharedMrmKey is always Mono.just(), because messages always have content
|
||||
// To avoid races, wait for the experiment to run, but ignore any errors
|
||||
messageMono = experimentMono
|
||||
.onErrorComplete()
|
||||
.then(Mono.just(message.toBuilder().clearSharedMrmKey().build()));
|
||||
// mrm views phase 2: fetch shared MRM data -- internally depends on dynamic config that
|
||||
// enables fetching and using it (the stored messages still always have `content` set upstream)
|
||||
messageMono = getMessageWithSharedMrmData(message, destinationDevice);
|
||||
} else {
|
||||
messageMono = Mono.just(message);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user