Support scheduling background FCMs

This commit is contained in:
Ravi Khadiwala
2025-05-07 12:46:02 -05:00
committed by ravi-signal
parent 30c194c557
commit 703a05cb15
4 changed files with 110 additions and 73 deletions

View File

@@ -105,7 +105,7 @@ public class PushNotificationManager {
// APNs imposes a per-device limit on background push notifications; schedule a notification for some time in the
// future (possibly even now!) rather than sending a notification directly
return pushNotificationScheduler
.scheduleBackgroundApnsNotification(pushNotification.destination(), pushNotification.destinationDevice())
.scheduleBackgroundNotification(pushNotification.tokenType(), pushNotification.destination(), pushNotification.destinationDevice())
.whenComplete(logErrors())
.thenApply(ignored -> Optional.<SendPushNotificationResult>empty())
.toCompletableFuture();

View File

@@ -13,7 +13,6 @@ import io.lettuce.core.Range;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import io.lettuce.core.cluster.SlotHash;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.time.Clock;
@@ -44,14 +43,15 @@ public class PushNotificationScheduler implements Managed {
private static final Logger logger = LoggerFactory.getLogger(PushNotificationScheduler.class);
private static final String PENDING_BACKGROUND_NOTIFICATIONS_KEY_PREFIX = "PENDING_BACKGROUND_APN";
private static final String PENDING_BACKGROUND_APN_NOTIFICATIONS_KEY_PREFIX = "PENDING_BACKGROUND_APN";
private static final String PENDING_BACKGROUND_FCM_NOTIFICATIONS_KEY_PREFIX = "PENDING_BACKGROUND_FCM";
private static final String LAST_BACKGROUND_NOTIFICATION_TIMESTAMP_KEY_PREFIX = "LAST_BACKGROUND_NOTIFICATION";
private static final String PENDING_DELAYED_NOTIFICATIONS_KEY_PREFIX = "DELAYED";
@VisibleForTesting
static final String NEXT_SLOT_TO_PROCESS_KEY = "pending_notification_next_slot";
private static final Counter BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER = Metrics.counter(name(PushNotificationScheduler.class, "backgroundNotification", "scheduled"));
private static final String BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER_NAME = name(PushNotificationScheduler.class, "backgroundNotification", "scheduled");
private static final String BACKGROUND_NOTIFICATION_SENT_COUNTER_NAME = name(PushNotificationScheduler.class, "backgroundNotification", "sent");
private static final String DELAYED_NOTIFICATION_SCHEDULED_COUNTER_NAME = name(PushNotificationScheduler.class, "delayedNotificationScheduled");
@@ -65,7 +65,7 @@ public class PushNotificationScheduler implements Managed {
private final FaultTolerantRedisClusterClient pushSchedulingCluster;
private final Clock clock;
private final ClusterLuaScript scheduleBackgroundApnsNotificationScript;
private final ClusterLuaScript scheduleBackgroundNotificationScript;
private final Thread[] workerThreads;
@@ -103,15 +103,18 @@ public class PushNotificationScheduler implements Managed {
final int slot = (int) (pushSchedulingCluster.withCluster(connection ->
connection.sync().incr(NEXT_SLOT_TO_PROCESS_KEY)) % SlotHash.SLOT_COUNT);
return processScheduledBackgroundApnsNotifications(slot) + processScheduledDelayedNotifications(slot);
return processScheduledBackgroundNotifications(PushNotification.TokenType.APN, slot)
+ processScheduledBackgroundNotifications(PushNotification.TokenType.FCM, slot)
+ processScheduledDelayedNotifications(slot);
}
@VisibleForTesting
long processScheduledBackgroundApnsNotifications(final int slot) {
return processScheduledNotifications(getPendingBackgroundApnsNotificationQueueKey(slot),
PushNotificationScheduler.this::sendBackgroundApnsNotification);
long processScheduledBackgroundNotifications(PushNotification.TokenType tokenType, final int slot) {
return processScheduledNotifications(getPendingBackgroundNotificationQueueKey(tokenType, slot),
(account, device) -> sendBackgroundNotification(tokenType, account, device));
}
@VisibleForTesting
long processScheduledDelayedNotifications(final int slot) {
return processScheduledNotifications(getDelayedNotificationQueueKey(slot),
@@ -172,7 +175,7 @@ public class PushNotificationScheduler implements Managed {
this.pushSchedulingCluster = pushSchedulingCluster;
this.clock = clock;
this.scheduleBackgroundApnsNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster,
this.scheduleBackgroundNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster,
"lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE);
this.workerThreads = new Thread[dedicatedProcessThreadCount];
@@ -183,27 +186,25 @@ public class PushNotificationScheduler implements Managed {
}
/**
* Schedule a background APNs notification to be sent some time in the future.
* Schedule a background push notification to be sent some time in the future.
*
* @return A CompletionStage that completes when the notification has successfully been scheduled
*
* @throws IllegalArgumentException if the given device does not have an APNs token
* @throws IllegalArgumentException if the given device does not have a push token
*/
public CompletionStage<Void> scheduleBackgroundApnsNotification(final Account account, final Device device) {
if (StringUtils.isBlank(device.getApnId())) {
throw new IllegalArgumentException("Device must have an APNs token");
public CompletionStage<Void> scheduleBackgroundNotification(final PushNotification.TokenType tokenType, final Account account, final Device device) {
if (StringUtils.isBlank(getPushToken(tokenType, device))) {
throw new IllegalArgumentException("Device must have an " + tokenType + " token");
}
BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER.increment();
return scheduleBackgroundApnsNotificationScript.executeAsync(
List.of(
getLastBackgroundApnsNotificationTimestampKey(account, device),
getPendingBackgroundApnsNotificationQueueKey(account, device)),
List.of(
encodeAciAndDeviceId(account, device),
String.valueOf(clock.millis()),
String.valueOf(BACKGROUND_NOTIFICATION_PERIOD.toMillis())))
Metrics.counter(BACKGROUND_NOTIFICATION_SCHEDULED_COUNTER_NAME, "type", tokenType.name()).increment();
return scheduleBackgroundNotificationScript.executeAsync(
List.of(
getLastBackgroundNotificationTimestampKey(account, device),
getPendingBackgroundNotificationQueueKey(tokenType, account, device)),
List.of(
encodeAciAndDeviceId(account, device),
String.valueOf(clock.millis()),
String.valueOf(BACKGROUND_NOTIFICATION_PERIOD.toMillis())))
.thenRun(Util.NOOP);
}
@@ -236,14 +237,15 @@ public class PushNotificationScheduler implements Managed {
*/
public CompletionStage<Void> cancelScheduledNotifications(Account account, Device device) {
return CompletableFuture.allOf(
cancelBackgroundApnsNotifications(account, device),
cancelBackgroundNotifications(PushNotification.TokenType.FCM, account, device),
cancelBackgroundNotifications(PushNotification.TokenType.APN, account, device),
cancelDelayedNotifications(account, device));
}
@VisibleForTesting
CompletableFuture<Void> cancelBackgroundApnsNotifications(final Account account, final Device device) {
CompletableFuture<Void> cancelBackgroundNotifications(final PushNotification.TokenType tokenType, final Account account, final Device device) {
return pushSchedulingCluster.withCluster(connection -> connection.async()
.zrem(getPendingBackgroundApnsNotificationQueueKey(account, device), encodeAciAndDeviceId(account, device)))
.zrem(getPendingBackgroundNotificationQueueKey(tokenType, account, device), encodeAciAndDeviceId(account, device)))
.thenRun(Util.NOOP)
.toCompletableFuture();
}
@@ -276,17 +278,23 @@ public class PushNotificationScheduler implements Managed {
}
@VisibleForTesting
CompletableFuture<Void> sendBackgroundApnsNotification(final Account account, final Device device) {
if (StringUtils.isBlank(device.getApnId())) {
CompletableFuture<Void> sendBackgroundNotification(PushNotification.TokenType tokenType, final Account account, final Device device) {
final String pushToken = getPushToken(tokenType, device);
if (StringUtils.isBlank(pushToken)) {
return CompletableFuture.completedFuture(null);
}
final PushNotificationSender sender = switch (tokenType) {
case FCM -> fcmSender;
case APN -> apnSender;
};
// It's okay for the "last notification" timestamp to expire after the "cooldown" period has elapsed; a missing
// timestamp and a timestamp older than the period are functionally equivalent.
return pushSchedulingCluster.withCluster(connection -> connection.async().set(
getLastBackgroundApnsNotificationTimestampKey(account, device),
getLastBackgroundNotificationTimestampKey(account, device),
String.valueOf(clock.millis()), new SetArgs().ex(BACKGROUND_NOTIFICATION_PERIOD)))
.thenCompose(ignored -> apnSender.sendNotification(new PushNotification(device.getApnId(), PushNotification.TokenType.APN, PushNotification.NotificationType.NOTIFICATION, null, account, device, false)))
.thenCompose(ignored -> sender.sendNotification(new PushNotification(pushToken, tokenType, PushNotification.NotificationType.NOTIFICATION, null, account, device, false)))
.thenAccept(response -> Metrics.counter(BACKGROUND_NOTIFICATION_SENT_COUNTER_NAME,
ACCEPTED_TAG, String.valueOf(response.accepted()))
.increment())
@@ -321,6 +329,10 @@ public class PushNotificationScheduler implements Managed {
@VisibleForTesting
static String encodeAciAndDeviceId(final Account account, final Device device) {
// Note: This does not include a device registration id. If a device is unlinked and a new device is linked with
// the original device's id, the new device might get the old device's scheduled push, or the new device might
// delay its own push because the old device had a recent push. An extra or delayed background push is harmless,
// so this is okay.
return account.getUuid() + ":" + device.getId();
}
@@ -351,15 +363,19 @@ public class PushNotificationScheduler implements Managed {
}
@VisibleForTesting
static String getPendingBackgroundApnsNotificationQueueKey(final Account account, final Device device) {
return getPendingBackgroundApnsNotificationQueueKey(SlotHash.getSlot(encodeAciAndDeviceId(account, device)));
static String getPendingBackgroundNotificationQueueKey(final PushNotification.TokenType tokenType, final Account account, final Device device) {
return getPendingBackgroundNotificationQueueKey(tokenType, SlotHash.getSlot(encodeAciAndDeviceId(account, device)));
}
private static String getPendingBackgroundApnsNotificationQueueKey(final int slot) {
return PENDING_BACKGROUND_NOTIFICATIONS_KEY_PREFIX + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}";
private static String getPendingBackgroundNotificationQueueKey(final PushNotification.TokenType tokenType, final int slot) {
final String prefix = switch (tokenType) {
case APN -> PENDING_BACKGROUND_APN_NOTIFICATIONS_KEY_PREFIX;
case FCM -> PENDING_BACKGROUND_FCM_NOTIFICATIONS_KEY_PREFIX;
};
return prefix + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}";
}
private static String getLastBackgroundApnsNotificationTimestampKey(final Account account, final Device device) {
private static String getLastBackgroundNotificationTimestampKey(final Account account, final Device device) {
return LAST_BACKGROUND_NOTIFICATION_TIMESTAMP_KEY_PREFIX + "::{" + encodeAciAndDeviceId(account, device) + "}";
}
@@ -376,15 +392,15 @@ public class PushNotificationScheduler implements Managed {
Optional<Instant> getLastBackgroundApnsNotificationTimestamp(final Account account, final Device device) {
return Optional.ofNullable(
pushSchedulingCluster.withCluster(connection ->
connection.sync().get(getLastBackgroundApnsNotificationTimestampKey(account, device))))
connection.sync().get(getLastBackgroundNotificationTimestampKey(account, device))))
.map(timestampString -> Instant.ofEpochMilli(Long.parseLong(timestampString)));
}
@VisibleForTesting
Optional<Instant> getNextScheduledBackgroundApnsNotificationTimestamp(final Account account, final Device device) {
Optional<Instant> getNextScheduledBackgroundNotificationTimestamp(PushNotification.TokenType tokenType, final Account account, final Device device) {
return Optional.ofNullable(
pushSchedulingCluster.withCluster(connection ->
connection.sync().zscore(getPendingBackgroundApnsNotificationQueueKey(account, device),
connection.sync().zscore(getPendingBackgroundNotificationQueueKey(tokenType, account, device),
encodeAciAndDeviceId(account, device))))
.map(timestamp -> Instant.ofEpochMilli(timestamp.longValue()));
}
@@ -407,4 +423,11 @@ public class PushNotificationScheduler implements Managed {
return "unknown";
}
}
private static String getPushToken(final PushNotification.TokenType tokenType, final Device device) {
return switch (tokenType) {
case FCM -> device.getGcmId();
case APN -> device.getApnId();
};
}
}