Add skip low urgency push experiment

This commit is contained in:
Ravi Khadiwala
2025-04-21 14:42:09 -05:00
committed by ravi-signal
parent 51569ce0a5
commit ab4fc4f459
11 changed files with 128 additions and 19 deletions

View File

@@ -668,7 +668,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final AccountAuthenticator accountAuthenticator = new AccountAuthenticator(accountsManager);
final MessageSender messageSender = new MessageSender(messagesManager, pushNotificationManager);
final MessageSender messageSender = new MessageSender(messagesManager, pushNotificationManager, experimentEnrollmentManager);
final ReceiptSender receiptSender = new ReceiptSender(accountsManager, messageSender, receiptSenderExecutor);
final CloudflareTurnCredentialsManager cloudflareTurnCredentialsManager = new CloudflareTurnCredentialsManager(
config.getTurnConfiguration().cloudflare().apiToken().value(),
@@ -988,7 +988,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
webSocketEnvironment.setConnectListener(
new AuthenticatedConnectListener(receiptSender, messagesManager, messageMetrics, pushNotificationManager,
pushNotificationScheduler, webSocketConnectionEventManager, websocketScheduledExecutor,
messageDeliveryScheduler, clientReleaseManager, messageDeliveryLoopMonitor));
messageDeliveryScheduler, clientReleaseManager, messageDeliveryLoopMonitor, experimentEnrollmentManager));
webSocketEnvironment.jersey()
.register(new WebsocketRefreshApplicationEventListener(accountsManager, disconnectionRequestManager));
webSocketEnvironment.jersey().register(new RateLimitByIpFilter(rateLimiters));

View File

