Use central registries for Retry and CircuitBreaker instances

This commit is contained in:
Jon Chambers
2025-08-27 11:33:42 -04:00
committed by GitHub
parent a8c6fa93e0
commit f616612104
33 changed files with 326 additions and 349 deletions

View File

@@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisException;
import io.lettuce.core.resource.ClientResources;
@@ -34,9 +35,14 @@ class FaultTolerantRedisClientTest {
@Nullable final CircuitBreakerConfiguration circuitBreakerConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
return new FaultTolerantRedisClient("test", clientResourcesBuilder,
RedisServerExtension.getRedisURI(), TIMEOUT,
Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new));
final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", Optional.ofNullable(circuitBreakerConfiguration)
.orElseGet(CircuitBreakerConfiguration::new).toCircuitBreakerConfig());
return new FaultTolerantRedisClient("test",
clientResourcesBuilder,
RedisServerExtension.getRedisURI(),
TIMEOUT,
circuitBreaker);
}
@AfterEach

View File

@@ -46,7 +46,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -61,6 +60,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
@@ -71,6 +71,8 @@ class FaultTolerantRedisClusterClientTest {
private static final Duration TIMEOUT = Duration.ofMillis(200);
private static int circuitBreakerConfigurationCount = 0;
@RegisterExtension
static final RedisClusterExtension REDIS_CLUSTER_EXTENSION = RedisClusterExtension.builder()
.timeout(TIMEOUT)
@@ -79,13 +81,24 @@ class FaultTolerantRedisClusterClientTest {
private FaultTolerantRedisClusterClient cluster;
private static FaultTolerantRedisClusterClient buildCluster(
final String name,
@Nullable final CircuitBreakerConfiguration circuitBreakerConfiguration,
final ClientResources.Builder clientResourcesBuilder) {
return new FaultTolerantRedisClusterClient("test",
final String circuitBreakerConfigurationName;
if (circuitBreakerConfiguration != null) {
circuitBreakerConfigurationName = FaultTolerantRedisClusterClientTest.class.getSimpleName() + "-" + circuitBreakerConfigurationCount++;
CircuitBreakerUtil.getCircuitBreakerRegistry().addConfiguration(circuitBreakerConfigurationName, circuitBreakerConfiguration.toCircuitBreakerConfig());
} else {
circuitBreakerConfigurationName = null;
}
return new FaultTolerantRedisClusterClient(name,
clientResourcesBuilder.socketAddressResolver(REDIS_CLUSTER_EXTENSION.getSocketAddressResolver()),
RedisClusterExtension.getRedisURIs(), TIMEOUT,
Optional.ofNullable(circuitBreakerConfiguration).orElseGet(CircuitBreakerConfiguration::new));
RedisClusterExtension.getRedisURIs(),
TIMEOUT,
circuitBreakerConfigurationName);
}
@AfterEach
@@ -95,7 +108,7 @@ class FaultTolerantRedisClusterClientTest {
@Test
void testTimeout() {
cluster = buildCluster(null, ClientResources.builder());
cluster = buildCluster("testTimeout", null, ClientResources.builder());
final ExecutionException asyncException = assertThrows(ExecutionException.class,
() -> cluster.withCluster(connection -> connection.async().blpop(10 * TIMEOUT.toMillis() / 1000d, "key"))
@@ -119,7 +132,7 @@ class FaultTolerantRedisClusterClientTest {
circuitBreakerConfig.setSlidingWindowSize(1);
circuitBreakerConfig.setWaitDurationInOpenState(breakerWaitDuration);
cluster = buildCluster(circuitBreakerConfig, ClientResources.builder());
cluster = buildCluster("testTimeoutCircuitBreaker", circuitBreakerConfig, ClientResources.builder());
final String key = "key";
@@ -149,7 +162,7 @@ class FaultTolerantRedisClusterClientTest {
final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder()
.nettyCustomizer(testBreakerManager);
cluster = buildCluster(circuitBreakerConfig, builder);
cluster = buildCluster("testShardUnavailable", circuitBreakerConfig, builder);
// this test will open the breaker on one shard and check that other shards are still available,
// so we get two nodes and a slot+key on each to test
@@ -202,7 +215,7 @@ class FaultTolerantRedisClusterClientTest {
final ClientResources.Builder builder = CompositeNettyCustomizerClientResourcesBuilder.builder()
.nettyCustomizer(testBreakerManager);
cluster = buildCluster(circuitBreakerConfig, builder);
cluster = buildCluster("testShardUnavailablePubSub", circuitBreakerConfig, builder);
cluster.useCluster(
connection -> connection.sync().upstream().commands().configSet("notify-keyspace-events", "K$glz"));

View File

@@ -33,19 +33,15 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@@ -59,15 +55,15 @@ class LettuceShardCircuitBreakerTest {
eventBus = mock(EventBus.class);
when(eventBus.get()).thenReturn(Flux.never());
channelCircuitBreakerHandler = new LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler(
"test", new CircuitBreakerConfiguration().toCircuitBreakerConfig(), Collections.emptySet(), eventBus,
Schedulers.immediate());
"test", null, Collections.emptySet(), eventBus, Schedulers.immediate());
}
@Test
void testAfterChannelInitialized() {
final LettuceShardCircuitBreaker lettuceShardCircuitBreaker = new LettuceShardCircuitBreaker("test",
new CircuitBreakerConfiguration().toCircuitBreakerConfig(), Schedulers.immediate());
final LettuceShardCircuitBreaker lettuceShardCircuitBreaker =
new LettuceShardCircuitBreaker("test", null, Schedulers.immediate());
lettuceShardCircuitBreaker.setEventBus(eventBus);
final Channel channel = new EmbeddedChannel(

View File

@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
@@ -30,6 +31,7 @@ import org.testcontainers.containers.ComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import org.whispersystems.textsecuregcm.util.TestcontainersImages;
public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterEachCallback, ExtensionContext.Store.CloseableResource {
@@ -43,6 +45,9 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb
private ClientResources redisClientResources;
private FaultTolerantRedisClusterClient redisClusterClient;
private static final String CIRCUIT_BREAKER_CONFIGURATION_NAME =
RedisClusterExtension.class.getSimpleName() + "-" + RandomStringUtils.insecure().nextAlphanumeric(8);
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(2);
private static final int REDIS_PORT = 6379;
@@ -97,6 +102,11 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb
@Override
public void beforeAll(final ExtensionContext context) throws Exception {
if (composeContainer == null) {
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500));
CircuitBreakerUtil.getCircuitBreakerRegistry().addConfiguration(CIRCUIT_BREAKER_CONFIGURATION_NAME, circuitBreakerConfig.toCircuitBreakerConfig());
final File clusterComposeFile = File.createTempFile("redis-cluster", ".yml");
clusterComposeFile.deleteOnExit();
@@ -181,14 +191,11 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb
.socketAddressResolver(getSocketAddressResolver())
.build();
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500));
redisClusterClient = new FaultTolerantRedisClusterClient("test-cluster",
redisClientResources.mutate(),
getRedisURIs(),
timeout,
circuitBreakerConfig);
CIRCUIT_BREAKER_CONFIGURATION_NAME);
redisClusterClient.useCluster(connection -> connection.sync().flushall(FlushMode.SYNC));
}

View File

@@ -6,6 +6,7 @@
package org.whispersystems.textsecuregcm.redis;
import com.redis.testcontainers.RedisContainer;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.lettuce.core.FlushMode;
import io.lettuce.core.RedisURI;
import io.lettuce.core.resource.ClientResources;
@@ -64,7 +65,7 @@ public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallba
redisClientResources.mutate(),
getRedisURI(),
Duration.ofSeconds(2),
circuitBreakerConfig);
CircuitBreaker.of("test", circuitBreakerConfig.toCircuitBreakerConfig()));
faultTolerantRedisClient.useConnection(connection -> connection.sync().flushall(FlushMode.SYNC));
}