Remove recurring background VOIP notification machinery

This commit is contained in:
Jon Chambers
2024-09-19 11:45:21 -04:00
committed by Jon Chambers
parent 3ed142d0a9
commit b693cb98d0
7 changed files with 3 additions and 321 deletions

View File

@@ -141,18 +141,6 @@ public class PushNotificationManager {
result.errorCode(),
result.unregisteredTimestamp());
}
if (result.accepted() &&
pushNotification.tokenType() == PushNotification.TokenType.APN_VOIP &&
pushNotification.notificationType() == PushNotification.NotificationType.NOTIFICATION &&
pushNotification.destination() != null &&
pushNotification.destinationDevice() != null) {
pushNotificationScheduler.scheduleRecurringApnsVoipNotification(
pushNotification.destination(),
pushNotification.destinationDevice())
.whenComplete(logErrors());
}
} else {
logger.debug("Failed to deliver {} push notification to {} ({})",
pushNotification.notificationType(), pushNotification.deviceToken(), pushNotification.tokenType(),

View File

@@ -19,20 +19,17 @@ import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.Account;
@@ -41,14 +38,12 @@ import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class PushNotificationScheduler implements Managed {
private static final Logger logger = LoggerFactory.getLogger(PushNotificationScheduler.class);
private static final String PENDING_RECURRING_VOIP_NOTIFICATIONS_KEY_PREFIX = "PENDING_APN";
private static final String PENDING_BACKGROUND_NOTIFICATIONS_KEY_PREFIX = "PENDING_BACKGROUND_APN";
private static final String LAST_BACKGROUND_NOTIFICATION_TIMESTAMP_KEY_PREFIX = "LAST_BACKGROUND_NOTIFICATION";
private static final String PENDING_DELAYED_NOTIFICATIONS_KEY_PREFIX = "DELAYED";
@@ -56,11 +51,6 @@ public class PushNotificationScheduler implements Managed {
@VisibleForTesting
static final String NEXT_SLOT_TO_PROCESS_KEY = "pending_notification_next_slot";
private static final Counter delivered = Metrics.counter("chat.ApnPushNotificationScheduler.voip_delivered");
private static final Counter sent = Metrics.counter("chat.ApnPushNotificationScheduler.voip_sent");
private static final Counter retry = Metrics.counter("chat.ApnPushNotificationScheduler.voip_retry");
private static final Counter evicted = Metrics.counter("chat.ApnPushNotificationScheduler.voip_evicted");
private static final Counter BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER = Metrics.counter(name(PushNotificationScheduler.class, "backgroundNotification", "scheduled"));
private static final String BACKGROUND_NOTIFICATION_SENT_COUNTER_NAME = name(PushNotificationScheduler.class, "backgroundNotification", "sent");
@@ -75,10 +65,6 @@ public class PushNotificationScheduler implements Managed {
private final FaultTolerantRedisCluster pushSchedulingCluster;
private final Clock clock;
private final ClusterLuaScript getPendingVoipDestinationsScript;
private final ClusterLuaScript insertPendingVoipDestinationScript;
private final ClusterLuaScript removePendingVoipDestinationScript;
private final ClusterLuaScript scheduleBackgroundApnsNotificationScript;
private final Thread[] workerThreads;
@@ -117,37 +103,7 @@ public class PushNotificationScheduler implements Managed {
final int slot = (int) (pushSchedulingCluster.withCluster(connection ->
connection.sync().incr(NEXT_SLOT_TO_PROCESS_KEY)) % SlotHash.SLOT_COUNT);
return processRecurringApnsVoipNotifications(slot) +
processScheduledBackgroundApnsNotifications(slot) +
processScheduledDelayedNotifications(slot);
}
@VisibleForTesting
long processRecurringApnsVoipNotifications(final int slot) {
List<String> pendingDestinations;
long entriesProcessed = 0;
do {
pendingDestinations = getPendingDestinationsForRecurringApnsVoipNotifications(slot, PAGE_SIZE);
entriesProcessed += pendingDestinations.size();
Flux.fromIterable(pendingDestinations)
.flatMap(destination -> Mono.fromFuture(() -> getAccountAndDeviceFromPairString(destination))
.flatMap(maybeAccountAndDevice -> {
if (maybeAccountAndDevice.isPresent()) {
final Pair<Account, Device> accountAndDevice = maybeAccountAndDevice.get();
return Mono.fromFuture(() -> sendRecurringApnsVoipNotification(accountAndDevice.first(), accountAndDevice.second()));
} else {
final Pair<UUID, Byte> aciAndDeviceId = decodeAciAndDeviceId(destination);
return Mono.fromFuture(() -> removeRecurringApnsVoipNotificationEntry(aciAndDeviceId.first(), aciAndDeviceId.second()))
.then();
}
}), maxConcurrency)
.then()
.block();
} while (!pendingDestinations.isEmpty());
return entriesProcessed;
return processScheduledBackgroundApnsNotifications(slot) + processScheduledDelayedNotifications(slot);
}
@VisibleForTesting
@@ -216,13 +172,6 @@ public class PushNotificationScheduler implements Managed {
this.pushSchedulingCluster = pushSchedulingCluster;
this.clock = clock;
this.getPendingVoipDestinationsScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/get.lua",
ScriptOutputType.MULTI);
this.insertPendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/insert.lua",
ScriptOutputType.VALUE);
this.removePendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/remove.lua",
ScriptOutputType.INTEGER);
this.scheduleBackgroundApnsNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster,
"lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE);
@@ -233,17 +182,6 @@ public class PushNotificationScheduler implements Managed {
}
}
/**
* Schedule a recurring VOIP notification until {@link this#cancelScheduledNotifications} is called or the device is
* removed
*
* @return A CompletionStage that completes when the recurring notification has successfully been scheduled
*/
public CompletionStage<Void> scheduleRecurringApnsVoipNotification(Account account, Device device) {
sent.increment();
return insertRecurringApnsVoipNotificationEntry(account.getIdentifier(IdentityType.ACI), device.getId(), clock.millis() + (15 * 1000), (15 * 1000));
}
/**
* Schedule a background APNs notification to be sent some time in the future.
*
@@ -292,32 +230,16 @@ public class PushNotificationScheduler implements Managed {
}
/**
* Cancel a scheduled recurring VOIP notification
* Cancel scheduled notifications for the given account and device.
*
* @return A CompletionStage that completes when the scheduled task has been cancelled.
* @return A CompletionStage that completes when the scheduled notification has been cancelled.
*/
public CompletionStage<Void> cancelScheduledNotifications(Account account, Device device) {
return CompletableFuture.allOf(
cancelRecurringApnsVoipNotifications(account, device),
cancelBackgroundApnsNotifications(account, device),
cancelDelayedNotifications(account, device));
}
private CompletableFuture<Void> cancelRecurringApnsVoipNotifications(final Account account, final Device device) {
return removeRecurringApnsVoipNotificationEntry(account.getIdentifier(IdentityType.ACI), device.getId())
.thenCompose(removed -> {
if (removed) {
delivered.increment();
}
return pushSchedulingCluster.withCluster(connection ->
connection.async().zrem(
getPendingBackgroundApnsNotificationQueueKey(account, device),
encodeAciAndDeviceId(account, device)));
})
.thenRun(Util.NOOP)
.toCompletableFuture();
}
@VisibleForTesting
CompletableFuture<Void> cancelBackgroundApnsNotifications(final Account account, final Device device) {
return pushSchedulingCluster.withCluster(connection -> connection.async()
@@ -353,21 +275,6 @@ public class PushNotificationScheduler implements Managed {
}
}
private CompletableFuture<Void> sendRecurringApnsVoipNotification(final Account account, final Device device) {
if (StringUtils.isBlank(device.getVoipApnId())) {
return removeRecurringApnsVoipNotificationEntry(account.getIdentifier(IdentityType.ACI), device.getId())
.thenRun(Util.NOOP);
}
if (device.getLastSeen() < clock.millis() - TimeUnit.DAYS.toMillis(7)) {
return removeRecurringApnsVoipNotificationEntry(account.getIdentifier(IdentityType.ACI), device.getId())
.thenRun(evicted::increment);
}
return apnSender.sendNotification(new PushNotification(device.getVoipApnId(), PushNotification.TokenType.APN_VOIP, PushNotification.NotificationType.NOTIFICATION, null, account, device, true))
.thenRun(retry::increment);
}
@VisibleForTesting
CompletableFuture<Void> sendBackgroundApnsNotification(final Account account, final Device device) {
if (StringUtils.isBlank(device.getApnId())) {
@@ -443,48 +350,6 @@ public class PushNotificationScheduler implements Managed {
.flatMap(account -> account.getDevice(aciAndDeviceId.second()).map(device -> new Pair<>(account, device))));
}
private CompletableFuture<Boolean> removeRecurringApnsVoipNotificationEntry(final UUID aci, final byte deviceId) {
final String endpoint = getVoipEndpointKey(aci, deviceId);
return removePendingVoipDestinationScript.executeAsync(
List.of(getPendingRecurringApnsVoipNotificationQueueKey(endpoint), endpoint), Collections.emptyList())
.thenApply(result -> ((long) result) > 0);
}
@SuppressWarnings("unchecked")
@VisibleForTesting
List<String> getPendingDestinationsForRecurringApnsVoipNotifications(final int slot, final int limit) {
return (List<String>) getPendingVoipDestinationsScript.execute(
List.of(getPendingRecurringApnsVoipNotificationQueueKey(slot)),
List.of(String.valueOf(clock.millis()), String.valueOf(limit)));
}
@SuppressWarnings("SameParameterValue")
private CompletionStage<Void> insertRecurringApnsVoipNotificationEntry(final UUID aci, final byte deviceId, final long timestamp, final long interval) {
final String endpoint = getVoipEndpointKey(aci, deviceId);
return insertPendingVoipDestinationScript.executeAsync(
List.of(getPendingRecurringApnsVoipNotificationQueueKey(endpoint), endpoint),
List.of(String.valueOf(timestamp),
String.valueOf(interval),
aci.toString(),
String.valueOf(deviceId)))
.thenRun(Util.NOOP);
}
@VisibleForTesting
static String getVoipEndpointKey(final UUID aci, final byte deviceId) {
return "apn_device::{" + aci + "::" + deviceId + "}";
}
private static String getPendingRecurringApnsVoipNotificationQueueKey(final String endpoint) {
return getPendingRecurringApnsVoipNotificationQueueKey(SlotHash.getSlot(endpoint));
}
private static String getPendingRecurringApnsVoipNotificationQueueKey(final int slot) {
return PENDING_RECURRING_VOIP_NOTIFICATIONS_KEY_PREFIX + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}";
}
@VisibleForTesting
static String getPendingBackgroundApnsNotificationQueueKey(final Account account, final Device device) {
return getPendingBackgroundApnsNotificationQueueKey(SlotHash.getSlot(encodeAciAndDeviceId(account, device)));