Create new Redis client resources for each test

This commit is contained in:
Jon Chambers
2025-07-16 18:28:52 -04:00
committed by Jon Chambers
parent 1f60300555
commit 73748a6341
2 changed files with 63 additions and 46 deletions

View File

@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -31,16 +32,16 @@ import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, ExtensionContext.Store.CloseableResource {
public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterEachCallback, ExtensionContext.Store.CloseableResource {
private static ComposeContainer composeContainer;
private static ClientResources redisClientResources;
private static Map<HostAndPort, HostAndPort> exposedAddressesByInternalAddress;
private static List<RedisURI> redisUris;
private final Duration timeout;
private final RetryConfiguration retryConfiguration;
private ClientResources redisClientResources;
private FaultTolerantRedisClusterClient redisClusterClient;
private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(2);
@@ -63,18 +64,15 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb
@Override
public void close() throws Throwable {
if (redisClientResources != null) {
redisClientResources.shutdown().get();
if (composeContainer != null) {
composeContainer.stop();
redisClusterClient.shutdown();
composeContainer = null;
}
redisClientResources = null;
}
@Override
public void beforeAll(final ExtensionContext context) throws Exception {
if (redisClientResources == null) {
if (composeContainer == null) {
final URL clusterComposeFileUrl = getClass().getResource("redis-cluster.yml");
if (clusterComposeFileUrl == null) {
@@ -115,52 +113,66 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb
composeContainer.getServicePort("redis-0-1", REDIS_PORT)))
.toList();
redisClientResources = ClientResources.builder()
.socketAddressResolver(getSocketAddressResolver())
.build();
// Wait for the cluster to be fully up; just having the containers running isn't enough since they still need to do
// some post-launch cluster setup work.
boolean allNodesUp;
final Instant deadline = Instant.now().plus(CLUSTER_UP_DEADLINE);
do {
allNodesUp = redisUris.stream()
.allMatch(redisUri -> {
try (final RedisClient redisClient = RedisClient.create(redisClientResources, redisUri)) {
final String clusterInfo = redisClient.connect().sync().clusterInfo();
return clusterInfo.contains("cluster_state:ok") && clusterInfo.contains("cluster_slots_ok:16384");
} catch (final Exception e) {
return false;
}
});
final ClientResources clientResources = ClientResources.builder()
.socketAddressResolver(getSocketAddressResolver())
.build();
if (Instant.now().isAfter(deadline)) {
throw new RuntimeException("Cluster did not start before deadline");
}
try {
do {
allNodesUp = redisUris.stream()
.allMatch(redisUri -> {
try (final RedisClient redisClient = RedisClient.create(clientResources, redisUri)) {
final String clusterInfo = redisClient.connect().sync().clusterInfo();
return clusterInfo.contains("cluster_state:ok") && clusterInfo.contains("cluster_slots_ok:16384");
} catch (final Exception e) {
return false;
}
});
if (!allNodesUp) {
Thread.sleep(100);
}
} while (!allNodesUp);
if (Instant.now().isAfter(deadline)) {
throw new RuntimeException("Cluster did not start before deadline");
}
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500));
redisClusterClient = new FaultTolerantRedisClusterClient("test-cluster",
redisClientResources.mutate(),
getRedisURIs(),
timeout,
circuitBreakerConfig,
retryConfiguration);
if (!allNodesUp) {
Thread.sleep(100);
}
} while (!allNodesUp);
} finally {
clientResources.shutdown().await();
}
}
}
@Override
public void beforeEach(final ExtensionContext context) throws Exception {
redisClientResources = ClientResources.builder()
.socketAddressResolver(getSocketAddressResolver())
.build();
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500));
redisClusterClient = new FaultTolerantRedisClusterClient("test-cluster",
redisClientResources.mutate(),
getRedisURIs(),
timeout,
circuitBreakerConfig,
retryConfiguration);
redisClusterClient.useCluster(connection -> connection.sync().flushall(FlushMode.SYNC));
}
@Override
public void afterEach(final ExtensionContext context) throws InterruptedException {
redisClusterClient.shutdown();
redisClientResources.shutdown().await();
}
public static List<RedisURI> getRedisURIs() {
return redisUris;
}

View File

@@ -10,6 +10,7 @@ import io.lettuce.core.FlushMode;
import io.lettuce.core.RedisURI;
import io.lettuce.core.resource.ClientResources;
import java.time.Duration;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -17,11 +18,11 @@ import org.testcontainers.utility.DockerImageName;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallback, ExtensionContext.Store.CloseableResource {
public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallback, AfterEachCallback, ExtensionContext.Store.CloseableResource {
private static RedisContainer redisContainer;
private static ClientResources redisClientResources;
private ClientResources redisClientResources;
private FaultTolerantRedisClient faultTolerantRedisClient;
// redis:7.4-apline; see https://hub.docker.com/layers/library/redis/7.4-alpine/images/sha256-e1b05db81cda983ede3bbb3e834e7ebec8faafa275f55f7f91f3ee84114f98a7
@@ -46,8 +47,6 @@ public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallba
if (redisContainer == null) {
redisContainer = new RedisContainer(REDIS_IMAGE);
redisContainer.start();
redisClientResources = ClientResources.builder().build();
}
}
@@ -59,6 +58,9 @@ public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallba
public void beforeEach(final ExtensionContext context) {
final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration();
circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500));
redisClientResources = ClientResources.builder().build();
faultTolerantRedisClient = new FaultTolerantRedisClient("test-redis-client",
redisClientResources.mutate(),
getRedisURI(),
@@ -69,15 +71,18 @@ public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallba
faultTolerantRedisClient.useConnection(connection -> connection.sync().flushall(FlushMode.SYNC));
}
@Override
public void afterEach(final ExtensionContext context) throws InterruptedException {
faultTolerantRedisClient.shutdown();
redisClientResources.shutdown().await();
}
@Override
public void close() throws Throwable {
if (redisContainer != null) {
redisClientResources.shutdown().await();
redisContainer.stop();
redisContainer = null;
}
redisClientResources = null;
redisContainer = null;
}
public FaultTolerantRedisClient getRedisClient() {