mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-19 15:58:03 +01:00
Add ZeroTtlDevicePushNotificationExperiment
This commit is contained in:
@@ -265,6 +265,8 @@ import org.whispersystems.textsecuregcm.workers.BackupMetricsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.CertificateCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.DiscardPushNotificationExperimentSamplesCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.FinishPushNotificationExperimentCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
|
||||
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
|
||||
@@ -277,7 +279,10 @@ import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSend
|
||||
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
|
||||
import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.StartPushNotificationExperimentCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.UnlinkDeviceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.ZeroTtlExperimentNotificationSchedulerFactory;
|
||||
import org.whispersystems.textsecuregcm.workers.ZeroTtlPushNotificationExperimentFactory;
|
||||
import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
|
||||
import org.whispersystems.websocket.WebSocketResourceProviderFactory;
|
||||
import org.whispersystems.websocket.setup.WebSocketEnvironment;
|
||||
@@ -331,6 +336,25 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
bootstrap.addCommand(new BackupMetricsCommand(Clock.systemUTC()));
|
||||
bootstrap.addCommand(new RemoveExpiredLinkedDevicesCommand());
|
||||
bootstrap.addCommand(new NotifyIdleDevicesCommand());
|
||||
|
||||
bootstrap.addCommand(new StartPushNotificationExperimentCommand<>("start-zero-ttl-push-notification-experiment",
|
||||
"Start an experiment to send push notifications with ttl=0 to idle android devices",
|
||||
new ZeroTtlPushNotificationExperimentFactory()));
|
||||
|
||||
bootstrap.addCommand(
|
||||
new FinishPushNotificationExperimentCommand<>("finish-zero-ttl-push-notification-experiment",
|
||||
"Finish an experiment to send push notifications with ttl=0 to idle android devices",
|
||||
new ZeroTtlPushNotificationExperimentFactory()));
|
||||
|
||||
bootstrap.addCommand(
|
||||
new DiscardPushNotificationExperimentSamplesCommand("discard-zero-ttl-push-notification-experiment",
|
||||
"Discard samples from the \"zero TTL push notification\" experiment",
|
||||
new ZeroTtlPushNotificationExperimentFactory()));
|
||||
|
||||
bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-zero-ttl-notification-jobs",
|
||||
"Processes scheduled jobs to send zero-ttl experiment notifications to idle devices",
|
||||
new ZeroTtlExperimentNotificationSchedulerFactory()));
|
||||
|
||||
bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-idle-device-notification-jobs",
|
||||
"Processes scheduled jobs to send notifications to idle devices",
|
||||
new IdleDeviceNotificationSchedulerFactory()));
|
||||
|
||||
@@ -6,18 +6,17 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.workers.IdleWakeupEligibilityChecker;
|
||||
import reactor.core.publisher.Flux;
|
||||
import javax.annotation.Nullable;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
abstract class IdleDevicePushNotificationExperiment implements PushNotificationExperiment<DeviceLastSeenState> {
|
||||
|
||||
private final Clock clock;
|
||||
private final IdleWakeupEligibilityChecker idleWakeupEligibilityChecker;
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(getClass());
|
||||
|
||||
@@ -37,19 +36,8 @@ abstract class IdleDevicePushNotificationExperiment implements PushNotificationE
|
||||
UNCHANGED
|
||||
}
|
||||
|
||||
protected IdleDevicePushNotificationExperiment(final Clock clock) {
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
protected abstract Duration getMinIdleDuration();
|
||||
|
||||
protected abstract Duration getMaxIdleDuration();
|
||||
|
||||
@VisibleForTesting
|
||||
boolean isIdle(final Device device) {
|
||||
final Duration idleDuration = Duration.between(Instant.ofEpochMilli(device.getLastSeen()), clock.instant());
|
||||
|
||||
return idleDuration.compareTo(getMinIdleDuration()) >= 0 && idleDuration.compareTo(getMaxIdleDuration()) < 0;
|
||||
protected IdleDevicePushNotificationExperiment(final IdleWakeupEligibilityChecker idleWakeupEligibilityChecker) {
|
||||
this.idleWakeupEligibilityChecker = idleWakeupEligibilityChecker;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@@ -57,11 +45,18 @@ abstract class IdleDevicePushNotificationExperiment implements PushNotificationE
|
||||
return !StringUtils.isAllBlank(device.getApnId(), device.getGcmId());
|
||||
}
|
||||
|
||||
abstract boolean isIdleDeviceEligible(final Account account, final Device idleDevice, final DeviceLastSeenState state);
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isDeviceEligible(final Account account, final Device device) {
|
||||
return idleWakeupEligibilityChecker.isDeviceEligible(account, device).thenApply(idle ->
|
||||
idle && isIdleDeviceEligible(account, device, getState(account, device)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeviceLastSeenState getState(@Nullable final Account account, @Nullable final Device device) {
|
||||
if (account != null && device != null) {
|
||||
final DeviceLastSeenState.PushTokenType pushTokenType;
|
||||
|
||||
if (StringUtils.isNotBlank(device.getApnId())) {
|
||||
pushTokenType = DeviceLastSeenState.PushTokenType.APNS;
|
||||
} else if (StringUtils.isNotBlank(device.getGcmId())) {
|
||||
@@ -69,7 +64,6 @@ abstract class IdleDevicePushNotificationExperiment implements PushNotificationE
|
||||
} else {
|
||||
pushTokenType = null;
|
||||
}
|
||||
|
||||
return new DeviceLastSeenState(true, device.getCreated(), hasPushToken(device), device.getLastSeen(), pushTokenType);
|
||||
} else {
|
||||
return DeviceLastSeenState.MISSING_DEVICE_STATE;
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright 2025 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.experiment;
|
||||
|
||||
import org.whispersystems.textsecuregcm.push.ZeroTtlNotificationScheduler;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.workers.IdleWakeupEligibilityChecker;
|
||||
import java.time.LocalTime;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class ZeroTtlPushNotificationExperiment extends IdleDevicePushNotificationExperiment {
|
||||
private static final LocalTime PREFERRED_NOTIFICATION_TIME = LocalTime.of(14, 0);
|
||||
|
||||
private final ZeroTtlNotificationScheduler zeroTtlNotificationScheduler;
|
||||
|
||||
public ZeroTtlPushNotificationExperiment(
|
||||
final IdleWakeupEligibilityChecker idleWakeupEligibilityChecker,
|
||||
final ZeroTtlNotificationScheduler zeroTtlNotificationScheduler) {
|
||||
super(idleWakeupEligibilityChecker);
|
||||
this.zeroTtlNotificationScheduler = zeroTtlNotificationScheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isIdleDeviceEligible(final Account account, final Device idleDevice, final DeviceLastSeenState state) {
|
||||
return state.pushTokenType() == DeviceLastSeenState.PushTokenType.FCM;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExperimentName() {
|
||||
return "zero-ttl-notification";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<DeviceLastSeenState> getStateClass() {
|
||||
return DeviceLastSeenState.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> applyExperimentTreatment(final Account account, final Device device) {
|
||||
return zeroTtlNotificationScheduler.scheduleNotification(account, device, PREFERRED_NOTIFICATION_TIME, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> applyControlTreatment(final Account account, final Device device) {
|
||||
return zeroTtlNotificationScheduler.scheduleNotification(account, device, PREFERRED_NOTIFICATION_TIME, false);
|
||||
}
|
||||
}
|
||||
@@ -78,11 +78,13 @@ public class FcmSender implements PushNotificationSender {
|
||||
|
||||
@Override
|
||||
public CompletableFuture<SendPushNotificationResult> sendNotification(PushNotification pushNotification) {
|
||||
final AndroidConfig.Builder androidConfig = AndroidConfig.builder()
|
||||
.setPriority(pushNotification.urgent() ? AndroidConfig.Priority.HIGH : AndroidConfig.Priority.NORMAL);
|
||||
pushNotification.ttl().ifPresent(androidConfig::setTtl);
|
||||
|
||||
Message.Builder builder = Message.builder()
|
||||
.setToken(pushNotification.deviceToken())
|
||||
.setAndroidConfig(AndroidConfig.builder()
|
||||
.setPriority(pushNotification.urgent() ? AndroidConfig.Priority.HIGH : AndroidConfig.Priority.NORMAL)
|
||||
.build());
|
||||
.setAndroidConfig(androidConfig.build());
|
||||
|
||||
final String key = switch (pushNotification.notificationType()) {
|
||||
case NOTIFICATION -> "newMessageAlert";
|
||||
|
||||
@@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.push;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Optional;
|
||||
|
||||
public record PushNotification(String deviceToken,
|
||||
TokenType tokenType,
|
||||
@@ -15,7 +16,8 @@ public record PushNotification(String deviceToken,
|
||||
@Nullable String data,
|
||||
@Nullable Account destination,
|
||||
@Nullable Device destinationDevice,
|
||||
boolean urgent) {
|
||||
boolean urgent,
|
||||
Optional<Long> ttl) {
|
||||
|
||||
public enum NotificationType {
|
||||
NOTIFICATION,
|
||||
|
||||
@@ -51,11 +51,19 @@ public class PushNotificationManager {
|
||||
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
||||
|
||||
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
||||
PushNotification.NotificationType.NOTIFICATION, null, destination, device, urgent));
|
||||
PushNotification.NotificationType.NOTIFICATION, null, destination, device, urgent, Optional.empty()));
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<SendPushNotificationResult>> sendNewMessageNotificationWithTtl(final Account destination, final byte destinationDeviceId, final boolean urgent, long ttl) throws NotPushRegisteredException {
|
||||
final Device device = destination.getDevice(destinationDeviceId).orElseThrow(NotPushRegisteredException::new);
|
||||
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
||||
|
||||
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
||||
PushNotification.NotificationType.NOTIFICATION, null, destination, device, urgent, Optional.of(ttl)));
|
||||
}
|
||||
|
||||
public CompletableFuture<SendPushNotificationResult> sendRegistrationChallengeNotification(final String deviceToken, final PushNotification.TokenType tokenType, final String challengeToken) {
|
||||
return sendNotification(new PushNotification(deviceToken, tokenType, PushNotification.NotificationType.CHALLENGE, challengeToken, null, null, true))
|
||||
return sendNotification(new PushNotification(deviceToken, tokenType, PushNotification.NotificationType.CHALLENGE, challengeToken, null, null, true, Optional.empty()))
|
||||
.thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications")));
|
||||
}
|
||||
|
||||
@@ -66,7 +74,7 @@ public class PushNotificationManager {
|
||||
final Pair<String, PushNotification.TokenType> tokenAndType = getToken(device);
|
||||
|
||||
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
||||
PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device, true))
|
||||
PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device, true, Optional.empty()))
|
||||
.thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications")));
|
||||
}
|
||||
|
||||
@@ -76,7 +84,7 @@ public class PushNotificationManager {
|
||||
|
||||
return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(),
|
||||
PushNotification.NotificationType.ATTEMPT_LOGIN_NOTIFICATION_HIGH_PRIORITY,
|
||||
context, destination, device, true))
|
||||
context, destination, device, true, Optional.empty()))
|
||||
.thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications")));
|
||||
}
|
||||
|
||||
|
||||
@@ -286,7 +286,7 @@ public class PushNotificationScheduler implements Managed {
|
||||
return pushSchedulingCluster.withCluster(connection -> connection.async().set(
|
||||
getLastBackgroundApnsNotificationTimestampKey(account, device),
|
||||
String.valueOf(clock.millis()), new SetArgs().ex(BACKGROUND_NOTIFICATION_PERIOD)))
|
||||
.thenCompose(ignored -> apnSender.sendNotification(new PushNotification(device.getApnId(), PushNotification.TokenType.APN, PushNotification.NotificationType.NOTIFICATION, null, account, device, false)))
|
||||
.thenCompose(ignored -> apnSender.sendNotification(new PushNotification(device.getApnId(), PushNotification.TokenType.APN, PushNotification.NotificationType.NOTIFICATION, null, account, device, false, Optional.empty())))
|
||||
.thenAccept(response -> Metrics.counter(BACKGROUND_NOTIFICATION_SENT_COUNTER_NAME,
|
||||
ACCEPTED_TAG, String.valueOf(response.accepted()))
|
||||
.increment())
|
||||
@@ -308,7 +308,8 @@ public class PushNotificationScheduler implements Managed {
|
||||
null,
|
||||
account,
|
||||
device,
|
||||
true);
|
||||
true,
|
||||
Optional.empty());
|
||||
|
||||
final PushNotificationSender pushNotificationSender = isApnsDevice ? apnSender : fcmSender;
|
||||
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.IOException;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalTime;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import javax.annotation.Nullable;
|
||||
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
import org.whispersystems.textsecuregcm.scheduler.JobScheduler;
|
||||
import org.whispersystems.textsecuregcm.scheduler.SchedulingUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
|
||||
public class ZeroTtlNotificationScheduler extends JobScheduler {
|
||||
|
||||
private final AccountsManager accountsManager;
|
||||
private final PushNotificationManager pushNotificationManager;
|
||||
private final Clock clock;
|
||||
|
||||
@VisibleForTesting
|
||||
record JobDescriptor(UUID accountIdentifier, byte deviceId, long lastSeen, boolean zeroTtl) {}
|
||||
|
||||
public ZeroTtlNotificationScheduler(
|
||||
final AccountsManager accountsManager,
|
||||
final PushNotificationManager pushNotificationManager,
|
||||
final DynamoDbAsyncClient dynamoDbAsyncClient,
|
||||
final String tableName,
|
||||
final Duration jobExpiration,
|
||||
final Clock clock) {
|
||||
|
||||
super(dynamoDbAsyncClient, tableName, jobExpiration, clock);
|
||||
|
||||
this.accountsManager = accountsManager;
|
||||
this.pushNotificationManager = pushNotificationManager;
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSchedulerName() {
|
||||
return "ZeroTtlNotification";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected CompletableFuture<String> processJob(@Nullable final byte[] jobData) {
|
||||
final JobDescriptor jobDescriptor;
|
||||
|
||||
try {
|
||||
jobDescriptor = SystemMapper.jsonMapper().readValue(jobData, JobDescriptor.class);
|
||||
} catch (final IOException e) {
|
||||
return CompletableFuture.failedFuture(e);
|
||||
}
|
||||
|
||||
return accountsManager.getByAccountIdentifierAsync(jobDescriptor.accountIdentifier())
|
||||
.thenCompose(maybeAccount -> maybeAccount.map(account ->
|
||||
account.getDevice(jobDescriptor.deviceId()).map(device -> {
|
||||
if (jobDescriptor.lastSeen() != device.getLastSeen()) {
|
||||
return CompletableFuture.completedFuture("deviceSeenRecently");
|
||||
}
|
||||
|
||||
try {
|
||||
return sendNotification(account, jobDescriptor)
|
||||
.thenApply(ignored -> "sent");
|
||||
} catch (final NotPushRegisteredException e) {
|
||||
return CompletableFuture.completedFuture("deviceTokenDeleted");
|
||||
}
|
||||
})
|
||||
.orElse(CompletableFuture.completedFuture("deviceDeleted")))
|
||||
.orElse(CompletableFuture.completedFuture("accountDeleted")));
|
||||
}
|
||||
|
||||
private CompletableFuture<Optional<SendPushNotificationResult>> sendNotification(final Account account,
|
||||
final JobDescriptor jobDescriptor) throws NotPushRegisteredException {
|
||||
return jobDescriptor.zeroTtl()
|
||||
? pushNotificationManager.sendNewMessageNotificationWithTtl(account, jobDescriptor.deviceId(), true, 0L)
|
||||
: pushNotificationManager.sendNewMessageNotification(account, jobDescriptor.deviceId(), true);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> scheduleNotification(final Account account, final Device device,
|
||||
final LocalTime preferredDeliveryTime, boolean zeroTtl) {
|
||||
final Instant runAt = SchedulingUtil.getNextRecommendedNotificationTime(account, preferredDeliveryTime, clock);
|
||||
|
||||
try {
|
||||
return scheduleJob(runAt, SystemMapper.jsonMapper().writeValueAsBytes(
|
||||
new JobDescriptor(account.getIdentifier(IdentityType.ACI), device.getId(), device.getLastSeen(), zeroTtl)));
|
||||
} catch (final JsonProcessingException e) {
|
||||
// This should never happen when serializing an `AccountAndDeviceIdentifier`
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Copyright 2025 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.whispersystems.textsecuregcm.identity.IdentityType;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* Checks if a device may benefit from receiving a push notification
|
||||
*/
|
||||
public class IdleWakeupEligibilityChecker {
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MIN_SHORT_IDLE_DURATION = Duration.ofDays(3);
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MAX_SHORT_IDLE_DURATION = Duration.ofDays(30);
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MIN_LONG_IDLE_DURATION = Duration.ofDays(60);
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MAX_LONG_IDLE_DURATION = Duration.ofDays(75);
|
||||
|
||||
private final MessagesManager messagesManager;
|
||||
private final Clock clock;
|
||||
|
||||
public IdleWakeupEligibilityChecker(final Clock clock, final MessagesManager messagesManager) {
|
||||
this.messagesManager = messagesManager;
|
||||
this.clock = clock;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the device may benefit from a push notification.
|
||||
*
|
||||
* @param account The account to check
|
||||
* @param device The device to check
|
||||
* @return true if the device may benefit from a push notification, otherwise false
|
||||
* @implNote There are two populations that may benefit from a wakeup:
|
||||
* <ol>
|
||||
* <li> Devices that have only been idle for a little while, but have messages that they don't seem to be retrieving
|
||||
* <li> Devices that have been idle for a long time, but don't have any messages
|
||||
* </ol>
|
||||
* We think the first group sometimes just needs a little nudge to wake up and get their messages, and the latter
|
||||
* group generally WOULD get their messages if they had any. We want to notify the first group to prompt them to
|
||||
* actually get their messages and the latter group to prevent them from getting deleted due to inactivity (since they
|
||||
* are otherwise healthy installations that just aren't getting much traffic).
|
||||
*/
|
||||
public CompletableFuture<Boolean> isDeviceEligible(final Account account, final Device device) {
|
||||
|
||||
if (!hasPushToken(device)) {
|
||||
return CompletableFuture.completedFuture(false);
|
||||
}
|
||||
|
||||
if (isShortIdle(device, clock)) {
|
||||
return messagesManager.mayHaveUrgentPersistedMessages(account.getIdentifier(IdentityType.ACI), device);
|
||||
} else if (isLongIdle(device, clock)) {
|
||||
return messagesManager.mayHavePersistedMessages(account.getIdentifier(IdentityType.ACI), device)
|
||||
.thenApply(mayHavePersistedMessages -> !mayHavePersistedMessages);
|
||||
} else {
|
||||
return CompletableFuture.completedFuture(false);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean isShortIdle(final Device device, final Clock clock) {
|
||||
final Duration idleDuration = Duration.between(Instant.ofEpochMilli(device.getLastSeen()), clock.instant());
|
||||
|
||||
return idleDuration.compareTo(MIN_SHORT_IDLE_DURATION) >= 0 && idleDuration.compareTo(MAX_SHORT_IDLE_DURATION) < 0;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean isLongIdle(final Device device, final Clock clock) {
|
||||
final Duration idleDuration = Duration.between(Instant.ofEpochMilli(device.getLastSeen()), clock.instant());
|
||||
|
||||
return idleDuration.compareTo(MIN_LONG_IDLE_DURATION) >= 0 && idleDuration.compareTo(MAX_LONG_IDLE_DURATION) < 0;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean hasPushToken(final Device device) {
|
||||
return !StringUtils.isAllBlank(device.getApnId(), device.getGcmId());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import com.google.common.annotations.VisibleForTesting;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import net.sourceforge.argparse4j.inf.Subparser;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.DynamoDbTables;
|
||||
@@ -18,8 +17,6 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuples;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalTime;
|
||||
|
||||
public class NotifyIdleDevicesCommand extends AbstractSinglePassCrawlAccountsCommand {
|
||||
@@ -35,18 +32,6 @@ public class NotifyIdleDevicesCommand extends AbstractSinglePassCrawlAccountsCom
|
||||
@VisibleForTesting
|
||||
static final LocalTime PREFERRED_NOTIFICATION_TIME = LocalTime.of(14, 0);
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MIN_SHORT_IDLE_DURATION = Duration.ofDays(3);
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MAX_SHORT_IDLE_DURATION = Duration.ofDays(30);
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MIN_LONG_IDLE_DURATION = Duration.ofDays(60);
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MAX_LONG_IDLE_DURATION = Duration.ofDays(75);
|
||||
|
||||
private static final Counter DEVICE_INSPECTED_COUNTER =
|
||||
Metrics.counter(MetricsUtil.name(NotifyIdleDevicesCommand.class, "deviceInspected"));
|
||||
|
||||
@@ -87,12 +72,15 @@ public class NotifyIdleDevicesCommand extends AbstractSinglePassCrawlAccountsCom
|
||||
final MessagesManager messagesManager = getCommandDependencies().messagesManager();
|
||||
final IdleDeviceNotificationScheduler idleDeviceNotificationScheduler = buildIdleDeviceNotificationScheduler();
|
||||
final Clock clock = getClock();
|
||||
final IdleWakeupEligibilityChecker idleWakeupEligibilityChecker = new IdleWakeupEligibilityChecker(clock, messagesManager);
|
||||
|
||||
accounts
|
||||
.flatMap(account -> Flux.fromIterable(account.getDevices()).map(device -> Tuples.of(account, device)))
|
||||
.doOnNext(ignored -> DEVICE_INSPECTED_COUNTER.increment())
|
||||
.flatMap(accountAndDevice -> isDeviceEligible(accountAndDevice.getT1(), accountAndDevice.getT2(), messagesManager, clock)
|
||||
.mapNotNull(eligible -> eligible ? accountAndDevice : null), maxConcurrency)
|
||||
.flatMap(accountAndDevice -> Mono.fromFuture(() ->
|
||||
idleWakeupEligibilityChecker.isDeviceEligible(accountAndDevice.getT1(), accountAndDevice.getT2()))
|
||||
.mapNotNull(eligible -> eligible ? accountAndDevice : null),
|
||||
maxConcurrency)
|
||||
.flatMap(accountAndDevice -> {
|
||||
final Account account = accountAndDevice.getT1();
|
||||
final Device device = accountAndDevice.getT2();
|
||||
@@ -136,52 +124,4 @@ public class NotifyIdleDevicesCommand extends AbstractSinglePassCrawlAccountsCom
|
||||
Clock.systemUTC());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static Mono<Boolean> isDeviceEligible(final Account account,
|
||||
final Device device,
|
||||
final MessagesManager messagesManager,
|
||||
final Clock clock) {
|
||||
|
||||
// There are two populations of interest for this crawler:
|
||||
//
|
||||
// 1. Devices that have only been idle for a little while, but have messages that they don't seem to be retrieving
|
||||
// 2. Devices that have been idle for a long time, but don't have any messages
|
||||
//
|
||||
// We think the first group sometimes just needs a little nudge to wake up and get their messages, and the latter
|
||||
// group generally WOULD get their messages if they had any. We want to notify the first group to prompt them to
|
||||
// actually get their messages and the latter group to prevent them from getting deleted due to inactivity (since
|
||||
// they are otherwise healthy installations that just aren't getting much traffic).
|
||||
|
||||
if (!hasPushToken(device)) {
|
||||
return Mono.just(false);
|
||||
}
|
||||
|
||||
if (isShortIdle(device, clock)) {
|
||||
return Mono.fromFuture(messagesManager.mayHaveUrgentPersistedMessages(account.getIdentifier(IdentityType.ACI), device));
|
||||
} else if (isLongIdle(device, clock)) {
|
||||
return Mono.fromFuture(messagesManager.mayHavePersistedMessages(account.getIdentifier(IdentityType.ACI), device))
|
||||
.map(mayHavePersistedMessages -> !mayHavePersistedMessages);
|
||||
} else {
|
||||
return Mono.just(false);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean isShortIdle(final Device device, final Clock clock) {
|
||||
final Duration idleDuration = Duration.between(Instant.ofEpochMilli(device.getLastSeen()), clock.instant());
|
||||
|
||||
return idleDuration.compareTo(MIN_SHORT_IDLE_DURATION) >= 0 && idleDuration.compareTo(MAX_SHORT_IDLE_DURATION) < 0;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean isLongIdle(final Device device, final Clock clock) {
|
||||
final Duration idleDuration = Duration.between(Instant.ofEpochMilli(device.getLastSeen()), clock.instant());
|
||||
|
||||
return idleDuration.compareTo(MIN_LONG_IDLE_DURATION) >= 0 && idleDuration.compareTo(MAX_LONG_IDLE_DURATION) < 0;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean hasPushToken(final Device device) {
|
||||
return !StringUtils.isAllBlank(device.getApnId(), device.getGcmId());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import java.time.Clock;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.DynamoDbTables;
|
||||
import org.whispersystems.textsecuregcm.push.ZeroTtlNotificationScheduler;
|
||||
import org.whispersystems.textsecuregcm.scheduler.JobScheduler;
|
||||
|
||||
public class ZeroTtlExperimentNotificationSchedulerFactory implements JobSchedulerFactory {
|
||||
|
||||
@Override
|
||||
public JobScheduler buildJobScheduler(final CommandDependencies commandDependencies,
|
||||
final WhisperServerConfiguration configuration) {
|
||||
final DynamoDbTables.TableWithExpiration tableConfiguration = configuration.getDynamoDbTables().getScheduledJobs();
|
||||
|
||||
final Clock clock = Clock.systemUTC();
|
||||
|
||||
return new ZeroTtlNotificationScheduler(
|
||||
commandDependencies.accountsManager(),
|
||||
commandDependencies.pushNotificationManager(),
|
||||
commandDependencies.dynamoDbAsyncClient(),
|
||||
tableConfiguration.getTableName(),
|
||||
tableConfiguration.getExpiration(),
|
||||
clock);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright 2025 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.DynamoDbTables;
|
||||
import org.whispersystems.textsecuregcm.experiment.DeviceLastSeenState;
|
||||
import org.whispersystems.textsecuregcm.experiment.PushNotificationExperiment;
|
||||
import org.whispersystems.textsecuregcm.experiment.ZeroTtlPushNotificationExperiment;
|
||||
import org.whispersystems.textsecuregcm.push.ZeroTtlNotificationScheduler;
|
||||
import java.time.Clock;
|
||||
|
||||
public class ZeroTtlPushNotificationExperimentFactory implements PushNotificationExperimentFactory<DeviceLastSeenState> {
|
||||
|
||||
@Override
|
||||
public PushNotificationExperiment<DeviceLastSeenState> buildExperiment(final CommandDependencies commandDependencies,
|
||||
final WhisperServerConfiguration configuration) {
|
||||
|
||||
final DynamoDbTables.TableWithExpiration tableConfiguration = configuration.getDynamoDbTables().getScheduledJobs();
|
||||
|
||||
final Clock clock = Clock.systemUTC();
|
||||
|
||||
return new ZeroTtlPushNotificationExperiment(
|
||||
new IdleWakeupEligibilityChecker(clock, commandDependencies.messagesManager()),
|
||||
new ZeroTtlNotificationScheduler(
|
||||
commandDependencies.accountsManager(),
|
||||
commandDependencies.pushNotificationManager(),
|
||||
commandDependencies.dynamoDbAsyncClient(),
|
||||
tableConfiguration.getTableName(),
|
||||
tableConfiguration.getExpiration(),
|
||||
clock));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user