diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java index 755fed51a..8e74d4f80 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClient.java @@ -9,21 +9,18 @@ import io.lettuce.core.RedisException; import io.lettuce.core.RedisURI; import io.lettuce.core.TimeoutOptions; import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.cluster.ClusterClientOptions; -import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.resource.ClientResources; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.configuration.RedisConfiguration; -import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; -import reactor.core.scheduler.Schedulers; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; import java.util.function.Function; +import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; +import org.whispersystems.textsecuregcm.configuration.RedisConfiguration; +import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; +import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; public class FaultTolerantRedisClient { @@ -70,15 +67,9 @@ public class FaultTolerantRedisClient { redisUri.setLibraryName(null); redisUri.setLibraryVersion(null); - final LettuceShardCircuitBreaker lettuceShardCircuitBreaker = new LettuceShardCircuitBreaker(name, - circuitBreakerConfiguration.toCircuitBreakerConfig(), Schedulers.newSingle("topology-changed-" + name, true)); this.redisClient = RedisClient.create(clientResourcesBuilder.build(), redisUri); - this.redisClient.setOptions(ClusterClientOptions.builder() + this.redisClient.setOptions(ClientOptions.builder() .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) - .validateClusterNodeMembership(false) - .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder() - .enableAllAdaptiveRefreshTriggers() - .build()) // for asynchronous commands .timeoutOptions(TimeoutOptions.builder() .fixedTimeout(commandTimeout) @@ -86,8 +77,6 @@ public class FaultTolerantRedisClient { .publishOnScheduler(true) .build()); - lettuceShardCircuitBreaker.setEventBus(redisClient.getResources().eventBus()); - this.stringConnection = redisClient.connect(); this.binaryConnection = redisClient.connect(ByteArrayCodec.INSTANCE); @@ -95,7 +84,7 @@ public class FaultTolerantRedisClient { this.retry = Retry.of(name + "-retry", retryConfiguration.toRetryConfigBuilder() .retryOnException(exception -> exception instanceof RedisCommandTimeoutException).build()); - CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClusterClient.class); + CircuitBreakerUtil.registerMetrics(retry, FaultTolerantRedisClient.class); } public void shutdown() {