@@ -24,16 +24,19 @@ import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
import org.signal.libsignal.protocol.util.Pair;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.controllers.MessageController;
import org.whispersystems.textsecuregcm.controllers.MismatchedDevices;
import org.whispersystems.textsecuregcm.controllers.MismatchedDevicesException;
import org.whispersystems.textsecuregcm.controllers.MultiRecipientMismatchedDevicesException;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Util;
@@ -52,6 +55,9 @@ public class MessageSender {
private final MessagesManager messagesManager;
private final PushNotificationManager pushNotificationManager;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
public static final String ANDROID_SKIP_LOW_URGENCY_PUSH_EXPERIMENT = "androidSkipLowUrgencyPush";
// Note that these names deliberately reference `MessageController` for metric continuity
private static final String REJECT_OVERSIZE_MESSAGE_COUNTER_NAME = name(MessageController.class, "rejectOversizeMessage");
@@ -72,9 +78,13 @@ public class MessageSender {
@VisibleForTesting
static final byte NO_EXCLUDED_DEVICE_ID = -1;
public MessageSender(final MessagesManager messagesManager, final PushNotificationManager pushNotificationManager) {
public MessageSender(
final MessagesManager messagesManager,
final PushNotificationManager pushNotificationManager,
final ExperimentEnrollmentManager experimentEnrollmentManager) {
this.messagesManager = messagesManager;
this.pushNotificationManager = pushNotificationManager;
this.experimentEnrollmentManager = experimentEnrollmentManager;
}
/**
@@ -145,7 +155,7 @@ public class MessageSender {
.forEach((deviceId, destinationPresent) -> {
final Envelope message = messagesByDeviceId.get(deviceId);
if (!destinationPresent && !message.getEphemeral()) {
if (!destinationPresent && !message.getEphemeral() && !shouldSkipPush(destination, deviceId, message.getUrgent())) {
try {
pushNotificationManager.sendNewMessageNotification(destination, deviceId, message.getUrgent());
} catch (final NotPushRegisteredException ignored) {
@@ -165,6 +175,13 @@ public class MessageSender {
});
}
private boolean shouldSkipPush(final Account account, byte deviceId, boolean urgent) {
final boolean isAndroidFcm = account.getDevice(deviceId).map(Device::getGcmId).isPresent();
return !urgent
&& isAndroidFcm
&& experimentEnrollmentManager.isEnrolled(account.getUuid(), ANDROID_SKIP_LOW_URGENCY_PUSH_EXPERIMENT);
}
/**
* Sends messages to a group of recipients. If a destination device has a valid push notification token and does not
* have an active connection to a Signal server, then this method will also send a push notification to that device to

View File

@@ -27,8 +27,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
import org.whispersystems.textsecuregcm.push.MessageSender;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -41,6 +43,7 @@ public class MessagePersister implements Managed {
private final MessagesManager messagesManager;
private final AccountsManager accountsManager;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final Duration persistDelay;
@@ -78,6 +81,7 @@ public class MessagePersister implements Managed {
final MessagesManager messagesManager,
final AccountsManager accountsManager,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final ExperimentEnrollmentManager experimentEnrollmentManager,
final Duration persistDelay,
final int dedicatedProcessWorkerThreadCount) {
@@ -85,6 +89,7 @@ public class MessagePersister implements Managed {
this.messagesManager = messagesManager;
this.accountsManager = accountsManager;
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.experimentEnrollmentManager = experimentEnrollmentManager;
this.persistDelay = persistDelay;
this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount];
@@ -234,8 +239,11 @@ public class MessagePersister implements Managed {
} while (!messages.isEmpty());
final boolean inSkipExperiment = device.getGcmId() != null && experimentEnrollmentManager.isEnrolled(
accountUuid,
MessageSender.ANDROID_SKIP_LOW_URGENCY_PUSH_EXPERIMENT);
DistributionSummary.builder(QUEUE_SIZE_DISTRIBUTION_SUMMARY_NAME)
.tags(Tags.of(platformTag))
.tags(Tags.of(platformTag).and("lowUrgencySkip", Boolean.toString(inSkipExperiment)))
.publishPercentileHistogram(true)
.register(Metrics.globalRegistry)
.record(messageCount);

View File

@@ -12,6 +12,7 @@ import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.OpenWebSocketCounter;
@@ -45,6 +46,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final Scheduler messageDeliveryScheduler;
private final ClientReleaseManager clientReleaseManager;
private final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final OpenWebSocketCounter openAuthenticatedWebSocketCounter;
private final OpenWebSocketCounter openUnauthenticatedWebSocketCounter;
@@ -58,7 +60,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
ScheduledExecutorService scheduledExecutorService,
Scheduler messageDeliveryScheduler,
ClientReleaseManager clientReleaseManager,
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor) {
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor,
final ExperimentEnrollmentManager experimentEnrollmentManager) {
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.messageMetrics = messageMetrics;
@@ -69,6 +72,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.clientReleaseManager = clientReleaseManager;
this.messageDeliveryLoopMonitor = messageDeliveryLoopMonitor;
this.experimentEnrollmentManager = experimentEnrollmentManager;
openAuthenticatedWebSocketCounter =
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, CONNECTED_DURATION_TIMER_NAME, Tags.of(AUTHENTICATED_TAG_NAME, "true"));
@@ -98,7 +102,8 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
scheduledExecutorService,
messageDeliveryScheduler,
clientReleaseManager,
messageDeliveryLoopMonitor);
messageDeliveryLoopMonitor,
experimentEnrollmentManager);
context.addWebsocketClosedListener((closingContext, statusCode, reason) -> {
// We begin the shutdown process by removing this client's "presence," which means it will again begin to

View File

@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
import org.whispersystems.textsecuregcm.controllers.MessageController;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
@@ -46,6 +47,7 @@ import org.whispersystems.textsecuregcm.limits.MessageDeliveryLoopMonitor;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.MessageSender;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventListener;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
@@ -120,6 +122,7 @@ public class WebSocketConnection implements WebSocketConnectionEventListener {
private final PushNotificationManager pushNotificationManager;
private final PushNotificationScheduler pushNotificationScheduler;
private final MessageDeliveryLoopMonitor messageDeliveryLoopMonitor;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final AuthenticatedDevice auth;
private final WebSocketClient client;
@@ -159,7 +162,8 @@ public class WebSocketConnection implements WebSocketConnectionEventListener {
ScheduledExecutorService scheduledExecutorService,
Scheduler messageDeliveryScheduler,
ClientReleaseManager clientReleaseManager,
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor) {
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor,
ExperimentEnrollmentManager experimentEnrollmentManager) {
this(receiptSender,
messagesManager,
@@ -172,7 +176,7 @@ public class WebSocketConnection implements WebSocketConnectionEventListener {
scheduledExecutorService,
messageDeliveryScheduler,
clientReleaseManager,
messageDeliveryLoopMonitor);
messageDeliveryLoopMonitor, experimentEnrollmentManager);
}
@VisibleForTesting
@@ -187,7 +191,8 @@ public class WebSocketConnection implements WebSocketConnectionEventListener {
ScheduledExecutorService scheduledExecutorService,
Scheduler messageDeliveryScheduler,
ClientReleaseManager clientReleaseManager,
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor) {
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor,
ExperimentEnrollmentManager experimentEnrollmentManager) {
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
@@ -201,6 +206,7 @@ public class WebSocketConnection implements WebSocketConnectionEventListener {
this.messageDeliveryScheduler = messageDeliveryScheduler;
this.clientReleaseManager = clientReleaseManager;
this.messageDeliveryLoopMonitor = messageDeliveryLoopMonitor;
this.experimentEnrollmentManager = experimentEnrollmentManager;
}
public void start() {
@@ -331,7 +337,13 @@ public class WebSocketConnection implements WebSocketConnectionEventListener {
// Cleared the queue! Send a queue empty message if we need to
consecutiveRetries.set(0);
if (sentInitialQueueEmptyMessage.compareAndSet(false, true)) {
final Tags tags = Tags.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()));
final boolean inSkipExperiment = auth.getAuthenticatedDevice().getGcmId() != null && experimentEnrollmentManager.isEnrolled(
auth.getAccount().getUuid(),
MessageSender.ANDROID_SKIP_LOW_URGENCY_PUSH_EXPERIMENT);
final Tags tags = Tags
.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent()))
.and("lowUrgencySkip", Boolean.toString(inSkipExperiment));
final long drainDuration = System.currentTimeMillis() - queueDrainStartTime.get();
Metrics.summary(INITIAL_QUEUE_LENGTH_DISTRIBUTION_NAME, tags).record(sentMessageCounter.sum());

View File

@@ -14,6 +14,7 @@ import java.time.Duration;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.MessagePersister;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
@@ -64,6 +65,7 @@ public class MessagePersisterServiceCommand extends ServerCommand<WhisperServerC
deps.messagesManager(),
deps.accountsManager(),
deps.dynamicConfigurationManager(),
new ExperimentEnrollmentManager(deps.dynamicConfigurationManager()),
Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()),
namespace.getInt(WORKER_COUNT));