Restore Redis retries for select operations

This commit is contained in:
Jon Chambers
2025-08-27 11:52:16 -04:00
committed by GitHub
parent f616612104
commit 8825396fc1
46 changed files with 449 additions and 262 deletions

View File

@@ -66,7 +66,6 @@ import org.whispersystems.textsecuregcm.configuration.TurnConfiguration;
import org.whispersystems.textsecuregcm.configuration.UnidentifiedDeliveryConfiguration;
import org.whispersystems.textsecuregcm.configuration.VirtualThreadConfiguration;
import org.whispersystems.textsecuregcm.configuration.ZkConfig;
import org.whispersystems.textsecuregcm.limits.RateLimiterConfig;
import org.whispersystems.websocket.configuration.WebSocketConfiguration;
/** @noinspection MismatchedQueryAndUpdateOfCollection, WeakerAccess */
@@ -192,11 +191,6 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private List<MaxDeviceConfiguration> maxDevices = new LinkedList<>();
@Valid
@NotNull
@JsonProperty
private Map<String, RateLimiterConfig> limits = new HashMap<>();
@Valid
@NotNull
@JsonProperty
@@ -344,10 +338,15 @@ public class WhisperServerConfiguration extends Configuration {
new IdlePrimaryDeviceReminderConfiguration(Duration.ofDays(30));
@JsonProperty
private Map<String, CircuitBreakerConfiguration> circuitBreakers = Collections.emptyMap();
private Map<String, @Valid CircuitBreakerConfiguration> circuitBreakers = Collections.emptyMap();
@JsonProperty
private Map<String, RetryConfiguration> retries = Collections.emptyMap();
private Map<String, @Valid RetryConfiguration> retries = Collections.emptyMap();
@JsonProperty
@Valid
@NotNull
private RetryConfiguration generalRedisRetry = new RetryConfiguration();
public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() {
return tlsKeyStore;
@@ -579,4 +578,8 @@ public class WhisperServerConfiguration extends Configuration {
public Map<String, RetryConfiguration> getRetryConfigurations() {
return retries;
}
public RetryConfiguration getGeneralRedisRetryConfiguration() {
return generalRedisRetry;
}
}

View File

@@ -370,6 +370,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getRetryConfigurations().forEach((name, configuration) ->
CircuitBreakerUtil.getRetryRegistry().addConfiguration(name, configuration.toRetryConfigBuilder().build()));
CircuitBreakerUtil.setGeneralRedisRetryConfiguration(config.getGeneralRedisRetryConfiguration());
ScheduledExecutorService dynamicConfigurationExecutor = ScheduledExecutorServiceBuilder.of(environment, "dynamicConfiguration")
.threads(1).build();
@@ -524,13 +526,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.minThreads(1).maxThreads(1).build();
ExecutorService asyncOperationQueueingExecutor = ExecutorServiceBuilder.of(environment, "asyncOperationQueueing")
.minThreads(1).maxThreads(1).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor =
ScheduledExecutorServiceBuilder.of(environment, "secureValueRecoveryServiceRetry").threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor =
ScheduledExecutorServiceBuilder.of(environment, "storageServiceRetry").threads(1).build();
ScheduledExecutorService remoteStorageRetryExecutor =
ScheduledExecutorServiceBuilder.of(environment, "remoteStorageRetry").threads(1).build();
ScheduledExecutorService registrationIdentityTokenRefreshExecutor =
final ScheduledExecutorService retryExecutor = ScheduledExecutorServiceBuilder.of(environment, "retry")
.threads(16).build();
final ScheduledExecutorService registrationIdentityTokenRefreshExecutor =
ScheduledExecutorServiceBuilder.of(environment, "registrationIdentityTokenRefresh").threads(1).build();
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
@@ -631,22 +630,24 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
SecureValueRecoveryClient secureValueRecovery2Client = new SecureValueRecoveryClient(
svr2CredentialsGenerator,
secureValueRecoveryServiceExecutor,
secureValueRecoveryServiceRetryExecutor,
retryExecutor,
config.getSvr2Configuration(),
() -> dynamicConfigurationManager.getConfiguration().getSvr2StatusCodesToIgnoreForAccountDeletion());
SecureValueRecoveryClient secureValueRecoveryBClient = new SecureValueRecoveryClient(
svrbCredentialsGenerator,
secureValueRecoveryServiceExecutor,
secureValueRecoveryServiceRetryExecutor,
retryExecutor,
config.getSvrbConfiguration(),
() -> dynamicConfigurationManager.getConfiguration().getSvrbStatusCodesToIgnoreForAccountDeletion());
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, storageServiceRetryExecutor, config.getSecureStorageServiceConfiguration());
storageServiceExecutor, retryExecutor, config.getSecureStorageServiceConfiguration());
final GrpcClientConnectionManager grpcClientConnectionManager = new GrpcClientConnectionManager();
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, grpcClientConnectionManager, disconnectionRequestListenerExecutor);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster, asyncCdnS3Client, config.getCdnConfiguration().bucket());
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient,
grpcClientConnectionManager, disconnectionRequestListenerExecutor, retryExecutor);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster, retryExecutor, asyncCdnS3Client,
config.getCdnConfiguration().bucket());
MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler,
messageDeletionAsyncExecutor, clock, experimentEnrollmentManager);
messageDeletionAsyncExecutor, retryExecutor, clock, experimentEnrollmentManager);
ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases,
recurringJobExecutor,
config.getClientReleaseConfiguration().refreshInterval(),
@@ -665,15 +666,15 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
pubsubClient, accountLockManager, keysManager, messagesManager, profilesManager,
secureStorageClient, secureValueRecovery2Client, disconnectionRequestManager,
registrationRecoveryPasswordsManager, clientPublicKeysManager, accountLockExecutor, messagePollExecutor,
clock, config.getLinkDeviceSecretConfiguration().secret().value(), dynamicConfigurationManager);
retryExecutor, clock, config.getLinkDeviceSecretConfiguration().secret().value(), dynamicConfigurationManager);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
APNSender apnSender = new APNSender(apnSenderExecutor, config.getApnConfiguration());
FcmSender fcmSender = new FcmSender(fcmSenderExecutor, config.getFcmConfiguration().credentials().value());
PushNotificationScheduler pushNotificationScheduler = new PushNotificationScheduler(pushSchedulerCluster,
apnSender, fcmSender, accountsManager, 0, 0);
apnSender, fcmSender, accountsManager, 0, 0, retryExecutor);
PushNotificationManager pushNotificationManager =
new PushNotificationManager(accountsManager, apnSender, fcmSender, pushNotificationScheduler);
RateLimiters rateLimiters = RateLimiters.create(dynamicConfigurationManager, rateLimitersCluster);
RateLimiters rateLimiters = RateLimiters.create(dynamicConfigurationManager, rateLimitersCluster, retryExecutor);
ProvisioningManager provisioningManager = new ProvisioningManager(pubsubClient);
IssuedReceiptsManager issuedReceiptsManager = new IssuedReceiptsManager(
config.getDynamoDbTables().getIssuedReceipts().getTableName(),
@@ -806,7 +807,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
clock);
final Cdn3RemoteStorageManager cdn3RemoteStorageManager = new Cdn3RemoteStorageManager(
remoteStorageHttpExecutor,
remoteStorageRetryExecutor,
retryExecutor,
config.getCdn3StorageManagerConfiguration());
BackupManager backupManager = new BackupManager(
backupsDb,

View File

@@ -8,10 +8,12 @@ package org.whispersystems.textsecuregcm.auth;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import io.dropwizard.lifecycle.Managed;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -21,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +35,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
/**
@@ -45,6 +49,11 @@ public class DisconnectionRequestManager extends RedisPubSubAdapter<byte[], byte
private final FaultTolerantRedisClient pubSubClient;
private final GrpcClientConnectionManager grpcClientConnectionManager;
private final Executor listenerEventExecutor;
private final ScheduledExecutorService retryExecutor;
private static final String RETRY_NAME = DisconnectionRequestManager.class.getSimpleName();
private static final Duration SUBSCRIBE_RETRY_DELAY = Duration.ofSeconds(5);
private final Map<AccountIdentifierAndDeviceId, List<DisconnectionRequestListener>> listeners =
new ConcurrentHashMap<>();
@@ -66,11 +75,13 @@ public class DisconnectionRequestManager extends RedisPubSubAdapter<byte[], byte
public DisconnectionRequestManager(final FaultTolerantRedisClient pubSubClient,
final GrpcClientConnectionManager grpcClientConnectionManager,
final Executor listenerEventExecutor) {
final Executor listenerEventExecutor,
final ScheduledExecutorService retryExecutor) {
this.pubSubClient = pubSubClient;
this.grpcClientConnectionManager = grpcClientConnectionManager;
this.listenerEventExecutor = listenerEventExecutor;
this.retryExecutor = retryExecutor;
}
@Override
@@ -78,7 +89,25 @@ public class DisconnectionRequestManager extends RedisPubSubAdapter<byte[], byte
this.pubSubConnection = pubSubClient.createBinaryPubSubConnection();
this.pubSubConnection.usePubSubConnection(connection -> {
connection.addListener(this);
connection.sync().subscribe(DISCONNECTION_REQUEST_CHANNEL);
boolean subscribed = false;
// Loop indefinitely until we establish a subscription. We don't want to fail immediately if there's a temporary
// Redis connectivity issue, since that would derail the whole startup process and likely lead to unnecessary pod
// churn, which might make things worse. If we never establish a connection, readiness probes will eventually fail
// and terminate the pods.
do {
try {
connection.sync().subscribe(DISCONNECTION_REQUEST_CHANNEL);
subscribed = true;
} catch (final RedisCommandTimeoutException e) {
try {
Thread.sleep(SUBSCRIBE_RETRY_DELAY);
} catch (final InterruptedException ex) {
throw new RuntimeException(ex);
}
}
} while (!subscribed);
});
}
@@ -159,9 +188,10 @@ public class DisconnectionRequestManager extends RedisPubSubAdapter<byte[], byte
.addAllDeviceIds(deviceIds.stream().mapToInt(Byte::intValue).boxed().toList())
.build();
return pubSubClient.withBinaryConnection(connection ->
connection.async().publish(DISCONNECTION_REQUEST_CHANNEL, disconnectionRequest.toByteArray()))
.toCompletableFuture()
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeCompletionStage(retryExecutor, () -> pubSubClient.withBinaryConnection(connection ->
connection.async().publish(DISCONNECTION_REQUEST_CHANNEL, disconnectionRequest.toByteArray()))
.toCompletableFuture())
.thenRun(DISCONNECTION_REQUESTS_SENT_COUNTER::increment);
}

View File

@@ -13,6 +13,7 @@ import java.io.UncheckedIOException;
import java.time.Clock;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
@@ -30,11 +31,12 @@ public abstract class BaseRateLimiters<T extends RateLimiterDescriptor> {
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final ClusterLuaScript validateScript,
final FaultTolerantRedisClusterClient cacheCluster,
final ScheduledExecutorService retryExecutor,
final Clock clock) {
this.rateLimiterByDescriptor = Arrays.stream(values)
.map(descriptor -> Pair.of(
descriptor,
createForDescriptor(descriptor, dynamicConfigurationManager, validateScript, cacheCluster, clock)))
createForDescriptor(descriptor, dynamicConfigurationManager, validateScript, cacheCluster, retryExecutor, clock)))
.collect(Collectors.toUnmodifiableMap(Pair::getKey, Pair::getValue));
}
@@ -56,9 +58,10 @@ public abstract class BaseRateLimiters<T extends RateLimiterDescriptor> {
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final ClusterLuaScript validateScript,
final FaultTolerantRedisClusterClient cacheCluster,
final ScheduledExecutorService retryExecutor,
final Clock clock) {
final Supplier<RateLimiterConfig> configResolver =
() -> dynamicConfigurationManager.getConfiguration().getLimits().getOrDefault(descriptor.id(), descriptor.defaultConfig());
return new DynamicRateLimiter(descriptor.id(), configResolver, validateScript, cacheCluster, clock);
return new DynamicRateLimiter(descriptor.id(), configResolver, validateScript, cacheCluster, retryExecutor, clock);
}
}

View File

@@ -14,11 +14,13 @@ import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
@@ -30,22 +32,26 @@ public class DynamicRateLimiter implements RateLimiter {
private final ClusterLuaScript validateScript;
private final FaultTolerantRedisClusterClient cluster;
private final ScheduledExecutorService retryExecutor;
private final Counter limitExceededCounter;
private final Clock clock;
private static final String RETRY_NAME = DynamicRateLimiter.class.getSimpleName();
public DynamicRateLimiter(
final String name,
final Supplier<RateLimiterConfig> configResolver,
final ClusterLuaScript validateScript,
final FaultTolerantRedisClusterClient cluster,
final ScheduledExecutorService retryExecutor,
final Clock clock) {
this.name = requireNonNull(name);
this.configResolver = requireNonNull(configResolver);
this.validateScript = requireNonNull(validateScript);
this.cluster = requireNonNull(cluster);
this.retryExecutor = requireNonNull(retryExecutor);
this.clock = requireNonNull(clock);
this.limitExceededCounter = Metrics.counter(MetricsUtil.name(getClass(), "exceeded"), "rateLimiterName", name);
}
@@ -129,13 +135,15 @@ public class DynamicRateLimiter implements RateLimiter {
@Override
public void clear(final String key) {
cluster.useCluster(connection -> connection.sync().del(bucketName(name, key)));
CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeRunnable(() -> cluster.useCluster(connection -> connection.sync().del(bucketName(name, key))));
}
@Override
public CompletionStage<Void> clearAsync(final String key) {
return cluster.withCluster(connection -> connection.async().del(bucketName(name, key)))
.thenRun(Util.NOOP);
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeCompletionStage(retryExecutor, () -> cluster.withCluster(connection -> connection.async().del(bucketName(name, key)))
.thenRun(Util.NOOP));
}
@Override

View File

@@ -7,6 +7,7 @@ package org.whispersystems.textsecuregcm.limits;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
@@ -77,9 +78,10 @@ public class RateLimiters extends BaseRateLimiters<RateLimiters.For> {
public static RateLimiters create(
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final FaultTolerantRedisClusterClient cacheCluster) {
final FaultTolerantRedisClusterClient cacheCluster,
final ScheduledExecutorService retryExecutor) {
return new RateLimiters(
dynamicConfigurationManager, defaultScript(cacheCluster), cacheCluster, Clock.systemUTC());
dynamicConfigurationManager, defaultScript(cacheCluster), cacheCluster, retryExecutor, Clock.systemUTC());
}
@VisibleForTesting
@@ -87,8 +89,9 @@ public class RateLimiters extends BaseRateLimiters<RateLimiters.For> {
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final ClusterLuaScript validateScript,
final FaultTolerantRedisClusterClient cacheCluster,
final ScheduledExecutorService retryExecutor,
final Clock clock) {
super(For.values(), dynamicConfigurationManager, validateScript, cacheCluster, clock);
super(For.values(), dynamicConfigurationManager, validateScript, cacheCluster, retryExecutor, clock);
}
public RateLimiter getAllocateDeviceLimiter() {

View File

@@ -23,6 +23,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> implements Managed {
@@ -40,6 +41,8 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
private static final String RECEIVE_PROVISIONING_MESSAGE_COUNTER_NAME =
name(ProvisioningManager.class, "receiveProvisioningMessage");
private static final String RETRY_NAME = ProvisioningManager.class.getSimpleName();
private static final Logger logger = LoggerFactory.getLogger(ProvisioningManager.class);
public ProvisioningManager(final FaultTolerantRedisClient pubSubClient) {
@@ -77,8 +80,9 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
.setContent(ByteString.copyFrom(body))
.build();
final boolean receiverPresent = pubSubClient.withBinaryConnection(connection ->
connection.sync().publish(address.getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0);
final boolean receiverPresent = CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeSupplier(() -> pubSubClient.withBinaryConnection(connection ->
connection.sync().publish(address.getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0));
Metrics.counter(SEND_PROVISIONING_MESSAGE_COUNTER_NAME, "online", String.valueOf(receiverPresent)).increment();

View File

@@ -23,6 +23,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
@@ -34,6 +35,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import org.whispersystems.textsecuregcm.util.Util;
@@ -65,6 +67,7 @@ public class PushNotificationScheduler implements Managed {
private final FcmSender fcmSender;
private final AccountsManager accountsManager;
private final FaultTolerantRedisClusterClient pushSchedulingCluster;
private final ScheduledExecutorService retryExecutor;
private final Clock clock;
private final ClusterLuaScript scheduleBackgroundNotificationScript;
@@ -76,6 +79,8 @@ public class PushNotificationScheduler implements Managed {
private final AtomicBoolean running = new AtomicBoolean(false);
private static final String RETRY_NAME = PushNotificationScheduler.class.getSimpleName();
class NotificationWorker implements Runnable {
private final int maxConcurrency;
@@ -154,7 +159,8 @@ public class PushNotificationScheduler implements Managed {
final FcmSender fcmSender,
final AccountsManager accountsManager,
final int dedicatedProcessWorkerThreadCount,
final int workerMaxConcurrency) throws IOException {
final int workerMaxConcurrency,
final ScheduledExecutorService retryExecutor) throws IOException {
this(pushSchedulingCluster,
apnSender,
@@ -162,17 +168,19 @@ public class PushNotificationScheduler implements Managed {
accountsManager,
Clock.systemUTC(),
dedicatedProcessWorkerThreadCount,
workerMaxConcurrency);
workerMaxConcurrency,
retryExecutor);
}
@VisibleForTesting
PushNotificationScheduler(final FaultTolerantRedisClusterClient pushSchedulingCluster,
final APNSender apnSender,
final FcmSender fcmSender,
final AccountsManager accountsManager,
final Clock clock,
final int dedicatedProcessThreadCount,
final int workerMaxConcurrency) throws IOException {
final APNSender apnSender,
final FcmSender fcmSender,
final AccountsManager accountsManager,
final Clock clock,
final int dedicatedProcessThreadCount,
final int workerMaxConcurrency,
final ScheduledExecutorService retryExecutor) throws IOException {
this.apnSender = apnSender;
this.fcmSender = fcmSender;
@@ -184,6 +192,7 @@ public class PushNotificationScheduler implements Managed {
"lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE);
this.workerThreads = new Thread[dedicatedProcessThreadCount];
this.retryExecutor = retryExecutor;
for (int i = 0; i < this.workerThreads.length; i++) {
this.workerThreads[i] = new Thread(new NotificationWorker(workerMaxConcurrency), "PushNotificationScheduler-" + i);
@@ -225,13 +234,16 @@ public class PushNotificationScheduler implements Managed {
* @return a future that completes once the notification has been scheduled
*/
public CompletableFuture<Void> scheduleDelayedNotification(final Account account, final Device device, final Duration minDelay) {
return pushSchedulingCluster.withCluster(connection ->
connection.async().zadd(getDelayedNotificationQueueKey(account, device),
clock.instant().plus(minDelay).toEpochMilli(),
encodeAciAndDeviceId(account, device)))
.thenRun(() -> Metrics.counter(DELAYED_NOTIFICATION_SCHEDULED_COUNTER_NAME,
TOKEN_TYPE_TAG, getTokenType(device))
.increment())
final long deliveryTime = clock.instant().plus(minDelay).toEpochMilli();
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeCompletionStage(retryExecutor, () -> pushSchedulingCluster.withCluster(connection ->
connection.async().zadd(getDelayedNotificationQueueKey(account, device),
deliveryTime,
encodeAciAndDeviceId(account, device)))
.thenRun(() -> Metrics.counter(DELAYED_NOTIFICATION_SCHEDULED_COUNTER_NAME,
TOKEN_TYPE_TAG, getTokenType(device))
.increment()))
.toCompletableFuture();
}

View File

@@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.dropwizard.lifecycle.Managed;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.SetArgs;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
@@ -84,6 +85,7 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecoveryClient;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RegistrationIdValidator;
@@ -116,6 +118,10 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
private static final String TIMESTAMP_BASED_TRANSFER_ARCHIVE_KEY_COUNTER_NAME = name(AccountsManager.class, "timestampRedisKeyCounter");
private static final String REGISTRATION_ID_BASED_TRANSFER_ARCHIVE_KEY_COUNTER_NAME = name(AccountsManager.class,"registrationIdRedisKeyCounter");
private static final String RETRY_NAME = AccountsManager.class.getSimpleName();
private static final Duration SUBSCRIBE_RETRY_DELAY = Duration.ofSeconds(5);
private static final Logger logger = LoggerFactory.getLogger(AccountsManager.class);
private final Accounts accounts;
@@ -133,6 +139,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
private final ClientPublicKeysManager clientPublicKeysManager;
private final Executor accountLockExecutor;
private final ScheduledExecutorService messagesPollExecutor;
private final ScheduledExecutorService retryExecutor;
private final Clock clock;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
@@ -222,7 +229,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager,
final ClientPublicKeysManager clientPublicKeysManager,
final Executor accountLockExecutor,
final ScheduledExecutorService messagesPollExecutor,
final ScheduledExecutorService messagesPollExecutor, final ScheduledExecutorService retryExecutor,
final Clock clock,
final byte[] linkDeviceSecret,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
@@ -241,6 +248,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
this.clientPublicKeysManager = clientPublicKeysManager;
this.accountLockExecutor = accountLockExecutor;
this.messagesPollExecutor = messagesPollExecutor;
this.retryExecutor = retryExecutor;
this.clock = requireNonNull(clock);
this.dynamicConfigurationManager = dynamicConfigurationManager;
@@ -260,8 +268,27 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
public void start() {
pubSubConnection.usePubSubConnection(connection -> {
connection.addListener(this);
connection.sync().psubscribe(LINKED_DEVICE_KEYSPACE_PATTERN, TRANSFER_ARCHIVE_KEYSPACE_PATTERN,
RESTORE_ACCOUNT_REQUEST_KEYSPACE_PATTERN);
boolean subscribed = false;
// Loop indefinitely until we establish a subscription. We don't want to fail immediately if there's a temporary
// Redis connectivity issue, since that would derail the whole startup process and likely lead to unnecessary pod
// churn, which might make things worse. If we never establish a connection, readiness probes will eventually fail
// and terminate the pods.
do {
try {
connection.sync().psubscribe(LINKED_DEVICE_KEYSPACE_PATTERN, TRANSFER_ARCHIVE_KEYSPACE_PATTERN,
RESTORE_ACCOUNT_REQUEST_KEYSPACE_PATTERN);
subscribed = true;
} catch (final RedisCommandTimeoutException e) {
try {
Thread.sleep(SUBSCRIBE_RETRY_DELAY);
} catch (final InterruptedException ex) {
throw new RuntimeException(ex);
}
}
} while (!subscribed);
});
}
@@ -484,7 +511,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
return CompletableFuture.failedFuture(throwable);
})
.whenComplete((updatedAccountAndDevice, throwable) -> {
.whenComplete((updatedAccountAndDevice, _) -> {
if (updatedAccountAndDevice != null) {
final String key = getLinkedDeviceKey(getLinkDeviceTokenIdentifier(linkDeviceToken));
final String deviceInfoJson;
@@ -495,9 +522,10 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
throw new UncheckedIOException(e);
}
pubSubRedisClient.withConnection(connection ->
connection.async().set(key, deviceInfoJson, SetArgs.Builder.ex(RECENTLY_ADDED_DEVICE_TTL)))
.whenComplete((ignored, pubSubThrowable) -> {
CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeCompletionStage(retryExecutor, () -> pubSubRedisClient.withConnection(connection ->
connection.async().set(key, deviceInfoJson, SetArgs.Builder.ex(RECENTLY_ADDED_DEVICE_TTL))))
.whenComplete((_, pubSubThrowable) -> {
if (pubSubThrowable != null) {
logger.warn("Failed to record recently-created device", pubSubThrowable);
}
@@ -1399,10 +1427,11 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
}
private void redisDelete(final Account account) {
redisDeleteTimer.record(() ->
cacheCluster.useCluster(connection ->
connection.sync().del(getAccountMapKey(account.getPhoneNumberIdentifier().toString()),
getAccountEntityKey(account.getUuid()))));
CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME).executeRunnable(() ->
redisDeleteTimer.record(() ->
cacheCluster.useCluster(connection ->
connection.sync().del(getAccountMapKey(account.getPhoneNumberIdentifier().toString()),
getAccountEntityKey(account.getUuid())))));
}
private CompletableFuture<Void> redisDeleteAsync(final Account account) {
@@ -1413,10 +1442,11 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
getAccountEntityKey(account.getUuid())
};
return cacheCluster.withCluster(connection -> connection.async().del(keysToDelete))
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME).executeCompletionStage(retryExecutor,
() -> cacheCluster.withCluster(connection -> connection.async().del(keysToDelete))
.thenRun(Util.NOOP))
.toCompletableFuture()
.whenComplete((ignoredResult, ignoredException) -> sample.stop(redisDeleteTimer))
.thenRun(Util.NOOP);
.whenComplete((_, _) -> sample.stop(redisDeleteTimer));
}
public CompletableFuture<Optional<DeviceInfo>> waitForNewLinkedDevice(
@@ -1564,19 +1594,19 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
try {
final String transferArchiveJson = SystemMapper.jsonMapper().writeValueAsString(transferArchiveResult);
return pubSubRedisClient.withConnection(connection -> {
final String key = destinationDeviceCreationTimestamp
.map(timestamp -> getTimestampTransferArchiveKey(account.getIdentifier(IdentityType.ACI), destinationDeviceId, timestamp))
.orElseGet(() -> maybeRegistrationId
.map(registrationId -> getRegistrationIdTransferArchiveKey(account.getIdentifier(IdentityType.ACI), destinationDeviceId, registrationId))
// We validate the request object so this should never happen
.orElseThrow(() -> new AssertionError("No creation timestamp or registration ID provided")));
final String key = destinationDeviceCreationTimestamp
.map(timestamp -> getTimestampTransferArchiveKey(account.getIdentifier(IdentityType.ACI), destinationDeviceId, timestamp))
.orElseGet(() -> maybeRegistrationId
.map(registrationId -> getRegistrationIdTransferArchiveKey(account.getIdentifier(IdentityType.ACI), destinationDeviceId, registrationId))
// We validate the request object so this should never happen
.orElseThrow(() -> new AssertionError("No creation timestamp or registration ID provided")));
return connection.async()
.set(key, transferArchiveJson, SetArgs.Builder.ex(RECENTLY_ADDED_TRANSFER_ARCHIVE_TTL))
.thenRun(Util.NOOP)
.toCompletableFuture();
});
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeCompletionStage(retryExecutor, () -> pubSubRedisClient.withConnection(connection -> connection.async()
.set(key, transferArchiveJson, SetArgs.Builder.ex(RECENTLY_ADDED_TRANSFER_ARCHIVE_TTL)))
.toCompletableFuture())
.thenRun(Util.NOOP)
.toCompletableFuture();
} catch (final JsonProcessingException e) {
// This should never happen for well-defined objects we control
throw new UncheckedIOException(e);
@@ -1632,8 +1662,10 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
throw new UncheckedIOException(e);
}
return pubSubRedisClient.withConnection(connection ->
connection.async().set(key, requestJson, SetArgs.Builder.ex(RESTORE_ACCOUNT_REQUEST_TTL)))
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeCompletionStage(retryExecutor, () -> pubSubRedisClient.withConnection(connection ->
connection.async().set(key, requestJson, SetArgs.Builder.ex(RESTORE_ACCOUNT_REQUEST_TTL)))
.toCompletableFuture())
.thenRun(Util.NOOP)
.toCompletableFuture();
}

View File

@@ -10,6 +10,7 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.github.resilience4j.reactor.retry.RetryOperator;
import io.lettuce.core.Limit;
import io.lettuce.core.Range;
import io.lettuce.core.ScoredValue;
@@ -34,6 +35,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
@@ -46,6 +48,7 @@ import org.whispersystems.textsecuregcm.experiment.ExperimentEnrollmentManager;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import reactor.core.observability.micrometer.Micrometer;
@@ -53,7 +56,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.RetrySpec;
/**
* Manages short-term storage of messages in Redis. Messages are frequently delivered to their destination and deleted
@@ -150,6 +152,8 @@ public class MessagesCache {
private final Counter sharedMrmDataKeyRemovedCounter = Metrics.counter(
name(MessagesCache.class, "sharedMrmKeyRemoved"));
static final String RETRY_NAME = MessagesCache.class.getSimpleName();
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);
@@ -169,6 +173,7 @@ public class MessagesCache {
public MessagesCache(final FaultTolerantRedisClusterClient redisCluster,
final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService,
final ScheduledExecutorService retryExecutor,
final Clock clock,
final ExperimentEnrollmentManager experimentEnrollmentManager)
throws IOException {
@@ -179,10 +184,10 @@ public class MessagesCache {
messageDeletionExecutorService,
clock,
experimentEnrollmentManager,
new MessagesCacheInsertScript(redisCluster),
new MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(redisCluster),
new MessagesCacheInsertScript(redisCluster, retryExecutor),
new MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(redisCluster, retryExecutor),
new MessagesCacheGetItemsScript(redisCluster),
new MessagesCacheRemoveByGuidScript(redisCluster),
new MessagesCacheRemoveByGuidScript(redisCluster, retryExecutor),
new MessagesCacheRemoveQueueScript(redisCluster),
new MessagesCacheGetQueuesToPersistScript(redisCluster),
new MessagesCacheRemoveRecipientViewFromMrmDataScript(redisCluster),
@@ -230,7 +235,8 @@ public class MessagesCache {
final Timer.Sample sample = Timer.start();
return insertScript.executeAsync(destinationAccountIdentifier, destinationDeviceId, messageWithGuid)
.whenComplete((ignored, throwable) -> sample.stop(insertTimer));
.toCompletableFuture()
.whenComplete((_, _) -> sample.stop(insertTimer));
}
public CompletableFuture<byte[]> insertSharedMultiRecipientMessagePayload(
@@ -241,8 +247,9 @@ public class MessagesCache {
final byte[] sharedMrmKey = getSharedMrmKey(UUID.randomUUID());
return insertMrmScript.executeAsync(sharedMrmKey, sealedSenderMultiRecipientMessage)
.thenApply(ignored -> sharedMrmKey)
.whenComplete((ignored, throwable) -> sample.stop(insertSharedMrmPayloadTimer));
.thenApply(_ -> sharedMrmKey)
.toCompletableFuture()
.whenComplete((_, _) -> sample.stop(insertSharedMrmPayloadTimer));
}
public CompletableFuture<Optional<RemovedMessage>> remove(final UUID destinationUuid, final byte destinationDevice,
@@ -269,7 +276,7 @@ public class MessagesCache {
removedMessages.add(RemovedMessage.fromEnvelope(envelope));
if (envelope.hasSharedMrmKey()) {
serviceIdentifierToMrmKeys.computeIfAbsent(
ServiceIdentifier.valueOf(envelope.getDestinationServiceId()), ignored -> new ArrayList<>())
ServiceIdentifier.valueOf(envelope.getDestinationServiceId()), _ -> new ArrayList<>())
.add(envelope.getSharedMrmKey().toByteArray());
}
} catch (final InvalidProtocolBufferException e) {
@@ -281,7 +288,9 @@ public class MessagesCache {
(serviceId, keysToUpdate) -> removeRecipientViewFromMrmData(keysToUpdate, serviceId, destinationDevice));
return removedMessages;
}, messageDeletionExecutorService).whenComplete((removedMessages, throwable) -> {
}, messageDeletionExecutorService)
.toCompletableFuture()
.whenComplete((removedMessages, _) -> {
if (removedMessages != null) {
removeMessageCounter.increment(removedMessages.size());
}
@@ -291,13 +300,6 @@ public class MessagesCache {
}
public CompletableFuture<Boolean> hasMessagesAsync(final UUID destinationUuid, final byte destinationDevice) {
return redisCluster.withBinaryCluster(connection ->
connection.async().zcard(getMessageQueueKey(destinationUuid, destinationDevice))
.thenApply(cardinality -> cardinality > 0))
.toCompletableFuture();
}
public Publisher<MessageProtos.Envelope> get(final UUID destinationUuid, final byte destinationDevice) {
final long earliestAllowableEphemeralTimestamp =
@@ -438,6 +440,7 @@ public class MessagesCache {
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
.collectList()
.publishOn(messageDeliveryScheduler)))
.transformDeferred(RetryOperator.of(CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)))
.<MessageProtos.Envelope>handle((mrmDataAndView, sink) -> {
try {
assert mrmDataAndView.size() == 2;
@@ -523,7 +526,6 @@ public class MessagesCache {
long messageId, int pageSize) {
return getItemsScript.execute(destinationUuid, destinationDevice, pageSize, messageId)
.retryWhen(RetrySpec.backoff(4, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(4)))
.map(queueItems -> {
logger.trace("Processing page: {}", messageId);
@@ -653,7 +655,7 @@ public class MessagesCache {
if (message.hasSharedMrmKey()) {
serviceIdentifierToMrmKeys.computeIfAbsent(ServiceIdentifier.valueOf(message.getDestinationServiceId()),
ignored -> new ArrayList<>())
_ -> new ArrayList<>())
.add(message.getSharedMrmKey().toByteArray());
}
} catch (final InvalidProtocolBufferException e) {
@@ -675,7 +677,7 @@ public class MessagesCache {
try {
return redisCluster.withBinaryCluster(
connection -> connection.getPartitions().getPartitionBySlot(slot).getUri().getHost());
} catch (Throwable ignored) {
} catch (final Throwable _) {
return "unknown";
}
}

View File

@@ -5,6 +5,7 @@
package org.whispersystems.textsecuregcm.storage;
import io.github.resilience4j.reactor.retry.RetryOperator;
import io.lettuce.core.ScriptOutputType;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -12,6 +13,7 @@ import java.util.List;
import java.util.UUID;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.publisher.Mono;
/**
@@ -38,6 +40,7 @@ class MessagesCacheGetItemsScript {
);
//noinspection unchecked
return getItemsScript.executeBinaryReactive(keys, args)
.transformDeferred(RetryOperator.of(CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)))
.map(result -> (List<byte[]>) result)
.next();
}

View File

@@ -12,13 +12,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.push.ClientEvent;
import org.whispersystems.textsecuregcm.push.NewMessageAvailableEvent;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
/**
* Inserts an envelope into the message queue for a destination device and publishes a "new message available" event.
@@ -26,14 +28,18 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
class MessagesCacheInsertScript {
private final ClusterLuaScript insertScript;
private final ScheduledExecutorService retryExecutor;
private static final byte[] NEW_MESSAGE_EVENT_BYTES = ClientEvent.newBuilder()
.setNewMessageAvailable(NewMessageAvailableEvent.getDefaultInstance())
.build()
.toByteArray();
MessagesCacheInsertScript(FaultTolerantRedisClusterClient redisCluster) throws IOException {
MessagesCacheInsertScript(FaultTolerantRedisClusterClient redisCluster,
final ScheduledExecutorService retryExecutor) throws IOException {
this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.BOOLEAN);
this.retryExecutor = retryExecutor;
}
/**
@@ -45,7 +51,7 @@ class MessagesCacheInsertScript {
* @return {@code true} if the destination device had a registered "presence"/event subscriber or {@code false}
* otherwise
*/
CompletableFuture<Boolean> executeAsync(final UUID destinationUuid, final byte destinationDevice, final MessageProtos.Envelope envelope) {
CompletionStage<Boolean> executeAsync(final UUID destinationUuid, final byte destinationDevice, final MessageProtos.Envelope envelope) {
assert envelope.hasServerGuid();
assert envelope.hasServerTimestamp();
@@ -63,7 +69,8 @@ class MessagesCacheInsertScript {
NEW_MESSAGE_EVENT_BYTES // eventPayload
));
return insertScript.executeBinaryAsync(keys, args)
return CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
.executeCompletionStage(retryExecutor, () -> insertScript.executeBinaryAsync(keys, args))
.thenApply(result -> (boolean) result);
}
}

View File

@@ -9,10 +9,12 @@ import io.lettuce.core.ScriptOutputType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Util;
/**
@@ -24,16 +26,20 @@ import org.whispersystems.textsecuregcm.util.Util;
class MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript {
private final ClusterLuaScript script;
private final ScheduledExecutorService retryExecutor;
static final String ERROR_KEY_EXISTS = "ERR key exists";
MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(FaultTolerantRedisClusterClient redisCluster)
throws IOException {
MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(FaultTolerantRedisClusterClient redisCluster,
final ScheduledExecutorService retryExecutor) throws IOException {
this.script = ClusterLuaScript.fromResource(redisCluster, "lua/insert_shared_multirecipient_message_data.lua",
ScriptOutputType.INTEGER);
this.retryExecutor = retryExecutor;
}
CompletableFuture<Void> executeAsync(final byte[] sharedMrmKey, final SealedSenderMultiRecipientMessage message) {
CompletionStage<Void> executeAsync(final byte[] sharedMrmKey, final SealedSenderMultiRecipientMessage message) {
final List<byte[]> keys = List.of(
sharedMrmKey // sharedMrmKey
);
@@ -49,7 +55,8 @@ class MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript {
}
});
return script.executeBinaryAsync(keys, args)
return CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
.executeCompletionStage(retryExecutor, () -> script.executeBinaryAsync(keys, args))
.thenRun(Util.NOOP);
}
}

View File

@@ -10,9 +10,11 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
/**
* Removes a list of message GUIDs from the queue of a destination device.
@@ -20,13 +22,17 @@ import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
class MessagesCacheRemoveByGuidScript {
private final ClusterLuaScript removeByGuidScript;
private final ScheduledExecutorService retryExecutor;
MessagesCacheRemoveByGuidScript(final FaultTolerantRedisClusterClient redisCluster,
final ScheduledExecutorService retryExecutor) throws IOException {
MessagesCacheRemoveByGuidScript(final FaultTolerantRedisClusterClient redisCluster) throws IOException {
this.removeByGuidScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_guid.lua",
ScriptOutputType.OBJECT);
this.retryExecutor = retryExecutor;
}
CompletableFuture<List<byte[]>> execute(final UUID destinationUuid, final byte destinationDevice,
CompletionStage<List<byte[]>> execute(final UUID destinationUuid, final byte destinationDevice,
final List<UUID> messageGuids) {
final List<byte[]> keys = List.of(
@@ -38,7 +44,8 @@ class MessagesCacheRemoveByGuidScript {
.toList();
//noinspection unchecked
return removeByGuidScript.executeBinaryAsync(keys, args)
return CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
.executeCompletionStage(retryExecutor, () -> removeByGuidScript.executeBinaryAsync(keys, args))
.thenApply(result -> (List<byte[]>) result);
}

View File

@@ -11,6 +11,7 @@ import org.whispersystems.textsecuregcm.push.MessagesPersistedEvent;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
@@ -38,6 +39,7 @@ class MessagesCacheUnlockQueueScript {
RedisMessageAvailabilityManager.getClientEventChannel(accountIdentifier, deviceId) // eventChannelKey
);
unlockQueueScript.executeBinary(keys, MESSAGES_PERSISTED_EVENT_ARGS);
CircuitBreakerUtil.getGeneralRedisRetry(MessagesCache.RETRY_NAME)
.executeRunnable(() -> unlockQueueScript.executeBinary(keys, MESSAGES_PERSISTED_EVENT_ARGS));
}
}

View File

@@ -32,7 +32,6 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.push.RedisMessageAvailabilityManager;
import org.whispersystems.textsecuregcm.util.Pair;
import reactor.core.observability.micrometer.Micrometer;
@@ -52,9 +51,6 @@ public class MessagesManager {
private static final Counter PERSIST_MESSAGE_BYTES_COUNTER = Metrics.counter(
name(MessagesManager.class, "persistMessageBytes"));
private static final String MAY_HAVE_MESSAGES_COUNTER_NAME =
MetricsUtil.name(MessagesManager.class, "mayHaveMessages");
private final MessagesDynamoDb messagesDynamoDb;
private final MessagesCache messagesCache;
private final RedisMessageAvailabilityManager redisMessageAvailabilityManager;
@@ -182,28 +178,6 @@ public class MessagesManager {
return messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice);
}
public CompletableFuture<Boolean> mayHaveMessages(final UUID destinationUuid, final Device destinationDevice) {
return messagesCache.hasMessagesAsync(destinationUuid, destinationDevice.getId())
.thenCombine(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice),
(mayHaveCachedMessages, mayHavePersistedMessages) -> {
final String outcome;
if (mayHaveCachedMessages && mayHavePersistedMessages) {
outcome = "both";
} else if (mayHaveCachedMessages) {
outcome = "cached";
} else if (mayHavePersistedMessages) {
outcome = "persisted";
} else {
outcome = "none";
}
Metrics.counter(MAY_HAVE_MESSAGES_COUNTER_NAME, "outcome", outcome).increment();
return mayHaveCachedMessages || mayHavePersistedMessages;
});
}
public CompletableFuture<Boolean> mayHaveUrgentPersistedMessages(final UUID destinationUuid, final Device destinationDevice) {
return messagesDynamoDb.mayHaveUrgentMessages(destinationUuid, destinationDevice);
}

View File

@@ -15,12 +15,14 @@ import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import javax.annotation.Nullable;
import io.micrometer.core.instrument.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Mono;
@@ -35,16 +37,23 @@ public class ProfilesManager {
private final Profiles profiles;
private final FaultTolerantRedisClusterClient cacheCluster;
private final ScheduledExecutorService retryExecutor;
private final S3AsyncClient s3Client;
private final String bucket;
private final ObjectMapper mapper;
private static final String RETRY_NAME = ProfilesManager.class.getSimpleName();
private static final String DELETE_AVATAR_COUNTER_NAME = name(ProfilesManager.class, "deleteAvatar");
public ProfilesManager(final Profiles profiles, final FaultTolerantRedisClusterClient cacheCluster, final S3AsyncClient s3Client,
public ProfilesManager(final Profiles profiles,
final FaultTolerantRedisClusterClient cacheCluster,
final ScheduledExecutorService retryExecutor,
final S3AsyncClient s3Client,
final String bucket) {
this.profiles = profiles;
this.cacheCluster = cacheCluster;
this.retryExecutor = retryExecutor;
this.s3Client = s3Client;
this.bucket = bucket;
this.mapper = SystemMapper.jsonMapper();
@@ -181,7 +190,9 @@ public class ProfilesManager {
}
private CompletableFuture<Void> redisDelete(UUID uuid) {
return cacheCluster.withCluster(connection -> connection.async().del(getCacheKey(uuid)))
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeCompletionStage(retryExecutor,
() -> cacheCluster.withCluster(connection -> connection.async().del(getCacheKey(uuid))))
.toCompletableFuture()
.thenRun(Util.NOOP);
}

View File

@@ -32,10 +32,9 @@ import java.util.UUID;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
/**
* Register Apple DeviceCheck App Attestations and verify the corresponding assertions.
@@ -65,6 +64,8 @@ public class AppleDeviceCheckManager {
private final String teamId;
private final String bundleId;
private static final String RETRY_NAME = AppleDeviceCheckManager.class.getSimpleName();
public AppleDeviceCheckManager(
AppleDeviceChecks appleDeviceChecks,
FaultTolerantRedisClusterClient redisClient,
@@ -115,7 +116,10 @@ public class AppleDeviceCheckManager {
}
final String redisChallengeKey = challengeKey(ChallengeType.ATTEST, account.getUuid());
final String challenge = redisClient.withCluster(cluster -> cluster.sync().get(redisChallengeKey));
@Nullable final String challenge = CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeSupplier(() -> redisClient.withCluster(cluster -> cluster.sync().get(redisChallengeKey)));
if (challenge == null) {
throw new ChallengeNotFoundException();
}
@@ -172,7 +176,9 @@ public class AppleDeviceCheckManager {
throws ChallengeNotFoundException, DeviceCheckVerificationFailedException, DeviceCheckKeyIdNotFoundException, RequestReuseException {
final String redisChallengeKey = challengeKey(challengeType, account.getUuid());
final String storedChallenge = redisClient.withCluster(cluster -> cluster.sync().get(redisChallengeKey));
@Nullable final String storedChallenge = CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeSupplier(() -> redisClient.withCluster(cluster -> cluster.sync().get(redisChallengeKey)));
if (storedChallenge == null) {
throw new ChallengeNotFoundException();
}
@@ -211,29 +217,29 @@ public class AppleDeviceCheckManager {
* @param account The account that will use the challenge
* @return The challenge to be included as part of an attestation or assertion
*/
public String createChallenge(final ChallengeType challengeType, final Account account)
throws RateLimitExceededException {
public String createChallenge(final ChallengeType challengeType, final Account account) {
final UUID accountIdentifier = account.getUuid();
final String challengeKey = challengeKey(challengeType, accountIdentifier);
return redisClient.withCluster(cluster -> {
final RedisAdvancedClusterCommands<String, String> commands = cluster.sync();
return CircuitBreakerUtil.getGeneralRedisRetry(RETRY_NAME)
.executeSupplier(() -> redisClient.withCluster(cluster -> {
final RedisAdvancedClusterCommands<String, String> commands = cluster.sync();
// Sets the new challenge if and only if there isn't already one stored for the challenge key; returns the existing
// challenge if present or null if no challenge was previously set.
final String proposedChallenge = generateChallenge();
@Nullable final String existingChallenge =
commands.setGet(challengeKey, proposedChallenge, SetArgs.Builder.nx().ex(CHALLENGE_TTL));
// Sets the new challenge if and only if there isn't already one stored for the challenge key; returns the existing
// challenge if present or null if no challenge was previously set.
final String proposedChallenge = generateChallenge();
@Nullable final String existingChallenge =
commands.setGet(challengeKey, proposedChallenge, SetArgs.Builder.nx().ex(CHALLENGE_TTL));
if (existingChallenge != null) {
// If the key was already set, make sure we extend the TTL. This is racy because the key could disappear or have
// been updated since the get returned, but it's fine. In the former case, this is a noop. In the latter
// case we may slightly extend the TTL from after it was set, but that's also no big deal.
commands.expire(challengeKey, CHALLENGE_TTL);
}
if (existingChallenge != null) {
// If the key was already set, make sure we extend the TTL. This is racy because the key could disappear or have
// been updated since the get returned, but it's fine. In the former case, this is a noop. In the latter
// case we may slightly extend the TTL from after it was set, but that's also no big deal.
commands.expire(challengeKey, CHALLENGE_TTL);
}
return existingChallenge != null ? existingChallenge : proposedChallenge;
});
return existingChallenge != null ? existingChallenge : proposedChallenge;
}));
}
private void removeChallenge(final String challengeKey) {

View File

@@ -11,10 +11,12 @@ import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryRegistry;
import io.lettuce.core.RedisCommandTimeoutException;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import org.apache.commons.lang3.RandomStringUtils;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
@@ -33,6 +35,14 @@ public class CircuitBreakerUtil {
private static final RetryRegistry RETRY_REGISTRY =
RetryRegistry.of(new RetryConfiguration().toRetryConfigBuilder().build());
// Include a random suffix to avoid accidental collisions
private static final String GENERAL_REDIS_CONFIGURATION_NAME =
"redis-general-" + RandomStringUtils.insecure().nextAlphanumeric(8);
static {
setGeneralRedisRetryConfiguration(new RetryConfiguration());
}
public static CircuitBreakerRegistry getCircuitBreakerRegistry() {
return CIRCUIT_BREAKER_REGISTRY;
}
@@ -41,6 +51,20 @@ public class CircuitBreakerUtil {
return RETRY_REGISTRY;
}
public static void setGeneralRedisRetryConfiguration(final RetryConfiguration retryConfiguration) {
RETRY_REGISTRY.addConfiguration(GENERAL_REDIS_CONFIGURATION_NAME, retryConfiguration.toRetryConfigBuilder()
.retryOnException(throwable -> throwable instanceof RedisCommandTimeoutException)
.build());
}
/// Returns a `Retry` instance with a default configuration suitable for general Redis operations.
///
/// @param name The name of this `Retry`. Calls to this method with the same name will return the same `Retry`
/// instance, and the name is used to identify metrics tied to the returned `Retry` instance.
public static Retry getGeneralRedisRetry(final String name) {
return RETRY_REGISTRY.retry(name, GENERAL_REDIS_CONFIGURATION_NAME);
}
/// @deprecated [CircuitBreakerRegistry] maintains its own set of metrics, and manually managing them is unnecessary
@Deprecated(forRemoval = true)
public static void registerMetrics(CircuitBreaker circuitBreaker, Class<?> clazz, Tags additionalTags) {

View File

@@ -161,14 +161,10 @@ record CommandDependencies(
ExecutorService disconnectionRequestListenerExecutor = environment.lifecycle()
.virtualExecutorService(name(WhisperServerService.class, "disconnectionRequest-%d"));
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(WhisperServerService.class, "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService remoteStorageRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(WhisperServerService.class, "remoteStorageRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(WhisperServerService.class, "storageServiceRetry-%d")).threads(1).build();
ScheduledExecutorService messagePollExecutor = environment.lifecycle()
final ScheduledExecutorService messagePollExecutor = environment.lifecycle()
.scheduledExecutorService(name(WhisperServerService.class, "messagePollExecutor-%d")).threads(1).build();
final ScheduledExecutorService retryExecutor = environment.lifecycle()
.scheduledExecutorService(name(WhisperServerService.class, "retry-%d")).threads(4).build();
ExternalServiceCredentialsGenerator storageCredentialsGenerator = SecureStorageController.credentialsGenerator(
configuration.getSecureStorageServiceConfiguration());
@@ -243,22 +239,23 @@ record CommandDependencies(
SecureValueRecoveryClient secureValueRecovery2Client = new SecureValueRecoveryClient(
secureValueRecovery2CredentialsGenerator,
secureValueRecoveryServiceExecutor,
secureValueRecoveryServiceRetryExecutor,
retryExecutor,
configuration.getSvr2Configuration(),
() -> dynamicConfigurationManager.getConfiguration().getSvr2StatusCodesToIgnoreForAccountDeletion());
SecureValueRecoveryClient secureValueRecoveryBClient = new SecureValueRecoveryClient(
secureValueRecoveryBCredentialsGenerator,
secureValueRecoveryServiceExecutor,
secureValueRecoveryServiceRetryExecutor,
retryExecutor,
configuration.getSvrbConfiguration(),
() -> dynamicConfigurationManager.getConfiguration().getSvrbStatusCodesToIgnoreForAccountDeletion());
SecureStorageClient secureStorageClient = new SecureStorageClient(storageCredentialsGenerator,
storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration());
storageServiceExecutor, retryExecutor, configuration.getSecureStorageServiceConfiguration());
GrpcClientConnectionManager grpcClientConnectionManager = new GrpcClientConnectionManager();
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, grpcClientConnectionManager, disconnectionRequestListenerExecutor);
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient,
grpcClientConnectionManager, disconnectionRequestListenerExecutor, retryExecutor);
MessagesCache messagesCache = new MessagesCache(messagesCluster,
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC(), experimentEnrollmentManager);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster, asyncCdnS3Client,
messageDeliveryScheduler, messageDeletionExecutor, retryExecutor, Clock.systemUTC(), experimentEnrollmentManager);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster, retryExecutor, asyncCdnS3Client,
configuration.getCdnConfiguration().bucket());
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
configuration.getDynamoDbTables().getReportMessage().getTableName(),
@@ -279,8 +276,9 @@ record CommandDependencies(
pubsubClient, accountLockManager, keys, messagesManager, profilesManager,
secureStorageClient, secureValueRecovery2Client, disconnectionRequestManager,
registrationRecoveryPasswordsManager, clientPublicKeysManager, accountLockExecutor, messagePollExecutor,
clock, configuration.getLinkDeviceSecretConfiguration().secret().value(), dynamicConfigurationManager);
RateLimiters rateLimiters = RateLimiters.create(dynamicConfigurationManager, rateLimitersCluster);
retryExecutor, clock, configuration.getLinkDeviceSecretConfiguration().secret().value(),
dynamicConfigurationManager);
RateLimiters rateLimiters = RateLimiters.create(dynamicConfigurationManager, rateLimitersCluster, retryExecutor);
final BackupsDb backupsDb =
new BackupsDb(dynamoDbAsyncClient, configuration.getDynamoDbTables().getBackups().getTableName(), clock);
final GenericServerSecretParams backupsGenericZkSecretParams;
@@ -298,7 +296,7 @@ record CommandDependencies(
new Cdn3BackupCredentialGenerator(configuration.getTus()),
new Cdn3RemoteStorageManager(
remoteStorageHttpExecutor,
remoteStorageRetryExecutor,
retryExecutor,
configuration.getCdn3StorageManagerConfiguration()),
secureValueRecoveryBCredentialsGenerator,
secureValueRecoveryBClient,
@@ -314,7 +312,7 @@ record CommandDependencies(
APNSender apnSender = new APNSender(apnSenderExecutor, configuration.getApnConfiguration());
FcmSender fcmSender = new FcmSender(fcmSenderExecutor, configuration.getFcmConfiguration().credentials().value());
PushNotificationScheduler pushNotificationScheduler = new PushNotificationScheduler(pushSchedulerCluster,
apnSender, fcmSender, accountsManager, 0, 0);
apnSender, fcmSender, accountsManager, 0, 0, retryExecutor);
PushNotificationManager pushNotificationManager = new PushNotificationManager(accountsManager,
apnSender, fcmSender, pushNotificationScheduler);
PushNotificationExperimentSamples pushNotificationExperimentSamples =

View File

@@ -5,11 +5,14 @@
package org.whispersystems.textsecuregcm.workers;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.core.Application;
import io.dropwizard.core.cli.ServerCommand;
import io.dropwizard.core.server.DefaultServerFactory;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jetty.HttpsConnectorFactory;
import java.util.concurrent.ScheduledExecutorService;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
@@ -21,6 +24,7 @@ public class ScheduledApnPushNotificationSenderServiceCommand extends ServerComm
private static final String WORKER_COUNT = "workers";
private static final String MAX_CONCURRENCY = "max_concurrency";
private static final String RETRY_THREADS = "retry_threads";
public ScheduledApnPushNotificationSenderServiceCommand() {
super(new Application<>() {
@@ -48,6 +52,13 @@ public class ScheduledApnPushNotificationSenderServiceCommand extends ServerComm
.required(false)
.setDefault(16)
.help("The number of concurrent operations per worker thread");
subparser.addArgument("--retry-threads")
.type(Integer.class)
.dest(RETRY_THREADS)
.required(false)
.setDefault(4)
.help("The number of threads to use in the retry executor's pool");
}
@Override
@@ -68,8 +79,20 @@ public class ScheduledApnPushNotificationSenderServiceCommand extends ServerComm
});
}
final ScheduledExecutorService retryExecutor = environment.lifecycle()
.scheduledExecutorService(name(ScheduledApnPushNotificationSenderServiceCommand.class, "retry-%d"))
.threads(namespace.getInt(RETRY_THREADS))
.build();
final PushNotificationScheduler pushNotificationScheduler = new PushNotificationScheduler(
deps.pushSchedulerCluster(), deps.apnSender(), deps.fcmSender(), deps.accountsManager(), namespace.getInt(WORKER_COUNT), namespace.getInt(MAX_CONCURRENCY));
deps.pushSchedulerCluster(),
deps.apnSender(),
deps.fcmSender(),
deps.accountsManager(),
namespace.getInt(WORKER_COUNT),
namespace.getInt(MAX_CONCURRENCY),
retryExecutor
);
environment.lifecycle().manage(pushNotificationScheduler);