Introduce FaultTolerantRedisClient

This commit is contained in:
Jon Chambers
2024-10-09 09:22:10 -04:00
committed by GitHub
parent 9d980f36b0
commit a9117010f9
61 changed files with 744 additions and 462 deletions

View File

@@ -23,7 +23,6 @@ import org.whispersystems.textsecuregcm.configuration.BadgesConfiguration;
import org.whispersystems.textsecuregcm.configuration.BraintreeConfiguration;
import org.whispersystems.textsecuregcm.configuration.Cdn3StorageManagerConfiguration;
import org.whispersystems.textsecuregcm.configuration.CdnConfiguration;
import org.whispersystems.textsecuregcm.configuration.ClientCdnConfiguration;
import org.whispersystems.textsecuregcm.configuration.ClientReleaseConfiguration;
import org.whispersystems.textsecuregcm.configuration.DatadogConfiguration;
import org.whispersystems.textsecuregcm.configuration.DefaultAwsCredentialsFactory;
@@ -34,6 +33,7 @@ import org.whispersystems.textsecuregcm.configuration.DynamoDbClientFactory;
import org.whispersystems.textsecuregcm.configuration.DynamoDbTables;
import org.whispersystems.textsecuregcm.configuration.ExternalRequestFilterConfiguration;
import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClusterFactory;
import org.whispersystems.textsecuregcm.configuration.FaultTolerantRedisClientFactory;
import org.whispersystems.textsecuregcm.configuration.FcmConfiguration;
import org.whispersystems.textsecuregcm.configuration.GcpAttachmentsConfiguration;
import org.whispersystems.textsecuregcm.configuration.GenericZkConfig;
@@ -47,7 +47,6 @@ import org.whispersystems.textsecuregcm.configuration.MessageCacheConfiguration;
import org.whispersystems.textsecuregcm.configuration.NoiseWebSocketTunnelConfiguration;
import org.whispersystems.textsecuregcm.configuration.OneTimeDonationConfiguration;
import org.whispersystems.textsecuregcm.configuration.PaymentsServiceConfiguration;
import org.whispersystems.textsecuregcm.configuration.ProvisioningConfiguration;
import org.whispersystems.textsecuregcm.configuration.RegistrationServiceClientFactory;
import org.whispersystems.textsecuregcm.configuration.RemoteConfigConfiguration;
import org.whispersystems.textsecuregcm.configuration.ReportMessageConfiguration;
@@ -143,7 +142,7 @@ public class WhisperServerConfiguration extends Configuration {
@NotNull
@Valid
@JsonProperty
private ProvisioningConfiguration provisioning;
private FaultTolerantRedisClientFactory pubsub;
@NotNull
@Valid
@@ -410,8 +409,8 @@ public class WhisperServerConfiguration extends Configuration {
return cacheCluster;
}
public ProvisioningConfiguration getProvisioningConfiguration() {
return provisioning;
public FaultTolerantRedisClientFactory getRedisPubSubConfiguration() {
return pubsub;
}
public SecureValueRecovery2Configuration getSvr2Configuration() {

View File

@@ -200,7 +200,8 @@ import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.redis.ConnectionEventLogger;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
import org.whispersystems.textsecuregcm.registration.RegistrationServiceClient;
import org.whispersystems.textsecuregcm.s3.PolicySigner;
import org.whispersystems.textsecuregcm.s3.PostPolicyGenerator;
@@ -447,18 +448,21 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.build();
ConnectionEventLogger.logConnectionEvents(sharedClientResources);
FaultTolerantRedisCluster cacheCluster = config.getCacheClusterConfiguration()
FaultTolerantRedisClusterClient cacheCluster = config.getCacheClusterConfiguration()
.build("main_cache", sharedClientResources.mutate());
FaultTolerantRedisCluster messagesCluster =
FaultTolerantRedisClusterClient messagesCluster =
config.getMessageCacheConfiguration().getRedisClusterConfiguration()
.build("messages", sharedClientResources.mutate());
FaultTolerantRedisCluster clientPresenceCluster = config.getClientPresenceClusterConfiguration()
FaultTolerantRedisClusterClient clientPresenceCluster = config.getClientPresenceClusterConfiguration()
.build("client_presence", sharedClientResources.mutate());
FaultTolerantRedisCluster pushSchedulerCluster = config.getPushSchedulerCluster().build("push_scheduler",
FaultTolerantRedisClusterClient pushSchedulerCluster = config.getPushSchedulerCluster().build("push_scheduler",
sharedClientResources.mutate());
FaultTolerantRedisCluster rateLimitersCluster = config.getRateLimitersCluster().build("rate_limiters",
FaultTolerantRedisClusterClient rateLimitersCluster = config.getRateLimitersCluster().build("rate_limiters",
sharedClientResources.mutate());
FaultTolerantRedisClient pubsubClient =
config.getRedisPubSubConfiguration().build("pubsub", sharedClientResources);
final BlockingQueue<Runnable> keyspaceNotificationDispatchQueue = new ArrayBlockingQueue<>(100_000);
Metrics.gaugeCollectionSize(name(getClass(), "keyspaceNotificationDispatchQueueSize"), Collections.emptyList(),
keyspaceNotificationDispatchQueue);
@@ -652,9 +656,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
new PushNotificationManager(accountsManager, apnSender, fcmSender, pushNotificationScheduler);
RateLimiters rateLimiters = RateLimiters.createAndValidate(config.getLimitsConfiguration(),
dynamicConfigurationManager, rateLimitersCluster);
ProvisioningManager provisioningManager = new ProvisioningManager(
config.getProvisioningConfiguration().pubsub().build(sharedClientResources),
config.getProvisioningConfiguration().circuitBreaker());
ProvisioningManager provisioningManager = new ProvisioningManager(pubsubClient);
IssuedReceiptsManager issuedReceiptsManager = new IssuedReceiptsManager(
config.getDynamoDbTables().getIssuedReceipts().getTableName(),
config.getDynamoDbTables().getIssuedReceipts().getExpiration(),

View File

@@ -7,11 +7,11 @@ package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.dropwizard.jackson.Discoverable;
import io.lettuce.core.RedisClient;
import io.lettuce.core.resource.ClientResources;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RedisConfiguration.class)
public interface SingletonRedisClientFactory extends Discoverable {
public interface FaultTolerantRedisClientFactory extends Discoverable {
RedisClient build(ClientResources clientResources);
FaultTolerantRedisClient build(String name, ClientResources clientResources);
}

View File

@@ -8,10 +8,10 @@ package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.dropwizard.jackson.Discoverable;
import io.lettuce.core.resource.ClientResources;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RedisClusterConfiguration.class)
public interface FaultTolerantRedisClusterFactory extends Discoverable {
FaultTolerantRedisCluster build(String name, ClientResources.Builder clientResourcesBuilder);
FaultTolerantRedisClusterClient build(String name, ClientResources.Builder clientResourcesBuilder);
}

View File

@@ -1,20 +0,0 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
public record ProvisioningConfiguration(@Valid @NotNull SingletonRedisClientFactory pubsub,
@Valid @NotNull CircuitBreakerConfiguration circuitBreaker) {
public ProvisioningConfiguration {
if (circuitBreaker == null) {
circuitBreaker = new CircuitBreakerConfiguration();
}
}
}

View File

@@ -13,7 +13,7 @@ import java.time.Duration;
import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
@JsonTypeName("default")
public class RedisClusterConfiguration implements FaultTolerantRedisClusterFactory {
@@ -58,7 +58,7 @@ public class RedisClusterConfiguration implements FaultTolerantRedisClusterFacto
}
@Override
public FaultTolerantRedisCluster build(final String name, final ClientResources.Builder clientResourcesBuilder) {
return new FaultTolerantRedisCluster(name, this, clientResourcesBuilder);
public FaultTolerantRedisClusterClient build(final String name, final ClientResources.Builder clientResourcesBuilder) {
return new FaultTolerantRedisClusterClient(name, this, clientResourcesBuilder);
}
}

View File

@@ -7,15 +7,16 @@ package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.lettuce.core.RedisClient;
import com.google.common.annotations.VisibleForTesting;
import io.lettuce.core.resource.ClientResources;
import java.time.Duration;
import javax.validation.Valid;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import org.whispersystems.textsecuregcm.redis.RedisUriUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
@JsonTypeName("default")
public class RedisConfiguration implements SingletonRedisClientFactory {
public class RedisConfiguration implements FaultTolerantRedisClientFactory {
@JsonProperty
@NotEmpty
@@ -25,20 +26,39 @@ public class RedisConfiguration implements SingletonRedisClientFactory {
@NotNull
private Duration timeout = Duration.ofSeconds(1);
@JsonProperty
@NotNull
@Valid
private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
@JsonProperty
@NotNull
@Valid
private RetryConfiguration retry = new RetryConfiguration();
public String getUri() {
return uri;
}
@VisibleForTesting
public void setUri(String uri) {
this.uri = uri;
}
public Duration getTimeout() {
return timeout;
}
@Override
public RedisClient build(final ClientResources clientResources) {
final RedisClient redisClient = RedisClient.create(clientResources,
RedisUriUtil.createRedisUriWithTimeout(uri, timeout));
redisClient.setDefaultTimeout(timeout);
public @NotNull @Valid CircuitBreakerConfiguration getCircuitBreakerConfiguration() {
return circuitBreaker;
}
return redisClient;
public @NotNull @Valid RetryConfiguration getRetryConfiguration() {
return retry;
}
@Override
public FaultTolerantRedisClient build(final String name, final ClientResources clientResources) {
return new FaultTolerantRedisClient(name, this, clientResources.mutate());
}
}

View File

@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntity;
import org.whispersystems.textsecuregcm.entities.CurrencyConversionEntityList;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
public class CurrencyConversionManager implements Managed {
@@ -47,7 +47,7 @@ public class CurrencyConversionManager implements Managed {
private final CoinMarketCapClient coinMarketCapClient;
private final FaultTolerantRedisCluster cacheCluster;
private final FaultTolerantRedisClusterClient cacheCluster;
private final Clock clock;
@@ -67,7 +67,7 @@ public class CurrencyConversionManager implements Managed {
public CurrencyConversionManager(
final FixerClient fixerClient,
final CoinMarketCapClient coinMarketCapClient,
final FaultTolerantRedisCluster cacheCluster,
final FaultTolerantRedisClusterClient cacheCluster,
final List<String> currencies,
final ScheduledExecutorService executor,
final Clock clock) {

View File

@@ -22,7 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
public abstract class BaseRateLimiters<T extends RateLimiterDescriptor> {
@@ -39,7 +39,7 @@ public abstract class BaseRateLimiters<T extends RateLimiterDescriptor> {
final Map<String, RateLimiterConfig> configs,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final ClusterLuaScript validateScript,
final FaultTolerantRedisCluster cacheCluster,
final FaultTolerantRedisClusterClient cacheCluster,
final Clock clock) {
this.configs = configs;
this.rateLimiterByDescriptor = Arrays.stream(values)
@@ -69,7 +69,7 @@ public abstract class BaseRateLimiters<T extends RateLimiterDescriptor> {
}
}
protected static ClusterLuaScript defaultScript(final FaultTolerantRedisCluster cacheCluster) {
protected static ClusterLuaScript defaultScript(final FaultTolerantRedisClusterClient cacheCluster) {
try {
return ClusterLuaScript.fromResource(
cacheCluster, "lua/validate_rate_limit.lua", ScriptOutputType.INTEGER);
@@ -83,7 +83,7 @@ public abstract class BaseRateLimiters<T extends RateLimiterDescriptor> {
final Map<String, RateLimiterConfig> configs,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final ClusterLuaScript validateScript,
final FaultTolerantRedisCluster cacheCluster,
final FaultTolerantRedisClusterClient cacheCluster,
final Clock clock) {
if (descriptor.isDynamic()) {
final Supplier<RateLimiterConfig> configResolver = () -> {

View File

@@ -12,7 +12,7 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.Util;
/**
@@ -21,11 +21,11 @@ import org.whispersystems.textsecuregcm.util.Util;
public class CardinalityEstimator {
private volatile double uniqueElementCount;
private final FaultTolerantRedisCluster redisCluster;
private final FaultTolerantRedisClusterClient redisCluster;
private final String hllName;
private final Duration period;
public CardinalityEstimator(final FaultTolerantRedisCluster redisCluster, final String name, final Duration period) {
public CardinalityEstimator(final FaultTolerantRedisClusterClient redisCluster, final String name, final Duration period) {
this.redisCluster = redisCluster;
this.hllName = "cardinality_estimator::" + name;
this.period = period;

View File

@@ -15,7 +15,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
public class DynamicRateLimiter implements RateLimiter {
@@ -26,7 +26,7 @@ public class DynamicRateLimiter implements RateLimiter {
private final ClusterLuaScript validateScript;
private final FaultTolerantRedisCluster cluster;
private final FaultTolerantRedisClusterClient cluster;
private final Clock clock;
@@ -38,7 +38,7 @@ public class DynamicRateLimiter implements RateLimiter {
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final Supplier<RateLimiterConfig> configResolver,
final ClusterLuaScript validateScript,
final FaultTolerantRedisCluster cluster,
final FaultTolerantRedisClusterClient cluster,
final Clock clock) {
this.name = requireNonNull(name);
this.dynamicConfigurationManager = dynamicConfigurationManager;

View File

@@ -11,7 +11,7 @@ import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
public class MessageDeliveryLoopMonitor {
@@ -22,7 +22,7 @@ public class MessageDeliveryLoopMonitor {
private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryLoopMonitor.class);
public MessageDeliveryLoopMonitor(final FaultTolerantRedisCluster rateLimitCluster) {
public MessageDeliveryLoopMonitor(final FaultTolerantRedisClusterClient rateLimitCluster) {
try {
getDeliveryAttemptsScript =
ClusterLuaScript.fromResource(rateLimitCluster, "lua/get_delivery_attempt_count.lua", ScriptOutputType.INTEGER);

View File

@@ -11,7 +11,7 @@ import java.time.Duration;
import java.util.Map;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
public class RateLimiters extends BaseRateLimiters<RateLimiters.For> {
@@ -81,7 +81,7 @@ public class RateLimiters extends BaseRateLimiters<RateLimiters.For> {
public static RateLimiters createAndValidate(
final Map<String, RateLimiterConfig> configs,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final FaultTolerantRedisCluster cacheCluster) {
final FaultTolerantRedisClusterClient cacheCluster) {
final RateLimiters rateLimiters = new RateLimiters(
configs, dynamicConfigurationManager, defaultScript(cacheCluster), cacheCluster, Clock.systemUTC());
rateLimiters.validateValuesAndConfigs();
@@ -93,7 +93,7 @@ public class RateLimiters extends BaseRateLimiters<RateLimiters.For> {
final Map<String, RateLimiterConfig> configs,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final ClusterLuaScript validateScript,
final FaultTolerantRedisCluster cacheCluster,
final FaultTolerantRedisClusterClient cacheCluster,
final Clock clock) {
super(For.values(), configs, dynamicConfigurationManager, validateScript, cacheCluster, clock);
}

View File

@@ -20,7 +20,7 @@ import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfigurati
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
@@ -36,7 +36,7 @@ public class StaticRateLimiter implements RateLimiter {
private final ClusterLuaScript validateScript;
private final FaultTolerantRedisCluster cacheCluster;
private final FaultTolerantRedisClusterClient cacheCluster;
private final Clock clock;
@@ -45,7 +45,7 @@ public class StaticRateLimiter implements RateLimiter {
final String name,
final RateLimiterConfig config,
final ClusterLuaScript validateScript,
final FaultTolerantRedisCluster cacheCluster,
final FaultTolerantRedisClusterClient cacheCluster,
final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
this.name = requireNonNull(name);

View File

@@ -6,13 +6,13 @@
package org.whispersystems.textsecuregcm.providers;
import com.codahale.metrics.health.HealthCheck;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
public class RedisClusterHealthCheck extends HealthCheck {
private final FaultTolerantRedisCluster redisCluster;
private final FaultTolerantRedisClusterClient redisCluster;
public RedisClusterHealthCheck(final FaultTolerantRedisCluster redisCluster) {
public RedisClusterHealthCheck(final FaultTolerantRedisClusterClient redisCluster) {
this.redisCluster = redisCluster;
}

View File

@@ -35,8 +35,8 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.storage.Device;
/**
@@ -52,8 +52,8 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private final String managerId = UUID.randomUUID().toString();
private final String connectedClientSetKey = getConnectedClientSetKey(managerId);
private final FaultTolerantRedisCluster presenceCluster;
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
private final FaultTolerantRedisClusterClient presenceCluster;
private final FaultTolerantPubSubClusterConnection<String, String> pubSubConnection;
private final ClusterLuaScript clearPresenceScript;
private final ClusterLuaScript renewPresenceScript;
@@ -80,7 +80,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
private static final Logger log = LoggerFactory.getLogger(ClientPresenceManager.class);
public ClientPresenceManager(final FaultTolerantRedisCluster presenceCluster,
public ClientPresenceManager(final FaultTolerantRedisClusterClient presenceCluster,
final ScheduledExecutorService scheduledExecutorService,
final ExecutorService keyspaceNotificationExecutorService) throws IOException {
this.presenceCluster = presenceCluster;
@@ -106,7 +106,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter<String, Str
}
@VisibleForTesting
FaultTolerantPubSubConnection<String, String> getPubSubConnection() {
FaultTolerantPubSubClusterConnection<String, String> getPubSubConnection() {
return pubSubConnection;
}

View File

@@ -10,12 +10,7 @@ import static com.codahale.metrics.MetricRegistry.name;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.dropwizard.lifecycle.Managed;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import java.nio.charset.StandardCharsets;
@@ -24,18 +19,15 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> implements Managed {
private final RedisClient redisClient;
private final StatefulRedisPubSubConnection<byte[], byte[]> subscriptionConnection;
private final StatefulRedisConnection<byte[], byte[]> publicationConnection;
private final CircuitBreaker circuitBreaker;
private final FaultTolerantRedisClient pubSubClient;
private final FaultTolerantPubSubConnection<byte[], byte[]> pubSubConnection;
private final Map<String, Consumer<PubSubProtos.PubSubMessage>> listenersByProvisioningAddress =
new ConcurrentHashMap<>();
@@ -50,46 +42,31 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
private static final Logger logger = LoggerFactory.getLogger(ProvisioningManager.class);
public ProvisioningManager(final RedisClient redisClient,
final CircuitBreakerConfiguration circuitBreakerConfiguration) {
this.redisClient = redisClient;
this.subscriptionConnection = redisClient.connectPubSub(new ByteArrayCodec());
this.publicationConnection = redisClient.connect(new ByteArrayCodec());
this.circuitBreaker = CircuitBreaker.of("pubsub-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
CircuitBreakerUtil.registerMetrics(circuitBreaker, ProvisioningManager.class, Tags.empty());
public ProvisioningManager(final FaultTolerantRedisClient pubSubClient) {
this.pubSubClient = pubSubClient;
this.pubSubConnection = pubSubClient.createBinaryPubSubConnection();
Metrics.gaugeMapSize(ACTIVE_LISTENERS_GAUGE_NAME, Tags.empty(), listenersByProvisioningAddress);
}
@Override
public void start() throws Exception {
subscriptionConnection.addListener(this);
pubSubConnection.usePubSubConnection(connection -> connection.addListener(this));
}
@Override
public void stop() throws Exception {
subscriptionConnection.removeListener(this);
subscriptionConnection.close();
publicationConnection.close();
redisClient.shutdown();
pubSubConnection.usePubSubConnection(connection -> connection.removeListener(this));
}
public void addListener(final String address, final Consumer<PubSubProtos.PubSubMessage> listener) {
listenersByProvisioningAddress.put(address, listener);
circuitBreaker.executeRunnable(
() -> subscriptionConnection.sync().subscribe(address.getBytes(StandardCharsets.UTF_8)));
pubSubConnection.usePubSubConnection(connection -> connection.sync().subscribe(address.getBytes(StandardCharsets.UTF_8)));
}
public void removeListener(final String address) {
RedisOperation.unchecked(() -> circuitBreaker.executeRunnable(
() -> subscriptionConnection.sync().unsubscribe(address.getBytes(StandardCharsets.UTF_8))));
RedisOperation.unchecked(() ->
pubSubConnection.usePubSubConnection(connection -> connection.sync().unsubscribe(address.getBytes(StandardCharsets.UTF_8))));
listenersByProvisioningAddress.remove(address);
}
@@ -100,9 +77,8 @@ public class ProvisioningManager extends RedisPubSubAdapter<byte[], byte[]> impl
.setContent(ByteString.copyFrom(body))
.build();
final boolean receiverPresent = circuitBreaker.executeSupplier(
() -> publicationConnection.sync()
.publish(address.getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0);
final boolean receiverPresent = pubSubClient.withBinaryConnection(connection ->
connection.sync().publish(address.getBytes(StandardCharsets.UTF_8), pubSubMessage.toByteArray()) > 0);
Metrics.counter(SEND_PROVISIONING_MESSAGE_COUNTER_NAME, "online", String.valueOf(receiverPresent)).increment();

View File

@@ -31,7 +31,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@@ -62,7 +62,7 @@ public class PushNotificationScheduler implements Managed {
private final APNSender apnSender;
private final FcmSender fcmSender;
private final AccountsManager accountsManager;
private final FaultTolerantRedisCluster pushSchedulingCluster;
private final FaultTolerantRedisClusterClient pushSchedulingCluster;
private final Clock clock;
private final ClusterLuaScript scheduleBackgroundApnsNotificationScript;
@@ -141,7 +141,7 @@ public class PushNotificationScheduler implements Managed {
}
}
public PushNotificationScheduler(final FaultTolerantRedisCluster pushSchedulingCluster,
public PushNotificationScheduler(final FaultTolerantRedisClusterClient pushSchedulingCluster,
final APNSender apnSender,
final FcmSender fcmSender,
final AccountsManager accountsManager,
@@ -158,7 +158,7 @@ public class PushNotificationScheduler implements Managed {
}
@VisibleForTesting
PushNotificationScheduler(final FaultTolerantRedisCluster pushSchedulingCluster,
PushNotificationScheduler(final FaultTolerantRedisClusterClient pushSchedulingCluster,
final APNSender apnSender,
final FcmSender fcmSender,
final AccountsManager accountsManager,

View File

@@ -0,0 +1,65 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisException;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.function.Consumer;
import java.util.function.Function;
abstract class AbstractFaultTolerantPubSubConnection<K, V, C extends StatefulRedisPubSubConnection<K, V>> {
private final String name;
private final C pubSubConnection;
private final Retry retry;
private final Timer executeTimer;
protected AbstractFaultTolerantPubSubConnection(final String name,
final C pubSubConnection,
final Retry retry) {
this.name = name;
this.pubSubConnection = pubSubConnection;
this.retry = retry;
this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub");
}
protected String getName() {
return name;
}
public void usePubSubConnection(final Consumer<C> consumer) {
try {
retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
public <T> T withPubSubConnection(final Function<C, T> function) {
try {
return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
}

View File

@@ -25,7 +25,7 @@ import reactor.core.publisher.Mono;
public class ClusterLuaScript {
private final FaultTolerantRedisCluster redisCluster;
private final FaultTolerantRedisClusterClient redisCluster;
private final ScriptOutputType scriptOutputType;
private final String script;
private final String sha;
@@ -35,7 +35,7 @@ public class ClusterLuaScript {
private static final Logger log = LoggerFactory.getLogger(ClusterLuaScript.class);
public static ClusterLuaScript fromResource(final FaultTolerantRedisCluster redisCluster,
public static ClusterLuaScript fromResource(final FaultTolerantRedisClusterClient redisCluster,
final String resource,
final ScriptOutputType scriptOutputType) throws IOException {
@@ -51,7 +51,7 @@ public class ClusterLuaScript {
}
@VisibleForTesting
ClusterLuaScript(final FaultTolerantRedisCluster redisCluster,
ClusterLuaScript(final FaultTolerantRedisClusterClient redisCluster,
final String script,
final ScriptOutputType scriptOutputType) {

View File

@@ -0,0 +1,54 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.redis;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Scheduler;
public class FaultTolerantPubSubClusterConnection<K, V> extends AbstractFaultTolerantPubSubConnection<K, V, StatefulRedisClusterPubSubConnection<K, V>> {
private final Logger logger = LoggerFactory.getLogger(FaultTolerantPubSubClusterConnection.class);
private final Retry resubscribeRetry;
private final Scheduler topologyChangedEventScheduler;
protected FaultTolerantPubSubClusterConnection(final String name,
final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection,
final Retry retry,
final Retry resubscribeRetry,
final Scheduler topologyChangedEventScheduler) {
super(name, pubSubConnection, retry);
pubSubConnection.setNodeMessagePropagation(true);
this.resubscribeRetry = resubscribeRetry;
this.topologyChangedEventScheduler = topologyChangedEventScheduler;
}
public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) {
usePubSubConnection(connection -> connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribeOn(topologyChangedEventScheduler)
.subscribe(event -> {
logger.info("Got topology change event for {}, resubscribing all keyspace notifications", getName());
resubscribeRetry.executeRunnable(() -> {
try {
eventHandler.run();
} catch (final RuntimeException e) {
logger.warn("Resubscribe for {} failed", getName(), e);
throw e;
}
});
}));
}
}

View File

@@ -5,90 +5,15 @@
package org.whispersystems.textsecuregcm.redis;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.scheduler.Scheduler;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
public class FaultTolerantPubSubConnection<K, V> {
public class FaultTolerantPubSubConnection<K, V> extends AbstractFaultTolerantPubSubConnection<K, V, StatefulRedisPubSubConnection<K, V>> {
private static final Logger logger = LoggerFactory.getLogger(FaultTolerantPubSubConnection.class);
protected FaultTolerantPubSubConnection(final String name,
final StatefulRedisPubSubConnection<K, V> pubSubConnection,
final Retry retry) {
private final String name;
private final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection;
private final Retry retry;
private final Retry resubscribeRetry;
private final Scheduler topologyChangedEventScheduler;
private final Timer executeTimer;
public FaultTolerantPubSubConnection(final String name,
final StatefulRedisClusterPubSubConnection<K, V> pubSubConnection,
final Retry retry, final Retry resubscribeRetry, final Scheduler topologyChangedEventScheduler) {
this.name = name;
this.pubSubConnection = pubSubConnection;
this.retry = retry;
this.resubscribeRetry = resubscribeRetry;
this.topologyChangedEventScheduler = topologyChangedEventScheduler;
this.pubSubConnection.setNodeMessagePropagation(true);
this.executeTimer = Metrics.timer(name(getClass(), "execute"), "clusterName", name + "-pubsub");
super(name, pubSubConnection, retry);
}
public void usePubSubConnection(final Consumer<StatefulRedisClusterPubSubConnection<K, V>> consumer) {
try {
retry.executeRunnable(() -> executeTimer.record(() -> consumer.accept(pubSubConnection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
public <T> T withPubSubConnection(final Function<StatefulRedisClusterPubSubConnection<K, V>, T> function) {
try {
return retry.executeCallable(() -> executeTimer.record(() -> function.apply(pubSubConnection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
public void subscribeToClusterTopologyChangedEvents(final Runnable eventHandler) {
usePubSubConnection(connection -> connection.getResources().eventBus().get()
.filter(event -> event instanceof ClusterTopologyChangedEvent)
.subscribeOn(topologyChangedEventScheduler)
.subscribe(event -> {
logger.info("Got topology change event for {}, resubscribing all keyspace notifications", name);
resubscribeRetry.executeRunnable(() -> {
try {
eventHandler.run();
} catch (final RuntimeException e) {
logger.warn("Resubscribe for {} failed", name, e);
throw e;
}
});
}));
}
}

View File

@@ -0,0 +1,159 @@
package org.whispersystems.textsecuregcm.redis;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.resource.ClientResources;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
public class FaultTolerantRedisClient {
private final String name;
private final RedisClient redisClient;
private final StatefulRedisConnection<String, String> stringConnection;
private final StatefulRedisConnection<byte[], byte[]> binaryConnection;
private final List<StatefulRedisPubSubConnection<?, ?>> pubSubConnections = new ArrayList<>();
private final CircuitBreaker circuitBreaker;
private final Retry retry;
public FaultTolerantRedisClient(final String name,
final RedisConfiguration redisConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
this(name, clientResourcesBuilder,
RedisUriUtil.createRedisUriWithTimeout(redisConfiguration.getUri(), redisConfiguration.getTimeout()),
redisConfiguration.getTimeout(),
redisConfiguration.getCircuitBreakerConfiguration(),
redisConfiguration.getRetryConfiguration());
}
FaultTolerantRedisClient(String name,
final ClientResources.Builder clientResourcesBuilder,
final RedisURI redisUri,
final Duration commandTimeout,
final CircuitBreakerConfiguration circuitBreakerConfiguration,
final RetryConfiguration retryConfiguration) {
this.name = name;
final LettuceShardCircuitBreaker lettuceShardCircuitBreaker = new LettuceShardCircuitBreaker(name,
circuitBreakerConfiguration.toCircuitBreakerConfig(), Schedulers.newSingle("topology-changed-" + name, true));
this.redisClient = RedisClient.create(clientResourcesBuilder.build(), redisUri);
this.redisClient.setOptions(ClusterClientOptions.builder()
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.validateClusterNodeMembership(false)
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
.enableAllAdaptiveRefreshTriggers()
.build())
// for asynchronous commands
.timeoutOptions(TimeoutOptions.builder()
.fixedTimeout(commandTimeout)
.build())
.publishOnScheduler(true)
.build());
lettuceShardCircuitBreaker.setEventBus(redisClient.getResources().eventBus());
this.stringConnection = redisClient.connect();
this.binaryConnection = redisClient.connect(ByteArrayCodec.INSTANCE);
this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder()
.retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build());
CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClusterClient.class);
}
public void shutdown() {
stringConnection.close();
for (final StatefulRedisPubSubConnection<?, ?> pubSubConnection : pubSubConnections) {
pubSubConnection.close();
}
redisClient.shutdown();
}
public String getName() {
return name;
}
public void useConnection(final Consumer<StatefulRedisConnection<String, String>> consumer) {
useConnection(stringConnection, consumer);
}
public <T> T withConnection(final Function<StatefulRedisConnection<String, String>, T> function) {
return withConnection(stringConnection, function);
}
public void useBinaryConnection(final Consumer<StatefulRedisConnection<byte[], byte[]>> consumer) {
useConnection(binaryConnection, consumer);
}
public <T> T withBinaryConnection(final Function<StatefulRedisConnection<byte[], byte[]>, T> function) {
return withConnection(binaryConnection, function);
}
public <K, V> void useConnection(final StatefulRedisConnection<K, V> connection,
final Consumer<StatefulRedisConnection<K, V>> consumer) {
try {
circuitBreaker.executeRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
public <T, K, V> T withConnection(final StatefulRedisConnection<K, V> connection,
final Function<StatefulRedisConnection<K, V>, T> function) {
try {
return circuitBreaker.executeCallable(() -> retry.executeCallable(() -> function.apply(connection)));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
} else {
throw new RedisException(t);
}
}
}
public FaultTolerantPubSubConnection<String, String> createPubSubConnection() {
final StatefulRedisPubSubConnection<String, String> pubSubConnection = redisClient.connectPubSub();
pubSubConnections.add(pubSubConnection);
return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry);
}
public FaultTolerantPubSubConnection<byte[], byte[]> createBinaryPubSubConnection() {
final StatefulRedisPubSubConnection<byte[], byte[]> pubSubConnection = redisClient.connectPubSub(ByteArrayCodec.INSTANCE);
pubSubConnections.add(pubSubConnection);
return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry);
}
}

View File

@@ -41,7 +41,7 @@ import reactor.core.scheduler.Schedulers;
*
* @see LettuceShardCircuitBreaker
*/
public class FaultTolerantRedisCluster {
public class FaultTolerantRedisClusterClient {
private final String name;
@@ -56,8 +56,8 @@ public class FaultTolerantRedisCluster {
private final Retry topologyChangedEventRetry;
public FaultTolerantRedisCluster(final String name, final RedisClusterConfiguration clusterConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
public FaultTolerantRedisClusterClient(final String name, final RedisClusterConfiguration clusterConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
this(name, clientResourcesBuilder,
Collections.singleton(RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(),
@@ -68,9 +68,9 @@ public class FaultTolerantRedisCluster {
}
FaultTolerantRedisCluster(String name, final ClientResources.Builder clientResourcesBuilder,
Iterable<RedisURI> redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig,
RetryConfiguration retryConfiguration) {
FaultTolerantRedisClusterClient(String name, final ClientResources.Builder clientResourcesBuilder,
Iterable<RedisURI> redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig,
RetryConfiguration retryConfiguration) {
this.name = name;
@@ -112,7 +112,7 @@ public class FaultTolerantRedisCluster {
this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig);
CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisCluster.class);
CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClusterClient.class);
}
public void shutdown() {
@@ -184,11 +184,11 @@ public class FaultTolerantRedisCluster {
.transformDeferred(RetryOperator.of(retry));
}
public FaultTolerantPubSubConnection<String, String> createPubSubConnection() {
public FaultTolerantPubSubClusterConnection<String, String> createPubSubConnection() {
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
pubSubConnections.add(pubSubConnection);
return new FaultTolerantPubSubConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry,
return new FaultTolerantPubSubClusterConnection<>(name, pubSubConnection, retry, topologyChangedEventRetry,
Schedulers.newSingle(name + "-redisPubSubEvents", true));
}

View File

@@ -67,7 +67,7 @@ import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.redis.RedisOperation;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
@@ -107,7 +107,7 @@ public class AccountsManager {
private final Accounts accounts;
private final PhoneNumberIdentifiers phoneNumberIdentifiers;
private final FaultTolerantRedisCluster cacheCluster;
private final FaultTolerantRedisClusterClient cacheCluster;
private final AccountLockManager accountLockManager;
private final KeysManager keysManager;
private final MessagesManager messagesManager;
@@ -157,7 +157,7 @@ public class AccountsManager {
public AccountsManager(final Accounts accounts,
final PhoneNumberIdentifiers phoneNumberIdentifiers,
final FaultTolerantRedisCluster cacheCluster,
final FaultTolerantRedisClusterClient cacheCluster,
final AccountLockManager accountLockManager,
final KeysManager keysManager,
final MessagesManager messagesManager,

View File

@@ -50,8 +50,8 @@ import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantPubSubClusterConnection;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import org.whispersystems.textsecuregcm.util.Util;
@@ -119,8 +119,8 @@ import reactor.core.scheduler.Schedulers;
*/
public class MessagesCache extends RedisClusterPubSubAdapter<String, String> implements Managed {
private final FaultTolerantRedisCluster redisCluster;
private final FaultTolerantPubSubConnection<String, String> pubSubConnection;
private final FaultTolerantRedisClusterClient redisCluster;
private final FaultTolerantPubSubClusterConnection<String, String> pubSubConnection;
private final Clock clock;
private final ExecutorService notificationExecutorService;
@@ -183,9 +183,9 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
private static final Logger logger = LoggerFactory.getLogger(MessagesCache.class);
public MessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService,
final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager)
public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final ExecutorService notificationExecutorService,
final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager)
throws IOException {
this(
redisCluster,
@@ -205,15 +205,15 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
}
@VisibleForTesting
MessagesCache(final FaultTolerantRedisCluster redisCluster, final ExecutorService notificationExecutorService,
final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final MessagesCacheInsertScript insertScript,
final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript,
final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript,
final MessagesCacheRemoveQueueScript removeQueueScript,
final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript,
final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript)
MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final ExecutorService notificationExecutorService,
final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final MessagesCacheInsertScript insertScript,
final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript,
final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript,
final MessagesCacheRemoveQueueScript removeQueueScript,
final MessagesCacheGetQueuesToPersistScript getQueuesToPersistScript,
final MessagesCacheRemoveRecipientViewFromMrmDataScript removeRecipientViewFromMrmDataScript)
throws IOException {
this.redisCluster = redisCluster;

View File

@@ -11,7 +11,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import reactor.core.publisher.Mono;
/**
@@ -22,7 +22,7 @@ class MessagesCacheGetItemsScript {
private final ClusterLuaScript getItemsScript;
MessagesCacheGetItemsScript(FaultTolerantRedisCluster redisCluster) throws IOException {
MessagesCacheGetItemsScript(FaultTolerantRedisClusterClient redisCluster) throws IOException {
this.getItemsScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_items.lua", ScriptOutputType.OBJECT);
}

View File

@@ -11,7 +11,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
/**
* Returns a list of queues that may be persisted. They will be sorted from oldest to more recent, limited by the
@@ -23,7 +23,7 @@ class MessagesCacheGetQueuesToPersistScript {
private final ClusterLuaScript getQueuesToPersistScript;
MessagesCacheGetQueuesToPersistScript(final FaultTolerantRedisCluster redisCluster) throws IOException {
MessagesCacheGetQueuesToPersistScript(final FaultTolerantRedisClusterClient redisCluster) throws IOException {
this.getQueuesToPersistScript = ClusterLuaScript.fromResource(redisCluster, "lua/get_queues_to_persist.lua",
ScriptOutputType.MULTI);
}

View File

@@ -14,7 +14,7 @@ import java.util.List;
import java.util.UUID;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
/**
* Inserts an envelope into the message queue for a destination device.
@@ -23,7 +23,7 @@ class MessagesCacheInsertScript {
private final ClusterLuaScript insertScript;
MessagesCacheInsertScript(FaultTolerantRedisCluster redisCluster) throws IOException {
MessagesCacheInsertScript(FaultTolerantRedisClusterClient redisCluster) throws IOException {
this.insertScript = ClusterLuaScript.fromResource(redisCluster, "lua/insert_item.lua", ScriptOutputType.INTEGER);
}

View File

@@ -11,7 +11,7 @@ import java.util.ArrayList;
import java.util.List;
import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
/**
* Inserts the shared multi-recipient message payload into the cache. The list of recipients and views will be set as
@@ -25,7 +25,7 @@ class MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript {
static final String ERROR_KEY_EXISTS = "ERR key exists";
MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(FaultTolerantRedisCluster redisCluster)
MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(FaultTolerantRedisClusterClient redisCluster)
throws IOException {
this.script = ClusterLuaScript.fromResource(redisCluster, "lua/insert_shared_multirecipient_message_data.lua",
ScriptOutputType.INTEGER);

View File

@@ -12,7 +12,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
/**
* Removes a list of message GUIDs from the queue of a destination device.
@@ -21,7 +21,7 @@ class MessagesCacheRemoveByGuidScript {
private final ClusterLuaScript removeByGuidScript;
MessagesCacheRemoveByGuidScript(final FaultTolerantRedisCluster redisCluster) throws IOException {
MessagesCacheRemoveByGuidScript(final FaultTolerantRedisClusterClient redisCluster) throws IOException {
this.removeByGuidScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_item_by_guid.lua",
ScriptOutputType.OBJECT);
}

View File

@@ -12,7 +12,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import reactor.core.publisher.Mono;
/**
@@ -29,7 +29,7 @@ class MessagesCacheRemoveQueueScript {
private final ClusterLuaScript removeQueueScript;
MessagesCacheRemoveQueueScript(FaultTolerantRedisCluster redisCluster) throws IOException {
MessagesCacheRemoveQueueScript(FaultTolerantRedisClusterClient redisCluster) throws IOException {
this.removeQueueScript = ClusterLuaScript.fromResource(redisCluster, "lua/remove_queue.lua",
ScriptOutputType.MULTI);
}

View File

@@ -12,7 +12,7 @@ import java.util.Collection;
import java.util.List;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import reactor.core.publisher.Mono;
/**
@@ -25,7 +25,7 @@ class MessagesCacheRemoveRecipientViewFromMrmDataScript {
private final ClusterLuaScript removeRecipientViewFromMrmDataScript;
MessagesCacheRemoveRecipientViewFromMrmDataScript(final FaultTolerantRedisCluster redisCluster) throws IOException {
MessagesCacheRemoveRecipientViewFromMrmDataScript(final FaultTolerantRedisClusterClient redisCluster) throws IOException {
this.removeRecipientViewFromMrmDataScript = ClusterLuaScript.fromResource(redisCluster,
"lua/remove_recipient_view_from_mrm_data.lua", ScriptOutputType.INTEGER);
}

View File

@@ -14,7 +14,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import javax.annotation.Nullable;
@@ -26,12 +26,12 @@ public class ProfilesManager {
private static final String CACHE_PREFIX = "profiles::";
private final Profiles profiles;
private final FaultTolerantRedisCluster cacheCluster;
private final FaultTolerantRedisClusterClient cacheCluster;
private final ObjectMapper mapper;
public ProfilesManager(final Profiles profiles,
final FaultTolerantRedisCluster cacheCluster) {
final FaultTolerantRedisClusterClient cacheCluster) {
this.profiles = profiles;
this.cacheCluster = cacheCluster;
this.mapper = SystemMapper.jsonMapper();

View File

@@ -21,13 +21,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
public class ReportMessageManager {
private final ReportMessageDynamoDb reportMessageDynamoDb;
private final FaultTolerantRedisCluster rateLimitCluster;
private final FaultTolerantRedisClusterClient rateLimitCluster;
private final Duration counterTtl;
@@ -40,7 +40,7 @@ public class ReportMessageManager {
private static final Logger logger = LoggerFactory.getLogger(ReportMessageManager.class);
public ReportMessageManager(final ReportMessageDynamoDb reportMessageDynamoDb,
final FaultTolerantRedisCluster rateLimitCluster,
final FaultTolerantRedisClusterClient rateLimitCluster,
final Duration counterTtl) {
this.reportMessageDynamoDb = reportMessageDynamoDb;

View File

@@ -34,13 +34,12 @@ import org.whispersystems.textsecuregcm.controllers.SecureValueRecovery2Controll
import org.whispersystems.textsecuregcm.experiment.PushNotificationExperimentSamples;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MicrometerAwsSdkMetricPublisher;
import org.whispersystems.textsecuregcm.metrics.NoopAwsSdkMetricPublisher;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.push.ClientPresenceManager;
import org.whispersystems.textsecuregcm.push.FcmSender;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
import org.whispersystems.textsecuregcm.securevaluerecovery.SecureValueRecovery2Client;
import org.whispersystems.textsecuregcm.storage.AccountLockManager;
@@ -82,8 +81,8 @@ record CommandDependencies(
FcmSender fcmSender,
PushNotificationManager pushNotificationManager,
PushNotificationExperimentSamples pushNotificationExperimentSamples,
FaultTolerantRedisCluster cacheCluster,
FaultTolerantRedisCluster pushSchedulerCluster,
FaultTolerantRedisClusterClient cacheCluster,
FaultTolerantRedisClusterClient pushSchedulerCluster,
ClientResources.Builder redisClusterClientResourcesBuilder,
BackupManager backupManager,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
@@ -109,9 +108,9 @@ record CommandDependencies(
final ClientResources.Builder redisClientResourcesBuilder = ClientResources.builder();
FaultTolerantRedisCluster cacheCluster = configuration.getCacheClusterConfiguration()
FaultTolerantRedisClusterClient cacheCluster = configuration.getCacheClusterConfiguration()
.build("main_cache", redisClientResourcesBuilder);
FaultTolerantRedisCluster pushSchedulerCluster = configuration.getPushSchedulerCluster()
FaultTolerantRedisClusterClient pushSchedulerCluster = configuration.getPushSchedulerCluster()
.build("push_scheduler", redisClientResourcesBuilder);
ScheduledExecutorService recurringJobExecutor = environment.lifecycle()
@@ -197,11 +196,11 @@ record CommandDependencies(
configuration.getDynamoDbTables().getMessages().getTableName(),
configuration.getDynamoDbTables().getMessages().getExpiration(),
messageDeletionExecutor);
FaultTolerantRedisCluster messagesCluster = configuration.getMessageCacheConfiguration()
FaultTolerantRedisClusterClient messagesCluster = configuration.getMessageCacheConfiguration()
.getRedisClusterConfiguration().build("messages", redisClientResourcesBuilder);
FaultTolerantRedisCluster clientPresenceCluster = configuration.getClientPresenceClusterConfiguration()
FaultTolerantRedisClusterClient clientPresenceCluster = configuration.getClientPresenceClusterConfiguration()
.build("client_presence", redisClientResourcesBuilder);
FaultTolerantRedisCluster rateLimitersCluster = configuration.getRateLimitersCluster().build("rate_limiters",
FaultTolerantRedisClusterClient rateLimitersCluster = configuration.getRateLimitersCluster().build("rate_limiters",
redisClientResourcesBuilder);
SecureValueRecovery2Client secureValueRecovery2Client = new SecureValueRecovery2Client(
secureValueRecoveryCredentialsGenerator, secureValueRecoveryServiceExecutor,