Remove legacy circuit breaker/retry metrics

This commit is contained in:
Jon Chambers
2025-08-27 13:07:43 -04:00
committed by Jon Chambers
parent a747afb487
commit 807e03ca2b
5 changed files with 16 additions and 208 deletions

View File

@@ -60,8 +60,6 @@ public class FaultTolerantHttpClient {
this.retryExecutor = retryExecutor;
this.retry = retry;
this.breaker = circuitBreaker;
CircuitBreakerUtil.registerMetrics(breaker, FaultTolerantHttpClient.class, Tags.empty());
}
private HttpClient httpClient() {

View File

@@ -82,8 +82,9 @@ public class FaultTolerantRedisClusterClient {
redisUri.setLibraryVersion(null);
});
final LettuceShardCircuitBreaker lettuceShardCircuitBreaker = new LettuceShardCircuitBreaker(name,
circuitBreakerConfigurationName, Schedulers.newSingle("topology-changed-" + name, true));
final LettuceShardCircuitBreaker lettuceShardCircuitBreaker =
new LettuceShardCircuitBreaker(name, circuitBreakerConfigurationName);
this.clusterClient = RedisClusterClient.create(
clientResourcesBuilder.nettyCustomizer(lettuceShardCircuitBreaker).
build(),
@@ -101,8 +102,6 @@ public class FaultTolerantRedisClusterClient {
.publishOnScheduler(true)
.build());
lettuceShardCircuitBreaker.setEventBus(clusterClient.getResources().eventBus());
this.stringConnection = clusterClient.connect();
this.binaryConnection = clusterClient.connect(ByteArrayCodec.INSTANCE);

View File

@@ -10,13 +10,10 @@ import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.lettuce.core.RedisNoScriptException;
import io.lettuce.core.cluster.event.ClusterTopologyChangedEvent;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.protocol.CommandHandler;
import io.lettuce.core.protocol.CompleteableCommand;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.resource.NettyCustomizer;
import io.micrometer.core.instrument.Tags;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
@@ -24,18 +21,13 @@ import io.netty.channel.ChannelPromise;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.util.CircuitBreakerUtil;
import reactor.core.scheduler.Scheduler;
/**
* Adds a circuit breaker to every Netty {@link Channel} that gets created, so that a single unhealthy shard does not
@@ -56,64 +48,16 @@ public class LettuceShardCircuitBreaker implements NettyCustomizer {
private final String clusterName;
@Nullable
private final String circuitBreakerConfigurationName;
private final Scheduler scheduler;
// this set will be shared with all child channel breakers
private final Set<String> upstreamAddresses = ConcurrentHashMap.newKeySet();
// The EventBus is not available at construction time, because it is one of the client
// resources, which cannot be built without this NettyCustomizer
private EventBus eventBus;
public LettuceShardCircuitBreaker(final String clusterName,
@Nullable final String circuitBreakerConfigurationName,
final Scheduler scheduler) {
public LettuceShardCircuitBreaker(final String clusterName, @Nullable final String circuitBreakerConfigurationName) {
this.clusterName = clusterName;
this.circuitBreakerConfigurationName = circuitBreakerConfigurationName;
this.scheduler = scheduler;
}
/**
 * Renders a cluster node's URI as a {@code host:port} shard address.
 *
 * @param redisClusterNode the node whose URI host and port are formatted
 * @return the node's host and port joined by a colon
 */
private static String toShardAddress(final RedisClusterNode redisClusterNode) {
  final var nodeUri = redisClusterNode.getUri();
  return String.format("%s:%s", nodeUri.getHost(), nodeUri.getPort());
}
/**
 * Stores the event bus and subscribes to {@link ClusterTopologyChangedEvent}s so that the shared set of upstream
 * shard addresses follows topology changes.
 *
 * @param eventBus the event bus to retain and to listen on for topology changes
 */
void setEventBus(final EventBus eventBus) {
  this.eventBus = eventBus;

  eventBus.get()
      .filter(e -> e instanceof ClusterTopologyChangedEvent)
      .map(e -> (ClusterTopologyChangedEvent) e)
      .subscribeOn(scheduler)
      .subscribe(event -> {
        final Set<String> currentUpstreams = event.after().stream()
            .filter(node -> node.getRole().isUpstream())
            .map(LettuceShardCircuitBreaker::toShardAddress)
            .collect(Collectors.toSet());

        final Set<String> previousUpstreams = event.before().stream()
            .filter(node -> node.getRole().isUpstream())
            .map(LettuceShardCircuitBreaker::toShardAddress)
            .collect(Collectors.toSet());

        // Reduce the previous set to the addresses that are no longer upstream. The return value of removeAll only
        // says whether the set was modified, so it cannot be used to decide whether anything was demoted.
        previousUpstreams.removeAll(currentUpstreams);

        if (!previousUpstreams.isEmpty()) {
          logger.info("No longer upstream in cluster {}: {}", clusterName, StringUtils.join(previousUpstreams, ", "));
        }

        // Channels may be created at any time, not just immediately after the cluster client connect()s or when
        // topology changes, so we maintain a set that can be queried by channel handlers during their connect()
        // method.
        upstreamAddresses.addAll(currentUpstreams);
        upstreamAddresses.removeAll(previousUpstreams);
      });
}
@Override
public void afterChannelInitialized(final Channel channel) {
if (eventBus == null) {
throw new IllegalStateException("Event bus must be set before channel customization can occur");
}
final ChannelCircuitBreakerHandler channelCircuitBreakerHandler = new ChannelCircuitBreakerHandler(clusterName,
circuitBreakerConfigurationName, upstreamAddresses, eventBus, scheduler);
final ChannelCircuitBreakerHandler channelCircuitBreakerHandler =
new ChannelCircuitBreakerHandler(clusterName, circuitBreakerConfigurationName);
final String commandHandlerName = StreamSupport.stream(channel.pipeline().spliterator(), false)
.filter(entry -> entry.getValue() instanceof CommandHandler)
@@ -128,52 +72,19 @@ public class LettuceShardCircuitBreaker implements NettyCustomizer {
private static final Logger logger = LoggerFactory.getLogger(ChannelCircuitBreakerHandler.class);
private static final String CLUSTER_TAG_NAME = "cluster";
private static final String SHARD_ADDRESS_TAG_NAME = "shard";
private final String clusterName;
@Nullable private final String circuitBreakerConfigurationName;
private final AtomicBoolean registeredMetrics = new AtomicBoolean(false);
private final Set<String> upstreamAddresses;
private String shardAddress;
@VisibleForTesting
CircuitBreaker breaker;
public ChannelCircuitBreakerHandler(final String name,
@Nullable final String circuitBreakerConfigurationName,
final Set<String> upstreamAddresses,
final EventBus eventBus, final Scheduler scheduler) {
public ChannelCircuitBreakerHandler(final String name, @Nullable final String circuitBreakerConfigurationName) {
this.clusterName = name;
this.circuitBreakerConfigurationName = circuitBreakerConfigurationName;
this.upstreamAddresses = upstreamAddresses;
eventBus.get()
.filter(e -> e instanceof ClusterTopologyChangedEvent)
.map(e -> (ClusterTopologyChangedEvent) e)
.subscribeOn(scheduler)
.subscribe(event -> {
if (shardAddress == null) {
logger.warn("Received a topology changed event without a shard address");
return;
}
final Set<String> newUpstreams = event.after().stream().filter(node -> node.getRole().isUpstream())
.map(LettuceShardCircuitBreaker::toShardAddress)
.collect(Collectors.toSet());
if (newUpstreams.contains(shardAddress)) {
registerMetrics();
}
});
}
/**
 * Registers metrics for this handler's breaker, doing the work only on the first call.
 */
void registerMetrics() {
  // Registration must not be repeated: some counters are attached as event listeners, so a second
  // registration would produce duplicated calls to increment().
  if (!registeredMetrics.compareAndSet(false, true)) {
    return;
  }

  logger.info("Registered metrics for: {}/{}", clusterName, shardAddress);
  CircuitBreakerUtil.registerMetrics(breaker, getClass(), Tags.of(CLUSTER_TAG_NAME, clusterName));
}
@Override
@@ -188,15 +99,14 @@ public class LettuceShardCircuitBreaker implements NettyCustomizer {
// In some cases, like the default connection, the remote address includes the DNS hostname, which we want to exclude.
shardAddress = StringUtils.substringAfter(remoteAddress.toString(), "/");
final String circuitBreakerName = "%s/%s-breaker".formatted(clusterName, shardAddress);
final String circuitBreakerName = "%s/%s".formatted(clusterName, shardAddress);
final Map<String, String> tags = Map.of(
CLUSTER_TAG_NAME, clusterName,
SHARD_ADDRESS_TAG_NAME, shardAddress);
breaker = circuitBreakerConfigurationName != null
? CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, circuitBreakerConfigurationName)
: CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName);
if (upstreamAddresses.contains(shardAddress)) {
registerMetrics();
}
? CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, circuitBreakerConfigurationName, tags)
: CircuitBreakerUtil.getCircuitBreakerRegistry().circuitBreaker(circuitBreakerName, tags);
}
@Override

View File

@@ -5,32 +5,19 @@
package org.whispersystems.textsecuregcm.util;
import static com.codahale.metrics.MetricRegistry.name;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.micrometer.tagged.TaggedCircuitBreakerMetrics;
import io.github.resilience4j.micrometer.tagged.TaggedRetryMetrics;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryRegistry;
import io.lettuce.core.RedisCommandTimeoutException;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import org.apache.commons.lang3.RandomStringUtils;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
public class CircuitBreakerUtil {
private static final String CIRCUIT_BREAKER_CALL_COUNTER_NAME = name(CircuitBreakerUtil.class, "breaker", "call");
private static final String CIRCUIT_BREAKER_STATE_GAUGE_NAME = name(CircuitBreakerUtil.class, "breaker", "state");
private static final String RETRY_CALL_COUNTER_NAME = name(CircuitBreakerUtil.class, "retry", "call");
private static final String BREAKER_NAME_TAG_NAME = "breakerName";
private static final String OUTCOME_TAG_NAME = "outcome";
private static final CircuitBreakerRegistry CIRCUIT_BREAKER_REGISTRY =
CircuitBreakerRegistry.of(new CircuitBreakerConfiguration().toCircuitBreakerConfig());
@@ -72,79 +59,4 @@ public class CircuitBreakerUtil {
public static Retry getGeneralRedisRetry(final String name) {
return RETRY_REGISTRY.retry(name, GENERAL_REDIS_CONFIGURATION_NAME);
}
/// Attaches success/failure/unpermitted call counters and a state gauge to the given circuit breaker.
///
/// @param circuitBreaker the breaker whose events drive the counters and whose state backs the gauge
/// @param clazz the class whose simple name prefixes the breaker name in the counter tags
/// @param additionalTags extra tags applied to the call counters
///
/// @deprecated [CircuitBreakerRegistry] maintains its own set of metrics, and manually managing them is unnecessary
@Deprecated(forRemoval = true)
public static void registerMetrics(CircuitBreaker circuitBreaker, Class<?> clazz, Tags additionalTags) {
  final String qualifiedName = clazz.getSimpleName() + "/" + circuitBreaker.getName();
  final Tags callTags = additionalTags.and(BREAKER_NAME_TAG_NAME, qualifiedName);

  final Counter successes =
      Metrics.counter(CIRCUIT_BREAKER_CALL_COUNTER_NAME, callTags.and(OUTCOME_TAG_NAME, "success"));
  final Counter failures =
      Metrics.counter(CIRCUIT_BREAKER_CALL_COUNTER_NAME, callTags.and(OUTCOME_TAG_NAME, "failure"));
  final Counter unpermitted =
      Metrics.counter(CIRCUIT_BREAKER_CALL_COUNTER_NAME, callTags.and(OUTCOME_TAG_NAME, "unpermitted"));

  circuitBreaker.getEventPublisher().onSuccess(event -> successes.increment());
  circuitBreaker.getEventPublisher().onError(event -> failures.increment());
  circuitBreaker.getEventPublisher().onCallNotPermitted(event -> unpermitted.increment());

  // NOTE(review): unlike the counters, the gauge is tagged with the bare breaker name and does not carry
  // additionalTags -- confirm whether that difference is intended.
  Metrics.gauge(CIRCUIT_BREAKER_STATE_GAUGE_NAME,
      Tags.of(BREAKER_NAME_TAG_NAME, circuitBreaker.getName()),
      circuitBreaker,
      breaker -> breaker.getState().getOrder());
}
/// Attaches success/retry/error/ignored-error call counters to the given retry.
///
/// The retry's qualified name is recorded under the same tag key that is used for breaker names.
///
/// @param retry the retry whose events drive the counters
/// @param clazz the class whose simple name prefixes the retry name in the counter tags
///
/// @deprecated [RetryRegistry] maintains its own set of metrics, and manually managing them is unnecessary
@Deprecated(forRemoval = true)
public static void registerMetrics(Retry retry, Class<?> clazz) {
  final String qualifiedName = clazz.getSimpleName() + "/" + retry.getName();

  final Counter successes = Metrics.counter(RETRY_CALL_COUNTER_NAME,
      BREAKER_NAME_TAG_NAME, qualifiedName, OUTCOME_TAG_NAME, "success");
  final Counter retries = Metrics.counter(RETRY_CALL_COUNTER_NAME,
      BREAKER_NAME_TAG_NAME, qualifiedName, OUTCOME_TAG_NAME, "retry");
  final Counter errors = Metrics.counter(RETRY_CALL_COUNTER_NAME,
      BREAKER_NAME_TAG_NAME, qualifiedName, OUTCOME_TAG_NAME, "error");
  final Counter ignoredErrors = Metrics.counter(RETRY_CALL_COUNTER_NAME,
      BREAKER_NAME_TAG_NAME, qualifiedName, OUTCOME_TAG_NAME, "ignored_error");

  retry.getEventPublisher().onSuccess(event -> successes.increment());
  retry.getEventPublisher().onRetry(event -> retries.increment());
  retry.getEventPublisher().onError(event -> errors.increment());
  retry.getEventPublisher().onIgnoredError(event -> ignoredErrors.increment());
}
}

View File

@@ -13,13 +13,11 @@ import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.Command;
@@ -33,7 +31,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,29 +39,21 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
class LettuceShardCircuitBreakerTest {
private LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler channelCircuitBreakerHandler;
private EventBus eventBus;
@BeforeEach
void setUp() {
eventBus = mock(EventBus.class);
when(eventBus.get()).thenReturn(Flux.never());
channelCircuitBreakerHandler = new LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler(
"test", null, Collections.emptySet(), eventBus, Schedulers.immediate());
channelCircuitBreakerHandler = new LettuceShardCircuitBreaker.ChannelCircuitBreakerHandler("test", null);
}
@Test
void testAfterChannelInitialized() {
final LettuceShardCircuitBreaker lettuceShardCircuitBreaker =
new LettuceShardCircuitBreaker("test", null, Schedulers.immediate());
lettuceShardCircuitBreaker.setEventBus(eventBus);
new LettuceShardCircuitBreaker("test", null);
final Channel channel = new EmbeddedChannel(
new CommandHandler(ClientOptions.create(), ClientResources.create(), mock(Endpoint.class)));