diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java index c346c1032..408ff38e9 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisClusterExtension.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; @@ -31,16 +32,16 @@ import org.testcontainers.containers.wait.strategy.WaitStrategy; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, ExtensionContext.Store.CloseableResource { +public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallback, AfterEachCallback, ExtensionContext.Store.CloseableResource { private static ComposeContainer composeContainer; - private static ClientResources redisClientResources; private static Map exposedAddressesByInternalAddress; private static List redisUris; private final Duration timeout; private final RetryConfiguration retryConfiguration; + private ClientResources redisClientResources; private FaultTolerantRedisClusterClient redisClusterClient; private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(2); @@ -63,18 +64,15 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb @Override public void close() throws Throwable { - if (redisClientResources != null) { - redisClientResources.shutdown().get(); + if (composeContainer != null) { composeContainer.stop(); - redisClusterClient.shutdown(); + composeContainer = null; } - - redisClientResources = null; } @Override public void beforeAll(final ExtensionContext context) throws Exception { - if (redisClientResources == null) { + if (composeContainer == null) { final URL clusterComposeFileUrl = getClass().getResource("redis-cluster.yml"); if (clusterComposeFileUrl == null) { @@ -115,52 +113,66 @@ public class RedisClusterExtension implements BeforeAllCallback, BeforeEachCallb composeContainer.getServicePort("redis-0-1", REDIS_PORT))) .toList(); - redisClientResources = ClientResources.builder() - .socketAddressResolver(getSocketAddressResolver()) - .build(); - // Wait for the cluster to be fully up; just having the containers running isn't enough since they still need to do // some post-launch cluster setup work. boolean allNodesUp; final Instant deadline = Instant.now().plus(CLUSTER_UP_DEADLINE); - do { - allNodesUp = redisUris.stream() - .allMatch(redisUri -> { - try (final RedisClient redisClient = RedisClient.create(redisClientResources, redisUri)) { - final String clusterInfo = redisClient.connect().sync().clusterInfo(); - return clusterInfo.contains("cluster_state:ok") && clusterInfo.contains("cluster_slots_ok:16384"); - } catch (final Exception e) { - return false; - } - }); + final ClientResources clientResources = ClientResources.builder() + .socketAddressResolver(getSocketAddressResolver()) + .build(); - if (Instant.now().isAfter(deadline)) { - throw new RuntimeException("Cluster did not start before deadline"); - } + try { + do { + allNodesUp = redisUris.stream() + .allMatch(redisUri -> { + try (final RedisClient redisClient = RedisClient.create(clientResources, redisUri)) { + final String clusterInfo = redisClient.connect().sync().clusterInfo(); + return clusterInfo.contains("cluster_state:ok") && clusterInfo.contains("cluster_slots_ok:16384"); + } catch (final Exception e) { + return false; + } + }); - if (!allNodesUp) { - Thread.sleep(100); - } - } while (!allNodesUp); + if (Instant.now().isAfter(deadline)) { + throw new RuntimeException("Cluster did not start before deadline"); + } - final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); - circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500)); - - redisClusterClient = new FaultTolerantRedisClusterClient("test-cluster", - redisClientResources.mutate(), - getRedisURIs(), - timeout, - circuitBreakerConfig, - retryConfiguration); + if (!allNodesUp) { + Thread.sleep(100); + } + } while (!allNodesUp); + } finally { + clientResources.shutdown().await(); + } } } @Override public void beforeEach(final ExtensionContext context) throws Exception { + redisClientResources = ClientResources.builder() + .socketAddressResolver(getSocketAddressResolver()) + .build(); + + final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); + circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500)); + + redisClusterClient = new FaultTolerantRedisClusterClient("test-cluster", + redisClientResources.mutate(), + getRedisURIs(), + timeout, + circuitBreakerConfig, + retryConfiguration); + redisClusterClient.useCluster(connection -> connection.sync().flushall(FlushMode.SYNC)); } + @Override + public void afterEach(final ExtensionContext context) throws InterruptedException { + redisClusterClient.shutdown(); + redisClientResources.shutdown().await(); + } + public static List getRedisURIs() { return redisUris; } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java index ae05029b0..1b46ee674 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/RedisServerExtension.java @@ -10,6 +10,7 @@ import io.lettuce.core.FlushMode; import io.lettuce.core.RedisURI; import io.lettuce.core.resource.ClientResources; import java.time.Duration; +import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; @@ -17,11 +18,11 @@ import org.testcontainers.utility.DockerImageName; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; -public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallback, ExtensionContext.Store.CloseableResource { +public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallback, AfterEachCallback, ExtensionContext.Store.CloseableResource { private static RedisContainer redisContainer; - private static ClientResources redisClientResources; + private ClientResources redisClientResources; private FaultTolerantRedisClient faultTolerantRedisClient; // redis:7.4-apline; see https://hub.docker.com/layers/library/redis/7.4-alpine/images/sha256-e1b05db81cda983ede3bbb3e834e7ebec8faafa275f55f7f91f3ee84114f98a7 @@ -46,8 +47,6 @@ public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallba if (redisContainer == null) { redisContainer = new RedisContainer(REDIS_IMAGE); redisContainer.start(); - - redisClientResources = ClientResources.builder().build(); } } @@ -59,6 +58,9 @@ public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallba public void beforeEach(final ExtensionContext context) { final CircuitBreakerConfiguration circuitBreakerConfig = new CircuitBreakerConfiguration(); circuitBreakerConfig.setWaitDurationInOpenState(Duration.ofMillis(500)); + + redisClientResources = ClientResources.builder().build(); + faultTolerantRedisClient = new FaultTolerantRedisClient("test-redis-client", redisClientResources.mutate(), getRedisURI(), @@ -69,15 +71,18 @@ public class RedisServerExtension implements BeforeAllCallback, BeforeEachCallba faultTolerantRedisClient.useConnection(connection -> connection.sync().flushall(FlushMode.SYNC)); } + @Override + public void afterEach(final ExtensionContext context) throws InterruptedException { + faultTolerantRedisClient.shutdown(); + redisClientResources.shutdown().await(); + } + @Override public void close() throws Throwable { if (redisContainer != null) { - redisClientResources.shutdown().await(); redisContainer.stop(); + redisContainer = null; } - - redisClientResources = null; - redisContainer = null; } public FaultTolerantRedisClient getRedisClient() {