Refactor scheduled APNs notifications in preparation for future development

This commit is contained in:
Jon Chambers
2022-08-12 10:47:49 -04:00
committed by GitHub
parent a44c18e9b7
commit a53a85d788
21 changed files with 200 additions and 213 deletions

View File

@@ -5,26 +5,12 @@
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.cluster.SlotHash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import org.whispersystems.textsecuregcm.util.Util;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -32,33 +18,40 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import org.whispersystems.textsecuregcm.util.Util;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
public class ApnFallbackManager implements Managed {
public class ApnPushNotificationScheduler implements Managed {
private static final Logger logger = LoggerFactory.getLogger(ApnFallbackManager.class);
private static final Logger logger = LoggerFactory.getLogger(ApnPushNotificationScheduler.class);
private static final String PENDING_NOTIFICATIONS_KEY = "PENDING_APN";
static final String NEXT_SLOT_TO_PERSIST_KEY = "pending_notification_next_slot";
private static final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
private static final Meter delivered = metricRegistry.meter(name(ApnFallbackManager.class, "voip_delivered"));
private static final Meter sent = metricRegistry.meter(name(ApnFallbackManager.class, "voip_sent" ));
private static final Meter retry = metricRegistry.meter(name(ApnFallbackManager.class, "voip_retry"));
private static final Meter evicted = metricRegistry.meter(name(ApnFallbackManager.class, "voip_evicted"));
@VisibleForTesting
static final String NEXT_SLOT_TO_PERSIST_KEY = "pending_notification_next_slot";
static {
metricRegistry.register(name(ApnFallbackManager.class, "voip_ratio"), new VoipRatioGauge(delivered, sent));
}
private static final Counter delivered = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_delivered"));
private static final Counter sent = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_sent"));
private static final Counter retry = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_retry"));
private static final Counter evicted = Metrics.counter(name(ApnPushNotificationScheduler.class, "voip_evicted"));
private final APNSender apnSender;
private final AccountsManager accountsManager;
private final FaultTolerantRedisCluster cluster;
private final APNSender apnSender;
private final AccountsManager accountsManager;
private final FaultTolerantRedisCluster pushSchedulingCluster;
private final ClusterLuaScript getScript;
private final ClusterLuaScript insertScript;
private final ClusterLuaScript removeScript;
private final ClusterLuaScript getPendingVoipDestinationsScript;
private final ClusterLuaScript insertPendingVoipDestinationScript;
private final ClusterLuaScript removePendingVoipDestinationScript;
private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT];
@@ -90,7 +83,7 @@ public class ApnFallbackManager implements Managed {
long entriesProcessed = 0;
do {
pendingDestinations = getPendingDestinations(slot, 100);
pendingDestinations = getPendingDestinationsForRecurringVoipNotifications(slot, 100);
entriesProcessed += pendingDestinations.size();
for (final String uuidAndDevice : pendingDestinations) {
@@ -104,9 +97,9 @@ public class ApnFallbackManager implements Managed {
.flatMap(deviceId -> maybeAccount.flatMap(account -> account.getDevice(deviceId)));
if (maybeAccount.isPresent() && maybeDevice.isPresent()) {
sendNotification(maybeAccount.get(), maybeDevice.get());
sendRecurringVoipNotification(maybeAccount.get(), maybeDevice.get());
} else {
remove(uuidAndDevice);
removeRecurringVoipNotificationEntry(uuidAndDevice);
}
}
} while (!pendingDestinations.isEmpty());
@@ -115,37 +108,37 @@ public class ApnFallbackManager implements Managed {
}
}
public ApnFallbackManager(FaultTolerantRedisCluster cluster,
public ApnPushNotificationScheduler(FaultTolerantRedisCluster pushSchedulingCluster,
APNSender apnSender,
AccountsManager accountsManager)
throws IOException
{
this.apnSender = apnSender;
this.apnSender = apnSender;
this.accountsManager = accountsManager;
this.cluster = cluster;
this.pushSchedulingCluster = pushSchedulingCluster;
this.getScript = ClusterLuaScript.fromResource(cluster, "lua/apn/get.lua", ScriptOutputType.MULTI);
this.insertScript = ClusterLuaScript.fromResource(cluster, "lua/apn/insert.lua", ScriptOutputType.VALUE);
this.removeScript = ClusterLuaScript.fromResource(cluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER);
this.getPendingVoipDestinationsScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/get.lua", ScriptOutputType.MULTI);
this.insertPendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/insert.lua", ScriptOutputType.VALUE);
this.removePendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER);
for (int i = 0; i < this.workerThreads.length; i++) {
this.workerThreads[i] = new Thread(new NotificationWorker(), "ApnFallbackManagerWorker-" + i);
}
}
public void schedule(Account account, Device device) {
schedule(account, device, System.currentTimeMillis());
public void scheduleRecurringVoipNotification(Account account, Device device) {
scheduleRecurringVoipNotification(account, device, System.currentTimeMillis());
}
@VisibleForTesting
void schedule(Account account, Device device, long timestamp) {
sent.mark();
insert(account, device, timestamp + (15 * 1000), (15 * 1000));
void scheduleRecurringVoipNotification(Account account, Device device, long timestamp) {
sent.increment();
insertRecurringVoipNotificationEntry(account, device, timestamp + (15 * 1000), (15 * 1000));
}
public void cancel(Account account, Device device) {
if (remove(account, device)) {
delivered.mark();
public void cancelRecurringVoipNotification(Account account, Device device) {
if (removeRecurringVoipNotificationEntry(account, device)) {
delivered.increment();
}
}
@@ -167,24 +160,24 @@ public class ApnFallbackManager implements Managed {
}
}
private void sendNotification(final Account account, final Device device) {
private void sendRecurringVoipNotification(final Account account, final Device device) {
String apnId = device.getVoipApnId();
if (apnId == null) {
remove(account, device);
removeRecurringVoipNotificationEntry(account, device);
return;
}
long deviceLastSeen = device.getLastSeen();
if (deviceLastSeen < System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7)) {
evicted.mark();
remove(account, device);
evicted.increment();
removeRecurringVoipNotificationEntry(account, device);
return;
}
apnSender.sendNotification(new PushNotification(apnId, PushNotification.TokenType.APN_VOIP, PushNotification.NotificationType.NOTIFICATION, null, account, device));
retry.mark();
retry.increment();
}
@VisibleForTesting
@@ -206,30 +199,33 @@ public class ApnFallbackManager implements Managed {
}
}
private boolean remove(Account account, Device device) {
return remove(getEndpointKey(account, device));
private boolean removeRecurringVoipNotificationEntry(Account account, Device device) {
return removeRecurringVoipNotificationEntry(getEndpointKey(account, device));
}
private boolean remove(final String endpoint) {
return (long)removeScript.execute(List.of(getPendingNotificationQueueKey(endpoint), endpoint),
Collections.emptyList()) > 0;
private boolean removeRecurringVoipNotificationEntry(final String endpoint) {
return (long) removePendingVoipDestinationScript.execute(
List.of(getPendingRecurringVoipNotificationQueueKey(endpoint), endpoint),
Collections.emptyList()) > 0;
}
@SuppressWarnings("unchecked")
@VisibleForTesting
List<String> getPendingDestinations(final int slot, final int limit) {
return (List<String>)getScript.execute(List.of(getPendingNotificationQueueKey(slot)),
List.of(String.valueOf(System.currentTimeMillis()), String.valueOf(limit)));
List<String> getPendingDestinationsForRecurringVoipNotifications(final int slot, final int limit) {
return (List<String>) getPendingVoipDestinationsScript.execute(
List.of(getPendingRecurringVoipNotificationQueueKey(slot)),
List.of(String.valueOf(System.currentTimeMillis()), String.valueOf(limit)));
}
private void insert(final Account account, final Device device, final long timestamp, final long interval) {
private void insertRecurringVoipNotificationEntry(final Account account, final Device device, final long timestamp, final long interval) {
final String endpoint = getEndpointKey(account, device);
insertScript.execute(List.of(getPendingNotificationQueueKey(endpoint), endpoint),
List.of(String.valueOf(timestamp),
String.valueOf(interval),
account.getUuid().toString(),
String.valueOf(device.getId())));
insertPendingVoipDestinationScript.execute(
List.of(getPendingRecurringVoipNotificationQueueKey(endpoint), endpoint),
List.of(String.valueOf(timestamp),
String.valueOf(interval),
account.getUuid().toString(),
String.valueOf(device.getId())));
}
@VisibleForTesting
@@ -237,32 +233,15 @@ public class ApnFallbackManager implements Managed {
return "apn_device::{" + account.getUuid() + "::" + device.getId() + "}";
}
private String getPendingNotificationQueueKey(final String endpoint) {
return getPendingNotificationQueueKey(SlotHash.getSlot(endpoint));
private String getPendingRecurringVoipNotificationQueueKey(final String endpoint) {
return getPendingRecurringVoipNotificationQueueKey(SlotHash.getSlot(endpoint));
}
private String getPendingNotificationQueueKey(final int slot) {
private String getPendingRecurringVoipNotificationQueueKey(final int slot) {
return PENDING_NOTIFICATIONS_KEY + "::{" + RedisClusterUtil.getMinimalHashTag(slot) + "}";
}
private int getNextSlot() {
return (int)(cluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT);
return (int)(pushSchedulingCluster.withCluster(connection -> connection.sync().incr(NEXT_SLOT_TO_PERSIST_KEY)) % SlotHash.SLOT_COUNT);
}
private static class VoipRatioGauge extends RatioGauge {
private final Meter success;
private final Meter attempts;
private VoipRatioGauge(Meter success, Meter attempts) {
this.success = success;
this.attempts = attempts;
}
@Override
protected Ratio getRatio() {
return RatioGauge.Ratio.of(success.getFiveMinuteRate(), attempts.getFiveMinuteRate());
}
}
}

View File

@@ -9,7 +9,6 @@ import static org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import io.micrometer.core.instrument.Metrics;
import org.apache.commons.lang3.StringUtils;
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;

View File

@@ -0,0 +1,184 @@
/*
* Copyright 2013-2022 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.push;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.vdurmont.semver4j.Semver;
import io.lettuce.core.SetArgs;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
/**
* Measures and records the latency between sending a push notification to a device and that device draining its queue
* of messages.
* <p/>
* When the server sends a push notification to a device, the push latency manager creates a Redis key/value pair
* mapping the current timestamp to the given device if such a mapping doesn't already exist. When a client connects and
* clears its message queue, the push latency manager gets and clears the time of the initial push notification to that
* device and records the time elapsed since the push notification timestamp as a latency observation.
*/
public class PushLatencyManager {
private final FaultTolerantRedisCluster redisCluster;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private final Clock clock;
private static final String TIMER_NAME = MetricRegistry.name(PushLatencyManager.class, "latency");
private static final int TTL = (int) Duration.ofDays(1).toSeconds();
private static final Logger log = LoggerFactory.getLogger(PushLatencyManager.class);
@VisibleForTesting
enum PushType {
STANDARD,
VOIP
}
@VisibleForTesting
static class PushRecord {
private final Instant timestamp;
@Nullable
private final PushType pushType;
@JsonCreator
PushRecord(@JsonProperty("timestamp") final Instant timestamp,
@JsonProperty("pushType") @Nullable final PushType pushType) {
this.timestamp = timestamp;
this.pushType = pushType;
}
public Instant getTimestamp() {
return timestamp;
}
@Nullable
public PushType getPushType() {
return pushType;
}
}
public PushLatencyManager(final FaultTolerantRedisCluster redisCluster,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this(redisCluster, dynamicConfigurationManager, Clock.systemUTC());
}
@VisibleForTesting
PushLatencyManager(final FaultTolerantRedisCluster redisCluster,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final Clock clock) {
this.redisCluster = redisCluster;
this.dynamicConfigurationManager = dynamicConfigurationManager;
this.clock = clock;
}
void recordPushSent(final UUID accountUuid, final long deviceId, final boolean isVoip) {
try {
final String recordJson = SystemMapper.getMapper().writeValueAsString(
new PushRecord(Instant.now(clock), isVoip ? PushType.VOIP : PushType.STANDARD));
redisCluster.useCluster(connection ->
connection.async().set(getFirstUnacknowledgedPushKey(accountUuid, deviceId),
recordJson,
SetArgs.Builder.nx().ex(TTL)));
} catch (final JsonProcessingException e) {
// This should never happen
log.error("Failed to write push latency record JSON", e);
}
}
void recordQueueRead(final UUID accountUuid, final long deviceId, final String userAgentString) {
takePushRecord(accountUuid, deviceId).thenAccept(pushRecord -> {
if (pushRecord != null) {
final Duration latency = Duration.between(pushRecord.getTimestamp(), Instant.now());
final List<Tag> tags = new ArrayList<>(2);
tags.add(UserAgentTagUtil.getPlatformTag(userAgentString));
if (pushRecord.getPushType() != null) {
tags.add(Tag.of("pushType", pushRecord.getPushType().name().toLowerCase()));
}
try {
final UserAgent userAgent = UserAgentUtil.parseUserAgentString(userAgentString);
final Set<Semver> instrumentedVersions =
dynamicConfigurationManager.getConfiguration().getPushLatencyConfiguration().getInstrumentedVersions()
.getOrDefault(userAgent.getPlatform(), Collections.emptySet());
if (instrumentedVersions.contains(userAgent.getVersion())) {
tags.add(Tag.of("clientVersion", userAgent.getVersion().toString()));
}
} catch (UnrecognizedUserAgentException ignored) {
}
Metrics.timer(TIMER_NAME, tags).record(latency);
}
});
}
@VisibleForTesting
CompletableFuture<PushRecord> takePushRecord(final UUID accountUuid, final long deviceId) {
final String key = getFirstUnacknowledgedPushKey(accountUuid, deviceId);
return redisCluster.withCluster(connection -> {
final CompletableFuture<PushRecord> getFuture = connection.async().get(key).toCompletableFuture()
.thenApply(recordJson -> {
if (StringUtils.isNotEmpty(recordJson)) {
try {
return SystemMapper.getMapper().readValue(recordJson, PushRecord.class);
} catch (JsonProcessingException e) {
return null;
}
} else {
return null;
}
});
getFuture.whenComplete((record, cause) -> {
if (cause == null) {
connection.async().del(key);
}
});
return getFuture;
});
}
private static String getFirstUnacknowledgedPushKey(final UUID accountUuid, final long deviceId) {
return "push_latency::v2::" + accountUuid.toString() + "::" + deviceId;
}
}

View File

@@ -5,6 +5,8 @@
package org.whispersystems.textsecuregcm.push;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
@@ -18,14 +20,13 @@ import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.Util;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
public class PushNotificationManager {
private final AccountsManager accountsManager;
private final APNSender apnSender;
private final FcmSender fcmSender;
private final ApnFallbackManager fallbackManager;
private final ApnPushNotificationScheduler apnPushNotificationScheduler;
private final PushLatencyManager pushLatencyManager;
private static final String SENT_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "sentPushNotification");
private static final String FAILED_NOTIFICATION_COUNTER_NAME = name(PushNotificationManager.class, "failedPushNotification");
@@ -35,12 +36,14 @@ public class PushNotificationManager {
public PushNotificationManager(final AccountsManager accountsManager,
final APNSender apnSender,
final FcmSender fcmSender,
final ApnFallbackManager fallbackManager) {
final ApnPushNotificationScheduler apnPushNotificationScheduler,
final PushLatencyManager pushLatencyManager) {
this.accountsManager = accountsManager;
this.apnSender = apnSender;
this.fcmSender = fcmSender;
this.fallbackManager = fallbackManager;
this.apnPushNotificationScheduler = apnPushNotificationScheduler;
this.pushLatencyManager = pushLatencyManager;
}
public void sendNewMessageNotification(final Account destination, final long destinationDeviceId) throws NotPushRegisteredException {
@@ -65,6 +68,11 @@ public class PushNotificationManager {
PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device));
}
public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) {
RedisOperation.unchecked(() -> pushLatencyManager.recordQueueRead(account.getUuid(), device.getId(), userAgent));
RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelRecurringVoipNotification(account, device));
}
@VisibleForTesting
Pair<String, PushNotification.TokenType> getToken(final Device device) throws NotPushRegisteredException {
final Pair<String, PushNotification.TokenType> tokenAndType;
@@ -112,7 +120,7 @@ public class PushNotificationManager {
pushNotification.destination() != null &&
pushNotification.destinationDevice() != null) {
RedisOperation.unchecked(() -> fallbackManager.schedule(pushNotification.destination(),
RedisOperation.unchecked(() -> apnPushNotificationScheduler.scheduleRecurringVoipNotification(pushNotification.destination(),
pushNotification.destinationDevice()));
}
} else {
@@ -131,7 +139,7 @@ public class PushNotificationManager {
d.setUninstalledFeedbackTimestamp(Util.todayInMillis()));
}
} else {
RedisOperation.unchecked(() -> fallbackManager.cancel(account, device));
RedisOperation.unchecked(() -> apnPushNotificationScheduler.cancelRecurringVoipNotification(account, device));
}
}
}