Add support for scheduling background push notifications

This commit is contained in:
Jon Chambers
2022-08-08 16:11:24 -04:00
committed by Jon Chambers
parent c2be0af9d9
commit 5f6b66dad6
5 changed files with 291 additions and 42 deletions

View File

@@ -5,20 +5,28 @@
package org.whispersystems.textsecuregcm.push;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import io.lettuce.core.Limit;
import io.lettuce.core.Range;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import io.lettuce.core.cluster.SlotHash;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
@@ -30,22 +38,25 @@ import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import org.whispersystems.textsecuregcm.util.Util;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
public class ApnPushNotificationScheduler implements Managed {
private static final Logger logger = LoggerFactory.getLogger(ApnPushNotificationScheduler.class);
private static final String PENDING_NOTIFICATIONS_KEY = "PENDING_APN";
private static final String PENDING_RECURRING_VOIP_NOTIFICATIONS_KEY_PREFIX = "PENDING_APN";
private static final String PENDING_BACKGROUND_NOTIFICATIONS_KEY_PREFIX = "PENDING_BACKGROUND_APN";
private static final String LAST_BACKGROUND_NOTIFICATION_TIMESTAMP_KEY_PREFIX = "LAST_BACKGROUND_NOTIFICATION";
@VisibleForTesting
static final String NEXT_SLOT_TO_PERSIST_KEY = "pending_notification_next_slot";
static final String NEXT_SLOT_TO_PROCESS_KEY = "pending_notification_next_slot";
private static final Counter delivered = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_delivered"));
private static final Counter sent = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_sent"));
private static final Counter retry = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_retry"));
private static final Counter evicted = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_evicted"));
private static final Counter backgroundNotificationScheduledCounter = Metrics.counter(name(ApnPushNotificationScheduler.class, "backgroundNotification", "scheduled"));
private static final Counter backgroundNotificationSentCounter = Metrics.counter(name(ApnPushNotificationScheduler.class, "backgroundNotification", "sent"));
private final APNSender apnSender;
private final AccountsManager accountsManager;
private final FaultTolerantRedisCluster pushSchedulingCluster;
@@ -55,14 +66,21 @@ public class ApnPushNotificationScheduler implements Managed {
private final ClusterLuaScript insertPendingVoipDestinationScript;
private final ClusterLuaScript removePendingVoipDestinationScript;
private final ClusterLuaScript scheduleBackgroundNotificationScript;
private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT];
private static final int WORKER_THREAD_COUNT = 4;
@VisibleForTesting
static final Duration BACKGROUND_NOTIFICATION_PERIOD = Duration.ofMinutes(20);
private final AtomicBoolean running = new AtomicBoolean(false);
class NotificationWorker implements Runnable {
private static final int PAGE_SIZE = 128;
@Override
public void run() {
while (running.get()) {
@@ -78,36 +96,68 @@ public class ApnPushNotificationScheduler implements Managed {
}
}
long processNextSlot() {
final int slot = getNextSlot();
private long processNextSlot() {
final int slot = (int) (pushSchedulingCluster.withCluster(connection ->
connection.sync().incr(NEXT_SLOT_TO_PROCESS_KEY)) % SlotHash.SLOT_COUNT);
return processRecurringVoipNotifications(slot) + processScheduledBackgroundNotifications(slot);
}
@VisibleForTesting
long processRecurringVoipNotifications(final int slot) {
List<String> pendingDestinations;
long entriesProcessed = 0;
do {
pendingDestinations = getPendingDestinationsForRecurringVoipNotifications(slot, 100);
pendingDestinations = getPendingDestinationsForRecurringVoipNotifications(slot, PAGE_SIZE);
entriesProcessed += pendingDestinations.size();
for (final String uuidAndDevice : pendingDestinations) {
final Optional<Pair<String, Long>> separated = getSeparated(uuidAndDevice);
final Optional<Account> maybeAccount = separated.map(Pair::first)
.map(UUID::fromString)
.flatMap(accountsManager::getByAccountIdentifier);
final Optional<Device> maybeDevice = separated.map(Pair::second)
.flatMap(deviceId -> maybeAccount.flatMap(account -> account.getDevice(deviceId)));
if (maybeAccount.isPresent() && maybeDevice.isPresent()) {
sendRecurringVoipNotification(maybeAccount.get(), maybeDevice.get());
} else {
removeRecurringVoipNotificationEntry(uuidAndDevice);
for (final String destination : pendingDestinations) {
try {
getAccountAndDeviceFromPairString(destination).ifPresentOrElse(
accountAndDevice -> sendRecurringVoipNotification(accountAndDevice.first(), accountAndDevice.second()),
() -> removeRecurringVoipNotificationEntry(destination));
} catch (final IllegalArgumentException e) {
logger.warn("Failed to parse account/device pair: {}", destination, e);
}
}
} while (!pendingDestinations.isEmpty());
return entriesProcessed;
}
@VisibleForTesting
long processScheduledBackgroundNotifications(final int slot) {
final long currentTimeMillis = clock.millis();
final String queueKey = getPendingBackgroundNotificationQueueKey(slot);
final long processedBackgroundNotifications = pushSchedulingCluster.withCluster(connection -> {
List<String> destinations;
long offset = 0;
do {
destinations = connection.sync().zrangebyscore(queueKey, Range.create(0, currentTimeMillis), Limit.create(offset, PAGE_SIZE));
for (final String destination : destinations) {
try {
getAccountAndDeviceFromPairString(destination).ifPresent(accountAndDevice ->
sendBackgroundNotification(accountAndDevice.first(), accountAndDevice.second()));
} catch (final IllegalArgumentException e) {
logger.warn("Failed to parse account/device pair: {}", destination, e);
}
}
offset += destinations.size();
} while (destinations.size() == PAGE_SIZE);
return offset;
});
pushSchedulingCluster.useCluster(connection ->
connection.sync().zremrangebyscore(queueKey, Range.create(0, currentTimeMillis)));
return processedBackgroundNotifications;
}
}
public ApnPushNotificationScheduler(FaultTolerantRedisCluster pushSchedulingCluster,
@@ -132,20 +182,38 @@ public class ApnPushNotificationScheduler implements Managed {
this.insertPendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/insert.lua", ScriptOutputType.VALUE);
this.removePendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER);
this.scheduleBackgroundNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE);
for (int i = 0; i < this.workerThreads.length; i++) {
this.workerThreads[i] = new Thread(new NotificationWorker(), "ApnFallbackManagerWorker-" + i);
}
}
public void scheduleRecurringVoipNotification(Account account, Device device) {
void scheduleRecurringVoipNotification(Account account, Device device) {
sent.increment();
insertRecurringVoipNotificationEntry(account, device, clock.millis() + (15 * 1000), (15 * 1000));
}
public void cancelRecurringVoipNotification(Account account, Device device) {
void scheduleBackgroundNotification(final Account account, final Device device) {
backgroundNotificationScheduledCounter.increment();
scheduleBackgroundNotificationScript.execute(
List.of(
getLastBackgroundNotificationTimestampKey(account, device),
getPendingBackgroundNotificationQueueKey(account, device)),
List.of(
getPairString(account, device),
String.valueOf(clock.millis()),
String.valueOf(BACKGROUND_NOTIFICATION_PERIOD.toMillis())));
}
public void cancelScheduledNotifications(Account account, Device device) {
if (removeRecurringVoipNotificationEntry(account, device)) {
delivered.increment();
}
pushSchedulingCluster.useCluster(connection ->
connection.sync().zrem(getPendingBackgroundNotificationQueueKey(account, device), getPairString(account, device)));
}
@Override
@@ -186,6 +254,22 @@ public class ApnPushNotificationScheduler implements Managed {
retry.increment();
}
@VisibleForTesting
void sendBackgroundNotification(final Account account, final Device device) {
if (StringUtils.isNotBlank(device.getApnId())) {
// It's okay for the "last notification" timestamp to expire after the "cooldown" period has elapsed; a missing
// timestamp and a timestamp older than the period are functionally equivalent.
pushSchedulingCluster.useCluster(connection -> connection.sync().set(
getLastBackgroundNotificationTimestampKey(account, device),
String.valueOf(clock.millis()), new SetArgs().ex(BACKGROUND_NOTIFICATION_PERIOD)));
// TODO Set priority, etc.
apnSender.sendNotification(new PushNotification(device.getApnId(), PushNotification.TokenType.APN, PushNotification.NotificationType.NOTIFICATION, null, account, device));
backgroundNotificationSentCounter.increment();
}
}
@VisibleForTesting
static Optional<Pair<String, Long>> getSeparated(String encoded) {
try {
@@ -205,6 +289,34 @@ public class ApnPushNotificationScheduler implements Managed {
}
}
@VisibleForTesting
static String getPairString(final Account account, final Device device) {
return account.getUuid() + ":" + device.getId();
}
@VisibleForTesting
Optional<Pair<Account, Device>> getAccountAndDeviceFromPairString(final String endpoint) {
try {
if (StringUtils.isBlank(endpoint)) {
throw new IllegalArgumentException("Endpoint must not be blank");
}
final String[] parts = endpoint.split(":");
if (parts.length != 2) {
throw new IllegalArgumentException("Could not parse endpoint string: " + endpoint);
}
final Optional<Account> maybeAccount = accountsManager.getByAccountIdentifier(UUID.fromString(parts[0]));
return maybeAccount.flatMap(account -> account.getDevice(Long.parseLong(parts[1])))
.map(device -> new Pair<>(maybeAccount.get(), device));
} catch (final NumberFormatException e) {
throw new IllegalArgumentException(e);
}
}
private boolean removeRecurringVoipNotificationEntry(Account account, Device device) {
return removeRecurringVoipNotificationEntry(getEndpointKey(account, device));
}
@@ -235,19 +347,45 @@ public class ApnPushNotificationScheduler implements Managed {
}
@VisibleForTesting
String getEndpointKey(final Account account, final Device device) {
static String getEndpointKey(final Account account, final Device device) {
return "apn_device::{" + account.getUuid() + "::" + device.getId() + "}";
}
private String getPendingRecurringVoipNotificationQueueKey(final String endpoint) {
private static String getPendingRecurringVoipNotificationQueueKey(final String endpoint) {
return getPendingRecurringVoipNotificationQueueKey(SlotHash.getSlot(endpoint));
}
private String getPendingRecurringVoipNotificationQueueKey(final int slot) {
return PENDING_NOTIFICATIONS_KEY + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}";
private static String getPendingRecurringVoipNotificationQueueKey(final int slot) {
return PENDING_RECURRING_VOIP_NOTIFICATIONS_KEY_PREFIX + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}";
}
private int getNextSlot() {
return (int)(pushSchedulingCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT);
@VisibleForTesting
static String getPendingBackgroundNotificationQueueKey(final Account account, final Device device) {
return getPendingBackgroundNotificationQueueKey(SlotHash.getSlot(getPairString(account, device)));
}
private static String getPendingBackgroundNotificationQueueKey(final int slot) {
return PENDING_BACKGROUND_NOTIFICATIONS_KEY_PREFIX + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}";
}
private static String getLastBackgroundNotificationTimestampKey(final Account account, final Device device) {
return LAST_BACKGROUND_NOTIFICATION_TIMESTAMP_KEY_PREFIX + "::{" + getPairString(account, device) + "}";
}
@VisibleForTesting
Optional<Instant> getLastBackgroundNotificationTimestamp(final Account account, final Device device) {
return Optional.ofNullable(
pushSchedulingCluster.withCluster(connection ->
connection.sync().get(getLastBackgroundNotificationTimestampKey(account, device))))
.map(timestampString -> Instant.ofEpochMilli(Long.parseLong(timestampString)));
}
@VisibleForTesting
Optional<Instant> getNextScheduledBackgroundNotificationTimestamp(final Account account, final Device device) {
return Optional.ofNullable(
pushSchedulingCluster.withCluster(connection ->
connection.sync().zscore(getPendingBackgroundNotificationQueueKey(account, device),
getPairString(account, device))))
.map(timestamp -> Instant.ofEpochMilli(timestamp.longValue()));
}
}

View File

@@ -70,7 +70,7 @@ public class PushNotificationManager {
public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) {
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(account.getUuid(), device.getId(), userAgent));
RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelRecurringVoipNotification(account, device));
RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelScheduledNotifications(account, device));
}
@VisibleForTesting
@@ -139,7 +139,7 @@ public class PushNotificationManager {
d.setUninstalledFeedbackTimestamp(Util.todayInMillis()));
}
} else {
RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelRecurringVoipNotification(account, device));
RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelScheduledNotifications(account, device));
}
}
}