Remove default/integral retries from Redis clients

This commit is contained in:
Jon Chambers
2025-08-07 09:49:02 -04:00
committed by Jon Chambers
parent 32cf12e9af
commit a8c6fa93e0
9 changed files with 22 additions and 113 deletions

View File

@@ -31,11 +31,6 @@ public class RedisClusterConfiguration implements FaultTolerantRedisClusterFacto
@Valid
private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
@JsonProperty
@NotNull
@Valid
private RetryConfiguration retry = new RetryConfiguration();
@VisibleForTesting
void setConfigurationUri(final String configurationUri) {
this.configurationUri = configurationUri;
@@ -53,10 +48,6 @@ public class RedisClusterConfiguration implements FaultTolerantRedisClusterFacto
return circuitBreaker;
}
public RetryConfiguration getRetryConfiguration() {
return retry;
}
@Override
public FaultTolerantRedisClusterClient build(final String name, final ClientResources.Builder clientResourcesBuilder) {
return new FaultTolerantRedisClusterClient(name, this, clientResourcesBuilder);

View File

@@ -31,11 +31,6 @@ public class RedisConfiguration implements FaultTolerantRedisClientFactory {
@Valid
private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration();
@JsonProperty
@NotNull
@Valid
private RetryConfiguration retry = new RetryConfiguration();
public String getUri() {
return uri;
}
@@ -53,10 +48,6 @@ public class RedisConfiguration implements FaultTolerantRedisClientFactory {
return circuitBreaker;
}
public @NotNull @Valid RetryConfiguration getRetryConfiguration() {
return retry;
}
@Override
public FaultTolerantRedisClient build(final String name, final ClientResources clientResources) {
return new FaultTolerantRedisClient(name, this, clientResources.mutate());

View File

@@ -1,10 +1,8 @@
package org.whispersystems.textsecuregcm.redis;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.retry.Retry;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
@@ -19,8 +17,6 @@ import java.util.function.Consumer;
import java.util.function.Function;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
public class FaultTolerantRedisClient {
@@ -34,7 +30,6 @@ public class FaultTolerantRedisClient {
private final List<StatefulRedisPubSubConnection<?, ?>> pubSubConnections = new ArrayList<>();
private final CircuitBreaker circuitBreaker;
private final Retry retry;
public FaultTolerantRedisClient(final String name,
final RedisConfiguration redisConfiguration,
@@ -43,16 +38,14 @@ public class FaultTolerantRedisClient {
this(name, clientResourcesBuilder,
RedisUriUtil.createRedisUriWithTimeout(redisConfiguration.getUri(), redisConfiguration.getTimeout()),
redisConfiguration.getTimeout(),
redisConfiguration.getCircuitBreakerConfiguration(),
redisConfiguration.getRetryConfiguration());
redisConfiguration.getCircuitBreakerConfiguration());
}
FaultTolerantRedisClient(String name,
final ClientResources.Builder clientResourcesBuilder,
final RedisURI redisUri,
final Duration commandTimeout,
final CircuitBreakerConfiguration circuitBreakerConfiguration,
final RetryConfiguration retryConfiguration) {
final CircuitBreakerConfiguration circuitBreakerConfiguration) {
this.name = name;
@@ -81,10 +74,6 @@ public class FaultTolerantRedisClient {
this.binaryConnection = redisClient.connect(ByteArrayCodec.INSTANCE);
this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig());
this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder()
.retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build());
CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClient.class);
}
public void shutdown() {
@@ -120,7 +109,7 @@ public class FaultTolerantRedisClient {
private <K, V> void useConnection(final StatefulRedisConnection<K, V> connection,
final Consumer<StatefulRedisConnection<K, V>> consumer) {
try {
circuitBreaker.executeRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection)));
circuitBreaker.executeRunnable(() -> consumer.accept(connection));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
@@ -133,7 +122,7 @@ public class FaultTolerantRedisClient {
private <T, K, V> T withConnection(final StatefulRedisConnection<K, V> connection,
final Function<StatefulRedisConnection<K, V>, T> function) {
try {
return circuitBreaker.executeCallable(() -> retry.executeCallable(() -> function.apply(connection)));
return circuitBreaker.executeCallable(() -> function.apply(connection));
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;

View File

@@ -6,11 +6,9 @@
package org.whispersystems.textsecuregcm.redis;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.reactor.retry.RetryOperator;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
@@ -28,13 +26,8 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
@@ -53,25 +46,26 @@ public class FaultTolerantRedisClusterClient {
private final List<StatefulRedisClusterPubSubConnection<?, ?>> pubSubConnections = new ArrayList<>();
private final Retry retry;
private final Retry topologyChangedEventRetry;
public FaultTolerantRedisClusterClient(final String name, final RedisClusterConfiguration clusterConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
public FaultTolerantRedisClusterClient(final String name,
final RedisClusterConfiguration clusterConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
this(name, clientResourcesBuilder,
Collections.singleton(RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(),
clusterConfiguration.getTimeout())),
clusterConfiguration.getTimeout(),
clusterConfiguration.getCircuitBreakerConfiguration(),
clusterConfiguration.getRetryConfiguration());
clusterConfiguration.getCircuitBreakerConfiguration());
}
FaultTolerantRedisClusterClient(String name, final ClientResources.Builder clientResourcesBuilder,
Iterable<RedisURI> redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig,
RetryConfiguration retryConfiguration) {
FaultTolerantRedisClusterClient(final String name,
final ClientResources.Builder clientResourcesBuilder,
final Iterable<RedisURI> redisUris,
final Duration commandTimeout,
final CircuitBreakerConfiguration circuitBreakerConfig) {
this.name = name;
@@ -116,8 +110,6 @@ public class FaultTolerantRedisClusterClient {
clusterClient.getResources().eventBus().publish(
new ClusterTopologyChangedEvent(Collections.emptyList(), clusterClient.getPartitions().getPartitions()));
this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder()
.retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build());
final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom()
.maxAttempts(Integer.MAX_VALUE)
.intervalFunction(
@@ -125,8 +117,6 @@ public class FaultTolerantRedisClusterClient {
.build();
this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig);
CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClusterClient.class);
}
public void shutdown() {
@@ -160,15 +150,10 @@ public class FaultTolerantRedisClusterClient {
return withConnection(binaryConnection, function);
}
public <T> Publisher<T> withBinaryClusterReactive(
final Function<StatefulRedisClusterConnection<byte[], byte[]>, Publisher<T>> function) {
return withConnectionReactive(binaryConnection, function);
}
private <K, V> void useConnection(final StatefulRedisClusterConnection<K, V> connection,
final Consumer<StatefulRedisClusterConnection<K, V>> consumer) {
try {
retry.executeRunnable(() -> consumer.accept(connection));
consumer.accept(connection);
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
@@ -181,7 +166,7 @@ public class FaultTolerantRedisClusterClient {
private <T, K, V> T withConnection(final StatefulRedisClusterConnection<K, V> connection,
final Function<StatefulRedisClusterConnection<K, V>, T> function) {
try {
return retry.executeCallable(() -> function.apply(connection));
return function.apply(connection);
} catch (final Throwable t) {
if (t instanceof RedisException) {
throw (RedisException) t;
@@ -191,22 +176,6 @@ public class FaultTolerantRedisClusterClient {
}
}
private <T, K, V> Publisher<T> withConnectionReactive(
final StatefulRedisClusterConnection<K, V> connection,
final Function<StatefulRedisClusterConnection<K, V>, Publisher<T>> function) {
final Publisher<T> publisher = function.apply(connection);
if (publisher instanceof Mono<T> m) {
return m.transformDeferred(RetryOperator.of(retry));
}
if (publisher instanceof Flux<T> f) {
return f.transformDeferred(RetryOperator.of(retry));
}
return Flux.from(publisher).transformDeferred(RetryOperator.of(retry));
}
public FaultTolerantPubSubClusterConnection<String, String> createPubSubConnection() {
final StatefulRedisClusterPubSubConnection<String, String> pubSubConnection = clusterClient.connectPubSub();
pubSubConnections.add(pubSubConnection);

View File

@@ -434,7 +434,7 @@ public class MessagesCache {
// the message might be addressed to the account's PNI, so use the service ID from the envelope
ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice);
return Mono.from(redisCluster.withBinaryClusterReactive(
return Mono.from(redisCluster.withBinaryCluster(
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
.collectList()
.publishOn(messageDeliveryScheduler)))