Use native exponential histograms

This commit is contained in:
Jonathan Klabunde Tomer
2025-08-21 14:53:21 -07:00
committed by GitHub
parent be8b44d645
commit 78a7112675
21 changed files with 41 additions and 169 deletions

View File

@@ -980,7 +980,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final MetricsHttpChannelListener metricsHttpChannelListener = new MetricsHttpChannelListener(clientReleaseManager,
Set.of(websocketServletPath, provisioningWebsocketServletPath, "/health-check"));
metricsHttpChannelListener.configure(environment);
final MessageMetrics messageMetrics = new MessageMetrics(config.getDynamoDbTables().getMessages().getExpiration());
final MessageMetrics messageMetrics = new MessageMetrics();
final BackupMetrics backupMetrics = new BackupMetrics();
// BufferingInterceptor is needed on the base environment but not the WebSocketEnvironment,

View File

@@ -8,6 +8,7 @@ package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.micrometer.registry.otlp.HistogramFlavor;
import io.micrometer.registry.otlp.OtlpConfig;
import java.time.Duration;
import java.util.Map;
@@ -33,6 +34,11 @@ public record OpenTelemetryConfiguration(
return maxBucketsPerMeter;
}
/**
 * Reports the histogram flavor used for exported distributions.
 *
 * <p>Overrides an inherited accessor (presumably {@code OtlpConfig#histogramFlavor()}; the record's
 * {@code implements} clause is not visible in this hunk — confirm). The value is hard-coded rather than
 * read from a record component.
 *
 * @return always {@link HistogramFlavor#BASE2_EXPONENTIAL_BUCKET_HISTOGRAM}
 */
