diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java index 1fa6bb342..eff651d76 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisClusterConfiguration.java @@ -31,11 +31,6 @@ public class RedisClusterConfiguration implements FaultTolerantRedisClusterFacto @Valid private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration(); - @JsonProperty - @NotNull - @Valid - private RetryConfiguration retry = new RetryConfiguration(); - @VisibleForTesting void setConfigurationUri(final String configurationUri) { this.configurationUri = configurationUri; @@ -53,10 +48,6 @@ public class RedisClusterConfiguration implements FaultTolerantRedisClusterFacto return circuitBreaker; } - public RetryConfiguration getRetryConfiguration() { - return retry; - } - @Override public FaultTolerantRedisClusterClient build(final String name, final ClientResources.Builder clientResourcesBuilder) { return new FaultTolerantRedisClusterClient(name, this, clientResourcesBuilder); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConfiguration.java index 6a32bb223..9899efde5 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/RedisConfiguration.java @@ -31,11 +31,6 @@ public class RedisConfiguration implements FaultTolerantRedisClientFactory { @Valid private CircuitBreakerConfiguration circuitBreaker = new CircuitBreakerConfiguration(); - @JsonProperty - @NotNull - @Valid - private RetryConfiguration retry = new RetryConfiguration(); - public String getUri() { return uri; } @@ -53,10 +48,6 @@ public class RedisConfiguration implements FaultTolerantRedisClientFactory { return circuitBreaker; } - public @NotNull @Valid RetryConfiguration getRetryConfiguration() { - return retry; - } - @Override public FaultTolerantRedisClient build(final String name, final ClientResources clientResources) { return new FaultTolerantRedisClient(name, this, clientResources.mutate()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java index 8e74d4f80..9c854f0e0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java @@ -1,10 +1,8 @@ package org.whispersystems.textsecuregcm.redis; import io.github.resilience4j.circuitbreaker.CircuitBreaker; -import io.github.resilience4j.retry.Retry; import io.lettuce.core.ClientOptions; import io.lettuce.core.RedisClient; -import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; import io.lettuce.core.TimeoutOptions; @@ -19,8 +17,6 @@ import java.util.function.Consumer; import java.util.function.Function; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RedisConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; public class FaultTolerantRedisClient { @@ -34,7 +30,6 @@ public class FaultTolerantRedisClient { private final List> pubSubConnections = new ArrayList<>(); private final CircuitBreaker circuitBreaker; - private final Retry retry; public FaultTolerantRedisClient(final String name, final RedisConfiguration redisConfiguration, @@ -43,16 +38,14 @@ public class FaultTolerantRedisClient { this(name, clientResourcesBuilder, RedisUriUtil.createRedisUriWithTimeout(redisConfiguration.getUri(), redisConfiguration.getTimeout()), redisConfiguration.getTimeout(), - redisConfiguration.getCircuitBreakerConfiguration(), - redisConfiguration.getRetryConfiguration()); + redisConfiguration.getCircuitBreakerConfiguration()); } FaultTolerantRedisClient(String name, final ClientResources.Builder clientResourcesBuilder, final RedisURI redisUri, final Duration commandTimeout, - final CircuitBreakerConfiguration circuitBreakerConfiguration, - final RetryConfiguration retryConfiguration) { + final CircuitBreakerConfiguration circuitBreakerConfiguration) { this.name = name; @@ -81,10 +74,6 @@ public class FaultTolerantRedisClient { this.binaryConnection = redisClient.connect(ByteArrayCodec.INSTANCE); this.circuitBreaker = CircuitBreaker.of(name + "-breaker", circuitBreakerConfiguration.toCircuitBreakerConfig()); - this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() - .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); - - CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClient.class); } public void shutdown() { @@ -120,7 +109,7 @@ public class FaultTolerantRedisClient { private void useConnection(final StatefulRedisConnection connection, final Consumer> consumer) { try { - circuitBreaker.executeRunnable(() -> retry.executeRunnable(() -> consumer.accept(connection))); + circuitBreaker.executeRunnable(() -> consumer.accept(connection)); } catch (final Throwable t) { if (t instanceof RedisException) { throw (RedisException) t; @@ -133,7 +122,7 @@ public class FaultTolerantRedisClient { private T withConnection(final StatefulRedisConnection connection, final Function, T> function) { try { - return circuitBreaker.executeCallable(() -> retry.executeCallable(() -> function.apply(connection))); + return circuitBreaker.executeCallable(() -> function.apply(connection)); } catch (final Throwable t) { if (t instanceof RedisException) { throw (RedisException) t; diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java index a313a423b..0a8f22044 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java @@ -6,11 +6,9 @@ package org.whispersystems.textsecuregcm.redis; import io.github.resilience4j.core.IntervalFunction; -import io.github.resilience4j.reactor.retry.RetryOperator; import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryConfig; import io.lettuce.core.ClientOptions; -import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; import io.lettuce.core.TimeoutOptions; @@ -28,13 +26,8 @@ import java.util.Collections; import java.util.List; import java.util.function.Consumer; import java.util.function.Function; -import org.reactivestreams.Publisher; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RedisClusterConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; /** @@ -53,25 +46,26 @@ public class FaultTolerantRedisClusterClient { private final List> pubSubConnections = new ArrayList<>(); - private final Retry retry; private final Retry topologyChangedEventRetry; - public FaultTolerantRedisClusterClient(final String name, final RedisClusterConfiguration clusterConfiguration, - final ClientResources.Builder clientResourcesBuilder) { + public FaultTolerantRedisClusterClient(final String name, + final RedisClusterConfiguration clusterConfiguration, + final ClientResources.Builder clientResourcesBuilder) { this(name, clientResourcesBuilder, Collections.singleton(RedisUriUtil.createRedisUriWithTimeout(clusterConfiguration.getConfigurationUri(), clusterConfiguration.getTimeout())), clusterConfiguration.getTimeout(), - clusterConfiguration.getCircuitBreakerConfiguration(), - clusterConfiguration.getRetryConfiguration()); + clusterConfiguration.getCircuitBreakerConfiguration()); } - FaultTolerantRedisClusterClient(String name, final ClientResources.Builder clientResourcesBuilder, - Iterable redisUris, Duration commandTimeout, CircuitBreakerConfiguration circuitBreakerConfig, - RetryConfiguration retryConfiguration) { + FaultTolerantRedisClusterClient(final String name, + final ClientResources.Builder clientResourcesBuilder, + final Iterable redisUris, + final Duration commandTimeout, + final CircuitBreakerConfiguration circuitBreakerConfig) { this.name = name; @@ -116,8 +110,6 @@ public class FaultTolerantRedisClusterClient { clusterClient.getResources().eventBus().publish( new ClusterTopologyChangedEvent(Collections.emptyList(), clusterClient.getPartitions().getPartitions())); - this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() - .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); final RetryConfig topologyChangedEventRetryConfig = RetryConfig.custom() .maxAttempts(Integer.MAX_VALUE) .intervalFunction( @@ -125,8 +117,6 @@ public class FaultTolerantRedisClusterClient { .build(); this.topologyChangedEventRetry = Retry.of(name + "-topologyChangedRetry", topologyChangedEventRetryConfig); - - CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClusterClient.class); } public void shutdown() { @@ -160,15 +150,10 @@ public class FaultTolerantRedisClusterClient { return withConnection(binaryConnection, function); } - public Publisher withBinaryClusterReactive( - final Function, Publisher> function) { - return withConnectionReactive(binaryConnection, function); - } - private void useConnection(final StatefulRedisClusterConnection connection, final Consumer> consumer) { try { - retry.executeRunnable(() -> consumer.accept(connection)); + consumer.accept(connection); } catch (final Throwable t) { if (t instanceof RedisException) { throw (RedisException) t; @@ -181,7 +166,7 @@ public class FaultTolerantRedisClusterClient { private T withConnection(final StatefulRedisClusterConnection connection, final Function, T> function) { try { - return retry.executeCallable(() -> function.apply(connection)); + return function.apply(connection); } catch (final Throwable t) { if (t instanceof RedisException) { throw (RedisException) t; @@ -191,22 +176,6 @@ public class FaultTolerantRedisClusterClient { } } - private Publisher withConnectionReactive( - final StatefulRedisClusterConnection connection, - final Function, Publisher> function) { - - final Publisher publisher = function.apply(connection); - - if (publisher instanceof Mono m) { - return m.transformDeferred(RetryOperator.of(retry)); - } - if (publisher instanceof Flux f) { - return f.transformDeferred(RetryOperator.of(retry)); - } - - return Flux.from(publisher).transformDeferred(RetryOperator.of(retry)); - } - public FaultTolerantPubSubClusterConnection createPubSubConnection() { final StatefulRedisClusterPubSubConnection pubSubConnection = clusterClient.connectPubSub(); pubSubConnections.add(pubSubConnection); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index ab670aedd..53221bc5a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -434,7 +434,7 @@ public class MessagesCache { // the message might be addressed to the account's PNI, so use the service ID from the envelope ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice); - return Mono.from(redisCluster.withBinaryClusterReactive( + return Mono.from(redisCluster.withBinaryCluster( conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey) .collectList() .publishOn(messageDeliveryScheduler))) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java index 2d34a23d9..565842fed 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClientTest.java @@ -17,7 +17,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; // ThreadMode.SEPARATE_THREAD protects against hangs in the remote Redis calls, as this mode allows the test code to be // preempted by the timeout check @@ -26,13 +25,6 @@ class FaultTolerantRedisClientTest { private static final Duration TIMEOUT = Duration.ofMillis(50); - private static final RetryConfiguration RETRY_CONFIGURATION = new RetryConfiguration(); - - static { - RETRY_CONFIGURATION.setMaxAttempts(1); - RETRY_CONFIGURATION.setWaitDuration(50); - } - @RegisterExtension static final RedisServerExtension REDIS_SERVER_EXTENSION = RedisServerExtension.builder().build(); @@ -44,8 +36,7 @@ class FaultTolerantRedisClientTest { return new FaultTolerantRedisClient("test", clientResourcesBuilder, RedisServerExtension.getRedisURI(), TIMEOUT, - Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new), - RETRY_CONFIGURATION); + Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new)); } @AfterEach diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java index 48809d7c7..d98c2bef4 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClientTest.java @@ -61,7 +61,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.RedisClusterUtil; @@ -72,16 +71,8 @@ class FaultTolerantRedisClusterClientTest { private static final Duration TIMEOUT = Duration.ofMillis(200); - private static final RetryConfiguration RETRY_CONFIGURATION = new RetryConfiguration(); - - static { - RETRY_CONFIGURATION.setMaxAttempts(1); - RETRY_CONFIGURATION.setWaitDuration(50); - } - @RegisterExtension static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder() - .retryConfiguration(RETRY_CONFIGURATION) .timeout(TIMEOUT) .build(); @@ -94,8 +85,7 @@ class FaultTolerantRedisClusterClientTest { return new FaultTolerantRedisClusterClient("test", clientResourcesBuilder.socketAddressResolver(REDIS_CLUSTER_EXTENSION.getSocketAddressResolver()), RedisClusterExtension.getRedisURIs(), TIMEOUT, - Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new), - RETRY_CONFIGURATION); + Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new)); } @AfterEach diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java index 98479b048..de87b5817 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java @@ -30,7 +30,6 @@ import org.testcontainers.containers.ComposeContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.containers.wait.strategy.WaitStrategy; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.util.TestcontainersImages; public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterEachCallback, ExtensionContext.Store.CloseableResource { @@ -40,7 +39,6 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb private static List redisUris; private final Duration timeout; - private final RetryConfiguration retryConfiguration; private ClientResources redisClientResources; private FaultTolerantRedisClusterClient redisClusterClient; @@ -79,9 +77,8 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb - 'REDIS_CLUSTER_CREATOR=yes' """, TestcontainersImages.getRedisCluster()); - public RedisClusterExtension(final Duration timeout, final RetryConfiguration retryConfiguration) { + public RedisClusterExtension(final Duration timeout) { this.timeout = timeout; - this.retryConfiguration = retryConfiguration; } @@ -191,8 +188,7 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb redisClientResources.mutate(), getRedisURIs(), timeout, - circuitBreakerConfig, - retryConfiguration); + circuitBreakerConfig); redisClusterClient.useCluster(connection -> connection.sync().flushall(FlushMode.SYNC)); } @@ -226,7 +222,6 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb public static class Builder { private Duration timeout = DEFAULT_TIMEOUT; - private RetryConfiguration retryConfiguration = new RetryConfiguration(); private Builder() { } @@ -236,13 +231,8 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb return this; } - Builder retryConfiguration(RetryConfiguration retryConfiguration) { - this.retryConfiguration = retryConfiguration; - return this; - } - public RedisClusterExtension build() { - return new RedisClusterExtension(timeout, retryConfiguration); + return new RedisClusterExtension(timeout); } } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java index 969a13052..adc281122 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java @@ -16,7 +16,6 @@ import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; import org.testcontainers.utility.DockerImageName; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; import org.whispersystems.textsecuregcm.util.TestcontainersImages; public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallback, AfterEachCallback, ExtensionContext.Store.CloseableResource { @@ -65,8 +64,7 @@ public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallba redisClientResources.mutate(), getRedisURI(), Duration.ofSeconds(2), - circuitBreakerConfig, - new RetryConfiguration()); + circuitBreakerConfig); faultTolerantRedisClient.useConnection(connection -> connection.sync().flushall(FlushMode.SYNC)); }