mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 07:38:03 +01:00
Generalize "is idle?" check in idle device notification scheduler
This commit is contained in:
committed by
Jon Chambers
parent
46d04d9d1a
commit
1af8bb494e
@@ -26,10 +26,7 @@ public class IdleDeviceNotificationScheduler extends JobScheduler {
|
||||
private final Clock clock;
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MIN_IDLE_DURATION = Duration.ofDays(14);
|
||||
|
||||
@VisibleForTesting
|
||||
record AccountAndDeviceIdentifier(UUID accountIdentifier, byte deviceId) {}
|
||||
record JobDescriptor(UUID accountIdentifier, byte deviceId, long lastSeen) {}
|
||||
|
||||
public IdleDeviceNotificationScheduler(final AccountsManager accountsManager,
|
||||
final PushNotificationManager pushNotificationManager,
|
||||
@@ -52,24 +49,24 @@ public class IdleDeviceNotificationScheduler extends JobScheduler {
|
||||
|
||||
@Override
|
||||
protected CompletableFuture<String> processJob(@Nullable final byte[] jobData) {
|
||||
final AccountAndDeviceIdentifier accountAndDeviceIdentifier;
|
||||
final JobDescriptor jobDescriptor;
|
||||
|
||||
try {
|
||||
accountAndDeviceIdentifier = SystemMapper.jsonMapper().readValue(jobData, AccountAndDeviceIdentifier.class);
|
||||
jobDescriptor = SystemMapper.jsonMapper().readValue(jobData, JobDescriptor.class);
|
||||
} catch (final IOException e) {
|
||||
return CompletableFuture.failedFuture(e);
|
||||
}
|
||||
|
||||
return accountsManager.getByAccountIdentifierAsync(accountAndDeviceIdentifier.accountIdentifier())
|
||||
return accountsManager.getByAccountIdentifierAsync(jobDescriptor.accountIdentifier())
|
||||
.thenCompose(maybeAccount -> maybeAccount.map(account ->
|
||||
account.getDevice(accountAndDeviceIdentifier.deviceId()).map(device -> {
|
||||
if (!isIdle(device)) {
|
||||
account.getDevice(jobDescriptor.deviceId()).map(device -> {
|
||||
if (jobDescriptor.lastSeen() != device.getLastSeen()) {
|
||||
return CompletableFuture.completedFuture("deviceSeenRecently");
|
||||
}
|
||||
|
||||
try {
|
||||
return pushNotificationManager
|
||||
.sendNewMessageNotification(account, accountAndDeviceIdentifier.deviceId(), true)
|
||||
.sendNewMessageNotification(account, jobDescriptor.deviceId(), true)
|
||||
.thenApply(ignored -> "sent");
|
||||
} catch (final NotPushRegisteredException e) {
|
||||
return CompletableFuture.completedFuture("deviceTokenDeleted");
|
||||
@@ -79,18 +76,12 @@ public class IdleDeviceNotificationScheduler extends JobScheduler {
|
||||
.orElse(CompletableFuture.completedFuture("accountDeleted")));
|
||||
}
|
||||
|
||||
public boolean isIdle(final Device device) {
|
||||
final Duration idleDuration = Duration.between(Instant.ofEpochMilli(device.getLastSeen()), clock.instant());
|
||||
|
||||
return idleDuration.compareTo(MIN_IDLE_DURATION) >= 0;
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> scheduleNotification(final Account account, final byte deviceId, final LocalTime preferredDeliveryTime) {
|
||||
public CompletableFuture<Void> scheduleNotification(final Account account, final Device device, final LocalTime preferredDeliveryTime) {
|
||||
final Instant runAt = SchedulingUtil.getNextRecommendedNotificationTime(account, preferredDeliveryTime, clock);
|
||||
|
||||
try {
|
||||
return scheduleJob(runAt, SystemMapper.jsonMapper().writeValueAsBytes(
|
||||
new AccountAndDeviceIdentifier(account.getIdentifier(IdentityType.ACI), deviceId)));
|
||||
new JobDescriptor(account.getIdentifier(IdentityType.ACI), device.getId(), device.getLastSeen())));
|
||||
} catch (final JsonProcessingException e) {
|
||||
// This should never happen when serializing an `AccountAndDeviceIdentifier`
|
||||
throw new AssertionError(e);
|
||||
|
||||
@@ -18,6 +18,8 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuples;
|
||||
import java.time.Clock;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalTime;
|
||||
|
||||
public class NotifyIdleDevicesWithoutMessagesCommand extends AbstractSinglePassCrawlAccountsCommand {
|
||||
@@ -33,6 +35,12 @@ public class NotifyIdleDevicesWithoutMessagesCommand extends AbstractSinglePassC
|
||||
@VisibleForTesting
|
||||
static final LocalTime PREFERRED_NOTIFICATION_TIME = LocalTime.of(14, 0);
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MIN_IDLE_DURATION = Duration.ofDays(15);
|
||||
|
||||
@VisibleForTesting
|
||||
static final Duration MAX_IDLE_DURATION = Duration.ofDays(30);
|
||||
|
||||
private static final Counter DEVICE_INSPECTED_COUNTER =
|
||||
Metrics.counter(MetricsUtil.name(StartPushNotificationExperimentCommand.class, "deviceInspected"));
|
||||
|
||||
@@ -72,11 +80,12 @@ public class NotifyIdleDevicesWithoutMessagesCommand extends AbstractSinglePassC
|
||||
|
||||
final MessagesManager messagesManager = getCommandDependencies().messagesManager();
|
||||
final IdleDeviceNotificationScheduler idleDeviceNotificationScheduler = buildIdleDeviceNotificationScheduler();
|
||||
final Clock clock = getClock();
|
||||
|
||||
accounts
|
||||
.flatMap(account -> Flux.fromIterable(account.getDevices()).map(device -> Tuples.of(account, device)))
|
||||
.doOnNext(ignored -> DEVICE_INSPECTED_COUNTER.increment())
|
||||
.flatMap(accountAndDevice -> isDeviceEligible(accountAndDevice.getT1(), accountAndDevice.getT2(), idleDeviceNotificationScheduler, messagesManager)
|
||||
.flatMap(accountAndDevice -> isDeviceEligible(accountAndDevice.getT1(), accountAndDevice.getT2(), messagesManager, clock)
|
||||
.mapNotNull(eligible -> eligible ? accountAndDevice : null), maxConcurrency)
|
||||
.flatMap(accountAndDevice -> {
|
||||
final Account account = accountAndDevice.getT1();
|
||||
@@ -84,7 +93,7 @@ public class NotifyIdleDevicesWithoutMessagesCommand extends AbstractSinglePassC
|
||||
|
||||
final Mono<Void> scheduleNotificationMono = dryRun
|
||||
? Mono.empty()
|
||||
: Mono.fromFuture(() -> idleDeviceNotificationScheduler.scheduleNotification(account, device.getId(), PREFERRED_NOTIFICATION_TIME))
|
||||
: Mono.fromFuture(() -> idleDeviceNotificationScheduler.scheduleNotification(account, device, PREFERRED_NOTIFICATION_TIME))
|
||||
.onErrorResume(throwable -> {
|
||||
log.warn("Failed to schedule notification for {}:{}",
|
||||
account.getIdentifier(IdentityType.ACI),
|
||||
@@ -103,6 +112,11 @@ public class NotifyIdleDevicesWithoutMessagesCommand extends AbstractSinglePassC
|
||||
.block();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected Clock getClock() {
|
||||
return Clock.systemUTC();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected IdleDeviceNotificationScheduler buildIdleDeviceNotificationScheduler() {
|
||||
final DynamoDbTables.TableWithExpiration tableConfiguration = getConfiguration().getDynamoDbTables().getScheduledJobs();
|
||||
@@ -119,14 +133,14 @@ public class NotifyIdleDevicesWithoutMessagesCommand extends AbstractSinglePassC
|
||||
@VisibleForTesting
|
||||
static Mono<Boolean> isDeviceEligible(final Account account,
|
||||
final Device device,
|
||||
final IdleDeviceNotificationScheduler idleDeviceNotificationScheduler,
|
||||
final MessagesManager messagesManager) {
|
||||
final MessagesManager messagesManager,
|
||||
final Clock clock) {
|
||||
|
||||
if (!hasPushToken(device)) {
|
||||
return Mono.just(false);
|
||||
}
|
||||
|
||||
if (!idleDeviceNotificationScheduler.isIdle(device)) {
|
||||
if (!isIdle(device, clock)) {
|
||||
return Mono.just(false);
|
||||
}
|
||||
|
||||
@@ -134,6 +148,13 @@ public class NotifyIdleDevicesWithoutMessagesCommand extends AbstractSinglePassC
|
||||
.map(mayHavePersistedMessages -> !mayHavePersistedMessages);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean isIdle(final Device device, final Clock clock) {
|
||||
final Duration idleDuration = Duration.between(Instant.ofEpochMilli(device.getLastSeen()), clock.instant());
|
||||
|
||||
return idleDuration.compareTo(MIN_IDLE_DURATION) >= 0 && idleDuration.compareTo(MAX_IDLE_DURATION) < 0;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean hasPushToken(final Device device) {
|
||||
// Exclude VOIP tokens since they have their own, distinct delivery mechanism
|
||||
|
||||
Reference in New Issue
Block a user