@Override
public HistogramFlavor histogramFlavor() {
return HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM;
}
public Duration shutdownWaitDuration() {
if (shutdownWaitDuration == null) {
return step().plus(step().dividedBy(2));

View File

@@ -394,8 +394,6 @@ public class DeviceController {
if (response != null && response.getStatus() == Response.Status.OK.getStatusCode()) {
accountAndSample.second().stop(Timer.builder(WAIT_FOR_LINKED_DEVICE_TIMER_NAME)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(100))
.maximumExpectedValue(Duration.ofMinutes(1))
.tags(Tags.of(UserAgentTagUtil.getPlatformTag(userAgent)))
.register(Metrics.globalRegistry));
}
@@ -616,8 +614,6 @@ public class DeviceController {
if (response != null && response.getStatus() == Response.Status.OK.getStatusCode()) {
accountAndSample.second().stop(Timer.builder(WAIT_FOR_TRANSFER_ARCHIVE_TIMER_NAME)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(250))
.maximumExpectedValue(Duration.ofMinutes(5))
.tags(Tags.of(
UserAgentTagUtil.getPlatformTag(userAgent),
primaryPlatformTag(accountAndSample.first())))

View File

@@ -61,8 +61,6 @@ public class KeepAliveController {
Timer.builder(CLOSED_CONNECTION_AGE_DISTRIBUTION_NAME)
.tags(Tags.of(UserAgentTagUtil.getPlatformTag(context.getClient().getUserAgent())))
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(50))
.maximumExpectedValue(Duration.ofMinutes(2))
.register(Metrics.globalRegistry)
.record(age);
}

View File

@@ -152,15 +152,11 @@ public class MessageController {
INDIVIDUAL_MESSAGE_LATENCY_TIMER = Timer.builder(timerName)
.tags(multiRecipientTagName, "false")
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(1))
.minimumExpectedValue(Duration.ofSeconds(10))
.register(Metrics.globalRegistry);
MULTI_RECIPIENT_MESSAGE_LATENCY_TIMER = Timer.builder(timerName)
.tags(multiRecipientTagName, "true")
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(1))
.minimumExpectedValue(Duration.ofSeconds(10))
.register(Metrics.globalRegistry);
}

View File

@@ -32,17 +32,15 @@ public final class MessageMetrics {
"mismatchedAccountEnvelopeUuid");
public static final String DELIVERY_LATENCY_TIMER_NAME = name(MessageMetrics.class, "deliveryLatency");
private final Duration messageTtl;
private final MeterRegistry metricRegistry;
@VisibleForTesting
MessageMetrics(final MeterRegistry metricRegistry, final Duration messageTtl) {
MessageMetrics(final MeterRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
this.messageTtl = messageTtl;
}
public MessageMetrics(final Duration messageTtl) {
this(Metrics.globalRegistry, messageTtl);
public MessageMetrics() {
this(Metrics.globalRegistry);
}
public void measureAccountOutgoingMessageUuidMismatches(final Account account,
@@ -85,13 +83,10 @@ public final class MessageMetrics {
tags.add(Tag.of("isUrgent", String.valueOf(isUrgent)));
tags.add(Tag.of("isEphemeral", String.valueOf(isEphemeral)));
// This tag makes the metric extremely expensive on some platforms
// UserAgentTagUtil.getClientVersionTag(userAgent, clientReleaseManager).ifPresent(tags::add);
UserAgentTagUtil.getClientVersionTag(userAgent, clientReleaseManager).ifPresent(tags::add);
Timer.builder(DELIVERY_LATENCY_TIMER_NAME)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofSeconds(1))
.maximumExpectedValue(messageTtl)
.tags(tags)
.register(metricRegistry)
.record(Duration.between(Instant.ofEpochMilli(serverTimestamp), Instant.now()));

View File

@@ -19,25 +19,17 @@ import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics;
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.registry.otlp.HistogramFlavor;
import io.micrometer.registry.otlp.OtlpMeterRegistry;
import io.micrometer.statsd.StatsdMeterRegistry;
import java.time.Duration;
import java.util.Optional;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.WhisperServerVersion;
import org.whispersystems.textsecuregcm.configuration.OpenTelemetryConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.Constants;
public class MetricsUtil {
private static final Logger log = LoggerFactory.getLogger(MetricsUtil.class);
public static final String PREFIX = "chat";
private static volatile boolean registeredMetrics = false;
@@ -92,7 +84,6 @@ public class MetricsUtil {
config.getOpenTelemetryConfiguration(), io.micrometer.core.instrument.Clock.SYSTEM);
configureMeterFilters(otlpMeterRegistry.config(), dynamicConfigurationManager);
configureHistogramFilters(otlpMeterRegistry.config(), config.getOpenTelemetryConfiguration());
Metrics.addRegistry(otlpMeterRegistry);
if (config.getOpenTelemetryConfiguration().shutdownWaitDuration().compareTo(shutdownWaitDuration) > 0) {
@@ -112,6 +103,7 @@ public class MetricsUtil {
static void configureMeterFilters(MeterRegistry.Config config,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) {
final DistributionStatisticConfig defaultDistributionStatisticConfig = DistributionStatisticConfig.builder()
.percentilesHistogram(true)
.percentiles(.75, .95, .99, .999)
.build();
@@ -120,7 +112,7 @@ public class MetricsUtil {
config.meterFilter(new MeterFilter() {
@Override
public DistributionStatisticConfig configure(final Meter.Id id, final DistributionStatisticConfig config) {
return Optional.ofNullable(config.isPercentileHistogram()).orElse(false) ? config : defaultDistributionStatisticConfig.merge(config);
return defaultDistributionStatisticConfig.merge(config);
}
})
// Remove high-cardinality `command` tags from Lettuce metrics and prepend "chat." to meter names
@@ -144,54 +136,6 @@ public class MetricsUtil {
&& id.getName().startsWith(awsSdkMetricNamePrefix)));
}
/**
 * Adds a meter filter that swaps Micrometer's built-in percentile-histogram buckets for an explicit,
 * geometrically spaced set of bucket boundaries.
 *
 * <p>The filter is only installed when {@code openTelemetryConfig.histogramFlavor()} is
 * {@link HistogramFlavor#EXPLICIT_BUCKET_HISTOGRAM}; for any other flavor this method returns without
 * touching {@code config}.
 *
 * <p>For each distribution the installed filter behaves as follows:
 * <ul>
 *   <li>percentile histogram flag unset or false: the incoming config is returned unchanged;</li>
 *   <li>percentile histogram requested but minimum or maximum expected value missing: an error is logged
 *       and the percentile histogram is switched off;</li>
 *   <li>otherwise: the percentile histogram is switched off and boundaries
 *       {@code lowerBound * r^i} are supplied as service level objectives instead.</li>
 * </ul>
 * The bucket count is looked up by meter name in {@code maxBucketsPerMeter()} and falls back to
 * {@code maxBucketCount()} when there is no entry for that name.
 *
 * @param config the registry configuration the filter is added to
 * @param openTelemetryConfig supplies the histogram flavor and the bucket-count limits
 */
@VisibleForTesting
static void configureHistogramFilters(MeterRegistry.Config config, OpenTelemetryConfiguration openTelemetryConfig) {
if (openTelemetryConfig.histogramFlavor() != HistogramFlavor.EXPLICIT_BUCKET_HISTOGRAM) {
// This workaround for Micrometer's awful defaults is only required for explicit bucket histograms.
return;
}
config.meterFilter(new MeterFilter() {
@Override
public DistributionStatisticConfig configure(final Meter.Id id, final DistributionStatisticConfig config) {
// isPercentileHistogram() is a nullable Boolean, hence the explicit null check before unboxing.
if (config.isPercentileHistogram() == null || !config.isPercentileHistogram()) {
return config;
}
if (config.getMinimumExpectedValueAsDouble() == null || config.getMaximumExpectedValueAsDouble() == null) {
log.error("Distribution {} does not specify lower or upper bounds, not exporting histograms", id.getName());
// Per Micrometer's documented merge semantics, non-null values on the receiver (the freshly built config)
// take precedence over the argument, so this overrides only the histogram flag.
return DistributionStatisticConfig.builder()
.percentilesHistogram(false)
.build()
.merge(config);
}
final double lowerBound = config.getMinimumExpectedValueAsDouble();
final double upperBound = config.getMaximumExpectedValueAsDouble();
// A per-meter bucket count (keyed by meter name) wins over the global maximum.
final int numBuckets = Optional.ofNullable(openTelemetryConfig.maxBucketsPerMeter().get(id.getName()))
.orElse(openTelemetryConfig.maxBucketCount());
// Bucket i covers values from (buckets[i-1], buckets[i]] except the first one which covers (-inf, buckets[0]].
// A final bucket will automatically be added at positive infinity, so if we want numBuckets total buckets, we
// need numBuckets - 1 explicit ones; if we want those to have equal ratios between values, and want an explicit
// bucket for (-inf, lowerBound] as well as an implicit one for (upperBound, inf], the ratio between buckets will be
// r = (upperBound/lowerBound)^(1/(numBuckets - 2)), so that we have values at lowerBound * r^i
// for i in [0, numBuckets-2] i.e. numBuckets - 1 values, plus the one at infinity
// NOTE(review): nothing visible here guards against numBuckets <= 2 (zero or negative divisor in the exponent)
// or a lowerBound of 0 (division by zero in the ratio) — confirm these are validated upstream.
final double scale = Math.pow(upperBound / lowerBound, 1.0 / (numBuckets - 2));
final double[] buckets = IntStream.range(0, numBuckets - 1).mapToDouble(i -> lowerBound * Math.pow(scale, i)).toArray();
// yes, percentilesHistogram(false)! Otherwise, Micrometer will add its own non-configurable buckets based on an
// inferior selection algorithm that produces 69(!) buckets for a range from 1ms to 30s and still yields ±25% relative error
return DistributionStatisticConfig.builder()
.percentilesHistogram(false)
.serviceLevelObjectives(buckets)
.build()
.merge(config);
}
});
}
public static void registerSystemResourceMetrics(final Environment environment) {
new ProcessorMetrics().bindTo(Metrics.globalRegistry);
new FileDescriptorMetrics().bindTo(Metrics.globalRegistry);

View File

@@ -6,8 +6,6 @@ import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.whispersystems.textsecuregcm.util.EnumMapUtil;
@@ -22,7 +20,6 @@ public class OpenWebSocketCounter {
private final String newConnectionCounterName;
private final String durationTimerName;
private final Duration longestExpectedConnectionDuration;
private final Tags tags;
@@ -31,21 +28,18 @@ public class OpenWebSocketCounter {
public OpenWebSocketCounter(final String openWebSocketGaugeName,
final String newConnectionCounterName,
final String durationTimerName,
final Duration longestExpectedConnectionDuration) {
final String durationTimerName) {
this(openWebSocketGaugeName, newConnectionCounterName, durationTimerName, longestExpectedConnectionDuration, Tags.empty());
this(openWebSocketGaugeName, newConnectionCounterName, durationTimerName, Tags.empty());
}
public OpenWebSocketCounter(final String openWebSocketGaugeName,
final String newConnectionCounterName,
final String durationTimerName,
final Duration longestExpectedConnectionDuration,
final Tags tags) {
this.newConnectionCounterName = newConnectionCounterName;
this.durationTimerName = durationTimerName;
this.longestExpectedConnectionDuration = longestExpectedConnectionDuration;
this.tags = tags;
@@ -92,8 +86,6 @@ public class OpenWebSocketCounter {
context.addWebsocketClosedListener((_, statusCode, _) -> {
sample.stop(Timer.builder(durationTimerName)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofSeconds(1))
.maximumExpectedValue(longestExpectedConnectionDuration)
.tags(tagsWithClientPlatform)
.register(Metrics.globalRegistry));

View File

@@ -37,8 +37,6 @@ public class MultiRecipientMessageProvider implements MessageBodyReader<SealedSe
private static final DistributionSummary RECIPIENT_COUNT_DISTRIBUTION = DistributionSummary
.builder(name(MultiRecipientMessageProvider.class, "recipients"))
.publishPercentileHistogram(true)
.minimumExpectedValue(1.0)
.maximumExpectedValue((double) MAX_RECIPIENT_COUNT)
.register(Metrics.globalRegistry);
@Override

View File

@@ -313,23 +313,26 @@ public class MessageSender {
final boolean isStory,
final Tag platformTag) throws MessageTooLargeException {
final Tags tags = Tags.of(platformTag,
Tag.of("multiRecipientMessage", String.valueOf(isMultiRecipientMessage)),
Tag.of("syncMessage", String.valueOf(isSyncMessage)),
Tag.of("story", String.valueOf(isStory)));
final boolean oversize = contentLength > MAX_MESSAGE_SIZE;
if (contentLength > MAX_MESSAGE_SIZE) {
Metrics.counter(REJECT_OVERSIZE_MESSAGE_COUNTER_NAME, tags).increment();
throw new MessageTooLargeException();
} else {
DistributionSummary.builder(CONTENT_SIZE_DISTRIBUTION_NAME)
.tags(tags)
DistributionSummary.builder(CONTENT_SIZE_DISTRIBUTION_NAME)
.tags(Tags.of(platformTag,
Tag.of("oversize", String.valueOf(oversize)),
Tag.of("multiRecipientMessage", String.valueOf(isMultiRecipientMessage)),
Tag.of("syncMessage", String.valueOf(isSyncMessage)),
Tag.of("story", String.valueOf(isStory))))
.publishPercentileHistogram(true)
.minimumExpectedValue(256.0)
.maximumExpectedValue((double) MAX_MESSAGE_SIZE)
.register(Metrics.globalRegistry)
.record(contentLength);
if (oversize) {
Metrics.counter(REJECT_OVERSIZE_MESSAGE_COUNTER_NAME, Tags.of(platformTag,
Tag.of("multiRecipientMessage", String.valueOf(isMultiRecipientMessage)),
Tag.of("syncMessage", String.valueOf(isSyncMessage)),
Tag.of("story", String.valueOf(isStory))))
.increment();
throw new MessageTooLargeException();
}
}

View File

@@ -66,8 +66,6 @@ public class MessagePersister implements Managed {
private static final DistributionSummary QUEUE_COUNT_DISTRIBUTION_SUMMARY = DistributionSummary.builder(
name(MessagePersister.class, "queueCount"))
.publishPercentiles(new double[0])
.serviceLevelObjectives(IntStream.range(1, QUEUE_BATCH_LIMIT + 1).mapToDouble(i -> i).toArray())
.register(Metrics.globalRegistry);
private static final long EXCEPTION_PAUSE_MILLIS = Duration.ofSeconds(3).toMillis();

View File

@@ -22,8 +22,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.signal.libsignal.protocol.InvalidKeyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,8 +73,7 @@ public class PagedSingleUseKEMPreKeyStore {
final DistributionSummary availableKeyCountDistributionSummary = DistributionSummary
.builder(name(getClass(), "availableKeyCount"))
.publishPercentiles(new double[0])
.serviceLevelObjectives(IntStream.range(1, 102).mapToDouble(i -> i).toArray())
.publishPercentileHistogram()
.register(Metrics.globalRegistry);
private final String takeKeyTimerName = name(getClass(), "takeKey");

View File

@@ -8,8 +8,6 @@ package org.whispersystems.textsecuregcm.websocket;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.micrometer.core.instrument.Tags;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,10 +86,10 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
this.experimentEnrollmentManager = experimentEnrollmentManager;
openAuthenticatedWebSocketCounter =
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Duration.ofHours(3), Tags.of(AUTHENTICATED_TAG_NAME, "true"));
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Tags.of(AUTHENTICATED_TAG_NAME, "true"));
openUnauthenticatedWebSocketCounter =
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Duration.ofHours(3), Tags.of(AUTHENTICATED_TAG_NAME, "false"));
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Tags.of(AUTHENTICATED_TAG_NAME, "false"));
}
@Override

View File

@@ -55,8 +55,7 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
this.timeout = timeout;
this.openWebSocketCounter = new OpenWebSocketCounter(MetricsUtil.name(getClass(), "openWebsockets"),
MetricsUtil.name(getClass(), "newConnections"),
MetricsUtil.name(getClass(), "sessionDuration"),
Duration.ofSeconds(90));
MetricsUtil.name(getClass(), "sessionDuration"));
}
@Override

View File

@@ -236,8 +236,6 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn
})
.thenRun(() -> sample.stop(Timer.builder(SEND_MESSAGE_DURATION_TIMER_NAME)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(100))
.maximumExpectedValue(Duration.ofDays(1))
.tags(Tags.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent())))
.register(Metrics.globalRegistry)));
}