diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/http/FaultTolerantHttpClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/http/FaultTolerantHttpClient.java index 7a99b1a15..6beb85516 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/http/FaultTolerantHttpClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/http/FaultTolerantHttpClient.java @@ -60,8 +60,6 @@ public class FaultTolerantHttpClient { this.retryExecutor = retryExecutor; this.retry = retry; this.breaker = circuitBreaker; - - CircuitBreakerUtil.registerMetrics(breaker, FaultTolerantHttpClient.class, Tags.empty()); } private HttpClient httpClient() { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java index 5e858a85d..520c88897 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/FaultTolerantRedisClusterClient.java @@ -82,8 +82,9 @@ public class FaultTolerantRedisClusterClient { redisUri.setLibraryVersion(null); }); - final LettuceShardCircuitBreaker lettuceShardCircuitBreaker = new LettuceShardCircuitBreaker(name, - circuitBreakerConfigurationName, Schedulers.newSingle("topology-changed-" + name, true)); + final LettuceShardCircuitBreaker lettuceShardCircuitBreaker = + new LettuceShardCircuitBreaker(name, circuitBreakerConfigurationName); + this.clusterClient = RedisClusterClient.create( clientResourcesBuilder.nettyCustomizer(lettuceShardCircuitBreaker). build(), @@ -101,8 +102,6 @@ public class FaultTolerantRedisClusterClient { .publishOnScheduler(true) .build()); - lettuceShardCircuitBreaker.setEventBus(clusterClient.getResources().eventBus()); - this.stringConnection = clusterClient.connect(); this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/redis/LettuceShardCircuitBreaker.java b/service/src/main/java/org/whispersystems/textsecuregcm/redis/LettuceShardCircuitBreaker.java index b340ed59e..1c296550f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/redis/LettuceShardCircuitBreaker.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/redis/LettuceShardCircuitBreaker.java @@ -10,13 +10,10 @@ import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.lettuce.core.RedisNoScriptException; import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent; -import io.lettuce.core.cluster.models.partitions.RedisClusterNode; -import io.lettuce.core.event.EventBus; import io.lettuce.core.protocol.CommandHandler; import io.lettuce.core.protocol.CompleteableCommand; import io.lettuce.core.protocol.RedisCommand; import io.lettuce.core.resource.NettyCustomizer; -import io.micrometer.core.instrument.Tags; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -24,18 +21,13 @@ import io.netty.channel.ChannelPromise; import java.net.SocketAddress; import java.util.Collection; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil; -import reactor.core.scheduler.Scheduler; /** * Adds a circuit breaker to every Netty {@link Channel} that gets created, so that a single unhealthy shard does not @@ -56,64 +48,16 @@ public class LettuceShardCircuitBreaker implements NettyCustomizer { private final String clusterName; @Nullable private final String circuitBreakerConfigurationName; - private final Scheduler scheduler; - // this set will be shared with all child channel breakers - private final Set upstreamAddresses = ConcurrentHashMap.newKeySet(); - // The EventBus is not available at construction time, because it is one of the client - // resources, which cannot be built without this NettyCustomizer - private EventBus eventBus; - - public LettuceShardCircuitBreaker(final String clusterName, - @Nullable final String circuitBreakerConfigurationName, - final Scheduler scheduler) { + public LettuceShardCircuitBreaker(final String clusterName, @Nullable final String circuitBreakerConfigurationName) { this.clusterName = clusterName; this.circuitBreakerConfigurationName = circuitBreakerConfigurationName; - this.scheduler = scheduler; - } - - private static String toShardAddress(final RedisClusterNode redisClusterNode) { - return "%s:%s".formatted(redisClusterNode.getUri().getHost(), redisClusterNode.getUri().getPort()); - } - - void setEventBus(final EventBus eventBus) { - this.eventBus = eventBus; - - eventBus.get() - .filter(e -> e instanceof ClusterTopologyChangedEvent) - .map(e -> (ClusterTopologyChangedEvent) e) - .subscribeOn(scheduler) - .subscribe(event -> { - - final Set currentUpstreams = event.after().stream() - .filter(node -> node.getRole().isUpstream()) - .map(LettuceShardCircuitBreaker::toShardAddress) - .collect(Collectors.toSet()); - - final Set previousUpstreams = event.before().stream() - .filter(node -> node.getRole().isUpstream()) - .map(LettuceShardCircuitBreaker::toShardAddress) - .collect(Collectors.toSet()); - if (previousUpstreams.removeAll(currentUpstreams)) { - logger.info("No longer upstream in cluster {}: {}", clusterName, StringUtils.join(previousUpstreams, ", ")); - } - - // Channels may be created at any time, not just immediately after the cluster client connect()s or when topology - // changes, so we maintain a set that can be queried by channel handlers during their connect() method. - upstreamAddresses.addAll(currentUpstreams); - upstreamAddresses.removeAll(previousUpstreams); - }); } @Override public void afterChannelInitialized(final Channel channel) { - - if (eventBus == null) { - throw new IllegalStateException("Event bus must be set before channel customization can occur"); - } - - final ChannelCircuitBreakerHandler channelCircuitBreakerHandler = new ChannelCircuitBreakerHandler(clusterName, - circuitBreakerConfigurationName, upstreamAddresses, eventBus, scheduler); + final ChannelCircuitBreakerHandler channelCircuitBreakerHandler = + new ChannelCircuitBreakerHandler(clusterName, circuitBreakerConfigurationName); final String commandHandlerName = StreamSupport.stream(channel.pipeline().spliterator(), false) .filter(entry -> entry.getValue() instanceof CommandHandler) @@ -128,52 +72,19 @@ public class LettuceShardCircuitBreaker implements NettyCustomizer { private static final Logger logger = LoggerFactory.getLogger(ChannelCircuitBreakerHandler.class); private static final String CLUSTER_TAG_NAME = "cluster"; + private static final String SHARD_ADDRESS_TAG_NAME = "shard"; private final String clusterName; @Nullable private final String circuitBreakerConfigurationName; - private final AtomicBoolean registeredMetrics = new AtomicBoolean(false); - private final Set upstreamAddresses; private String shardAddress; @VisibleForTesting CircuitBreaker breaker; - public ChannelCircuitBreakerHandler(final String name, - @Nullable final String circuitBreakerConfigurationName, - final Set upstreamAddresses, - final EventBus eventBus, final Scheduler scheduler) { + public ChannelCircuitBreakerHandler(final String name, @Nullable final String circuitBreakerConfigurationName) { this.clusterName = name; this.circuitBreakerConfigurationName = circuitBreakerConfigurationName; - this.upstreamAddresses = upstreamAddresses; - - eventBus.get() - .filter(e -> e instanceof ClusterTopologyChangedEvent) - .map(e -> (ClusterTopologyChangedEvent) e) - .subscribeOn(scheduler) - .subscribe(event -> { - if (shardAddress == null) { - logger.warn("Received a topology changed event without a shard address"); - return; - } - - final Set newUpstreams = event.after().stream().filter(node -> node.getRole().isUpstream()) - .map(LettuceShardCircuitBreaker::toShardAddress) - .collect(Collectors.toSet()); - - if (newUpstreams.contains(shardAddress)) { - registerMetrics(); - } - }); - } - - void registerMetrics() { - // Registering metrics is not idempotent--some counters are added as event listeners, - // and there would be duplicated calls to increment() - if (registeredMetrics.compareAndSet(false, true)) { - logger.info("Registered metrics for: {}/{}", clusterName, shardAddress); - CircuitBreakerUtil.registerMetrics(breaker, getClass(), Tags.of(CLUSTER_TAG_NAME, clusterName)); - } } @Override @@ -188,15 +99,14 @@ public class LettuceShardCircuitBreaker implements NettyCustomizer { // In some cases, like the default connection, the remote address includes the DNS hostname, which we want to exclude. shardAddress = StringUtils.substringAfter(remoteAddress.toString(), "/"); - final String circuitBreakerName = "%s/%s-breaker".formatted(clusterName, shardAddress); + final String circuitBreakerName = "%s/%s".formatted(clusterName, shardAddress); + final Map tags = Map.of( + CLUSTER_TAG_NAME, clusterName, + SHARD_ADDRESS_TAG_NAME, shardAddress); breaker = circuitBreakerConfigurationName != null - ? CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, circuitBreakerConfigurationName) - : CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName); - - if (upstreamAddresses.contains(shardAddress)) { - registerMetrics(); - } + ? CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, circuitBreakerConfigurationName, tags) + : CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, tags); } @Override diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java index cfa995694..fa08c1c6d 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/CircuitBreakerUtil.java @@ -5,32 +5,19 @@ package org.whispersystems.textsecuregcm.util; -import static com.codahale.metrics.MetricRegistry.name; - -import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; import io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetrics; import io.github.resilience4j.micrometer.tagged.TaggedRetryMetrics; import io.github.resilience4j.retry.Retry; import io.github.resilience4j.retry.RetryRegistry; import io.lettuce.core.RedisCommandTimeoutException; -import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Tags; import org.apache.commons.lang3.RandomStringUtils; import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; import org.whispersystems.textsecuregcm.configuration.RetryConfiguration; public class CircuitBreakerUtil { - private static final String CIRCUIT_BREAKER_CALL_COUNTER_NAME = name(CircuitBreakerUtil.class, "breaker", "call"); - private static final String CIRCUIT_BREAKER_STATE_GAUGE_NAME = name(CircuitBreakerUtil.class, "breaker", "state"); - private static final String RETRY_CALL_COUNTER_NAME = name(CircuitBreakerUtil.class, "retry", "call"); - - private static final String BREAKER_NAME_TAG_NAME = "breakerName"; - private static final String OUTCOME_TAG_NAME = "outcome"; - private static final CircuitBreakerRegistry CIRCUIT_BREAKER_REGISTRY = CircuitBreakerRegistry.of(new CircuitBreakerConfiguration().toCircuitBreakerConfig()); @@ -72,79 +59,4 @@ public class CircuitBreakerUtil { public static Retry getGeneralRedisRetry(final String name) { return RETRY_REGISTRY.retry(name, GENERAL_REDIS_CONFIGURATION_NAME); } - - /// @deprecated [CircuitBreakerRegistry] maintains its own set of metrics, and manually managing them is unnecessary - @Deprecated(forRemoval = true) - public static void registerMetrics(CircuitBreaker circuitBreaker, Class clazz, Tags additionalTags) { - final String breakerName = clazz.getSimpleName() + "/" + circuitBreaker.getName(); - - final Counter successCounter = Metrics.counter(CIRCUIT_BREAKER_CALL_COUNTER_NAME, - additionalTags.and( - BREAKER_NAME_TAG_NAME, breakerName, - OUTCOME_TAG_NAME, "success")); - - final Counter failureCounter = Metrics.counter(CIRCUIT_BREAKER_CALL_COUNTER_NAME, - additionalTags.and( - BREAKER_NAME_TAG_NAME, breakerName, - OUTCOME_TAG_NAME, "failure")); - - final Counter unpermittedCounter = Metrics.counter(CIRCUIT_BREAKER_CALL_COUNTER_NAME, - additionalTags.and(BREAKER_NAME_TAG_NAME, breakerName, - OUTCOME_TAG_NAME, "unpermitted")); - - circuitBreaker.getEventPublisher().onSuccess(event -> { - successCounter.increment(); - }); - - circuitBreaker.getEventPublisher().onError(event -> { - failureCounter.increment(); - }); - - circuitBreaker.getEventPublisher().onCallNotPermitted(event -> { - unpermittedCounter.increment(); - }); - - Metrics.gauge(CIRCUIT_BREAKER_STATE_GAUGE_NAME, - Tags.of(Tag.of(BREAKER_NAME_TAG_NAME, circuitBreaker.getName())), - circuitBreaker, breaker -> breaker.getState().getOrder()); - } - - /// @deprecated [RetryRegistry] maintains its own set of metrics, and manually managing them is unnecessary - @Deprecated(forRemoval = true) - public static void registerMetrics(Retry retry, Class clazz) { - final String retryName = clazz.getSimpleName() + "/" + retry.getName(); - - final Counter successCounter = Metrics.counter(RETRY_CALL_COUNTER_NAME, - BREAKER_NAME_TAG_NAME, retryName, - OUTCOME_TAG_NAME, "success"); - - final Counter retryCounter = Metrics.counter(RETRY_CALL_COUNTER_NAME, - BREAKER_NAME_TAG_NAME, retryName, - OUTCOME_TAG_NAME, "retry"); - - final Counter errorCounter = Metrics.counter(RETRY_CALL_COUNTER_NAME, - BREAKER_NAME_TAG_NAME, retryName, - OUTCOME_TAG_NAME, "error"); - - final Counter ignoredErrorCounter = Metrics.counter(RETRY_CALL_COUNTER_NAME, - BREAKER_NAME_TAG_NAME, retryName, - OUTCOME_TAG_NAME, "ignored_error"); - - retry.getEventPublisher().onSuccess(event -> { - successCounter.increment(); - }); - - retry.getEventPublisher().onRetry(event -> { - retryCounter.increment(); - }); - - retry.getEventPublisher().onError(event -> { - errorCounter.increment(); - }); - - retry.getEventPublisher().onIgnoredError(event -> { - ignoredErrorCounter.increment(); - }); - } - } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/redis/LettuceShardCircuitBreakerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/redis/LettuceShardCircuitBreakerTest.java index 8a1a38309..e0264940d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/redis/LettuceShardCircuitBreakerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/redis/LettuceShardCircuitBreakerTest.java @@ -13,13 +13,11 @@ import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.when; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.lettuce.core.ClientOptions; import io.lettuce.core.codec.StringCodec; -import io.lettuce.core.event.EventBus; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.Command; @@ -33,7 +31,6 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import java.io.IOException; import java.net.SocketAddress; -import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,29 +39,21 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; class LettuceShardCircuitBreakerTest { private LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler channelCircuitBreakerHandler; - private EventBus eventBus; @BeforeEach void setUp() { - eventBus = mock(EventBus.class); - when(eventBus.get()).thenReturn(Flux.never()); - channelCircuitBreakerHandler = new LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler( - "test", null, Collections.emptySet(), eventBus, Schedulers.immediate()); + channelCircuitBreakerHandler = new LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler("test", null); } @Test void testAfterChannelInitialized() { final LettuceShardCircuitBreaker lettuceShardCircuitBreaker = - new LettuceShardCircuitBreaker("test", null, Schedulers.immediate()); - - lettuceShardCircuitBreaker.setEventBus(eventBus); + new LettuceShardCircuitBreaker("test", null); final Channel channel = new EmbeddedChannel( new CommandHandler(ClientOptions.create(), ClientResources.create(), mock(Endpoint.class)));