Configure Micrometer distribution buckets for OpenTelemetry

Co-authored-by: Jon Chambers <63609320+jon-signal@users.noreply.github.com>
This commit is contained in:
Jonathan Klabunde Tomer
2025-08-11 11:45:33 -07:00
committed by GitHub
parent 1429efd573
commit 36d0c4422e
19 changed files with 155 additions and 67 deletions

View File

@@ -968,7 +968,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final MetricsHttpChannelListener metricsHttpChannelListener = new MetricsHttpChannelListener(clientReleaseManager,
Set.of(websocketServletPath, provisioningWebsocketServletPath, "/health-check"));
metricsHttpChannelListener.configure(environment);
final MessageMetrics messageMetrics = new MessageMetrics();
final MessageMetrics messageMetrics = new MessageMetrics(config.getDynamoDbTables().getMessages().getExpiration());
final BackupMetrics backupMetrics = new BackupMetrics();
// BufferingInterceptor is needed on the base environment but not the WebSocketEnvironment,

View File

@@ -6,8 +6,6 @@
package org.whispersystems.textsecuregcm.auth;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import io.netty.resolver.dns.DnsNameResolver;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
@@ -26,7 +24,6 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.configuration.RetryConfiguration;
import org.whispersystems.textsecuregcm.http.FaultTolerantHttpClient;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.SystemMapper;
@@ -34,9 +31,6 @@ public class CloudflareTurnCredentialsManager {
private static final Logger logger = LoggerFactory.getLogger(CloudflareTurnCredentialsManager.class);
private static final String CREDENTIAL_FETCH_TIMER_NAME = MetricsUtil.name(CloudflareTurnCredentialsManager.class,
"credentialFetchLatency");
private final List<String> cloudflareTurnUrls;
private final List<String> cloudflareTurnUrlsWithIps;
private final String cloudflareTurnHostname;
@@ -119,20 +113,11 @@ public class CloudflareTurnCredentialsManager {
throw new IOException(e);
}
final Timer.Sample sample = Timer.start();
final HttpResponse<String> response;
try {
response = cloudflareTurnClient.sendAsync(getCredentialsRequest, HttpResponse.BodyHandlers.ofString()).join();
sample.stop(Timer.builder(CREDENTIAL_FETCH_TIMER_NAME)
.publishPercentileHistogram(true)
.tags("outcome", "success")
.register(Metrics.globalRegistry));
} catch (CompletionException e) {
logger.warn("failed to make http request to Cloudflare Turn: {}", e.getMessage());
sample.stop(Timer.builder(CREDENTIAL_FETCH_TIMER_NAME)
.publishPercentileHistogram(true)
.tags("outcome", "failure")
.register(Metrics.globalRegistry));
throw new IOException(ExceptionUtils.unwrap(e));
}

View File

@@ -8,8 +8,6 @@ package org.whispersystems.textsecuregcm.configuration;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.micrometer.registry.otlp.HistogramFlavor;
import io.micrometer.registry.otlp.OtlpConfig;
import java.time.Duration;
import java.util.Map;
@@ -35,11 +33,6 @@ public record OpenTelemetryConfiguration(
return maxBucketsPerMeter;
}
@Override
public HistogramFlavor histogramFlavor() {
return HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM;
}
public Duration shutdownWaitDuration() {
if (shutdownWaitDuration == null) {
return step().plus(step().dividedBy(2));

View File

@@ -394,6 +394,8 @@ public class DeviceController {
if (response != null && response.getStatus() == Response.Status.OK.getStatusCode()) {
accountAndSample.second().stop(Timer.builder(WAIT_FOR_LINKED_DEVICE_TIMER_NAME)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(100))
.maximumExpectedValue(Duration.ofMinutes(1))
.tags(Tags.of(UserAgentTagUtil.getPlatformTag(userAgent)))
.register(Metrics.globalRegistry));
}
@@ -612,6 +614,8 @@ public class DeviceController {
if (response != null && response.getStatus() == Response.Status.OK.getStatusCode()) {
accountAndSample.second().stop(Timer.builder(WAIT_FOR_TRANSFER_ARCHIVE_TIMER_NAME)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(250))
.maximumExpectedValue(Duration.ofMinutes(5))
.tags(Tags.of(
UserAgentTagUtil.getPlatformTag(userAgent),
primaryPlatformTag(accountAndSample.first())))

View File

@@ -61,6 +61,8 @@ public class KeepAliveController {
Timer.builder(CLOSED_CONNECTION_AGE_DISTRIBUTION_NAME)
.tags(Tags.of(UserAgentTagUtil.getPlatformTag(context.getClient().getUserAgent())))
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(50))
.maximumExpectedValue(Duration.ofMinutes(2))
.register(Metrics.globalRegistry)
.record(age);
}

View File

@@ -91,8 +91,6 @@ public class KeysController {
private final Clock clock;
private static final String STORE_KEYS_COUNTER_NAME = MetricsUtil.name(KeysController.class, "storeKeys");
private static final String STORE_KEY_BUNDLE_SIZE_DISTRIBUTION_NAME =
MetricsUtil.name(KeysController.class, "storeKeyBundleSize");
private static final String PRIMARY_DEVICE_TAG_NAME = "isPrimary";
private static final String IDENTITY_TYPE_TAG_NAME = "identityType";
private static final String KEY_TYPE_TAG_NAME = "keyType";
@@ -173,12 +171,6 @@ public class KeysController {
Metrics.counter(STORE_KEYS_COUNTER_NAME, tags).increment();
DistributionSummary.builder(STORE_KEY_BUNDLE_SIZE_DISTRIBUTION_NAME)
.tags(tags)
.publishPercentileHistogram()
.register(Metrics.globalRegistry)
.record(setKeysRequest.preKeys().size());
storeFutures.add(keysManager.storeEcOneTimePreKeys(identifier, device.getId(), setKeysRequest.preKeys()));
}
@@ -194,12 +186,6 @@ public class KeysController {
final Tags tags = Tags.of(platformTag, primaryDeviceTag, identityTypeTag, Tag.of(KEY_TYPE_TAG_NAME, "kyber"));
Metrics.counter(STORE_KEYS_COUNTER_NAME, tags).increment();
DistributionSummary.builder(STORE_KEY_BUNDLE_SIZE_DISTRIBUTION_NAME)
.tags(tags)
.publishPercentileHistogram()
.register(Metrics.globalRegistry)
.record(setKeysRequest.pqPreKeys().size());
storeFutures.add(keysManager.storeKemOneTimePreKeys(identifier, device.getId(), setKeysRequest.pqPreKeys()));
}

View File

@@ -152,11 +152,15 @@ public class MessageController {
INDIVIDUAL_MESSAGE_LATENCY_TIMER = Timer.builder(timerName)
.tags(multiRecipientTagName, "false")
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(1))
.minimumExpectedValue(Duration.ofSeconds(10))
.register(Metrics.globalRegistry);
MULTI_RECIPIENT_MESSAGE_LATENCY_TIMER = Timer.builder(timerName)
.tags(multiRecipientTagName, "true")
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(1))
.minimumExpectedValue(Duration.ofSeconds(10))
.register(Metrics.globalRegistry);
}

View File

@@ -32,15 +32,22 @@ public final class MessageMetrics {
"mismatchedAccountEnvelopeUuid");
public static final String DELIVERY_LATENCY_TIMER_NAME = name(MessageMetrics.class, "deliveryLatency");
private final Duration messageTtl;
private final MeterRegistry metricRegistry;
@VisibleForTesting
MessageMetrics(final MeterRegistry metricRegistry) {
MessageMetrics(final MeterRegistry metricRegistry, final Duration messageTtl) {
this.metricRegistry = metricRegistry;
this.messageTtl = messageTtl;
}
public MessageMetrics(final Duration messageTtl) {
this(Metrics.globalRegistry, messageTtl);
}
@VisibleForTesting
public MessageMetrics() {
this(Metrics.globalRegistry);
this(Metrics.globalRegistry, Duration.ofDays(30));
}
public void measureAccountOutgoingMessageUuidMismatches(final Account account,
@@ -83,10 +90,12 @@ public final class MessageMetrics {
tags.add(Tag.of("isUrgent", String.valueOf(isUrgent)));
tags.add(Tag.of("isEphemeral", String.valueOf(isEphemeral)));
UserAgentTagUtil.getClientVersionTag(userAgent, clientReleaseManager).ifPresent(tags::add);
// UserAgentTagUtil.getClientVersionTag(userAgent, clientReleaseManager).ifPresent(tags::add);
Timer.builder(DELIVERY_LATENCY_TIMER_NAME)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofSeconds(1))
.maximumExpectedValue(messageTtl)
.tags(tags)
.register(metricRegistry)
.record(Duration.between(Instant.ofEpochMilli(serverTimestamp), Instant.now()));

View File

@@ -19,13 +19,16 @@ import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics;
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
import io.micrometer.core.instrument.config.MeterFilter;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.registry.otlp.HistogramFlavor;
import io.micrometer.registry.otlp.OtlpMeterRegistry;
import io.micrometer.statsd.StatsdMeterRegistry;
import java.time.Duration;
import java.util.Optional;
import java.util.stream.IntStream;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.WhisperServerVersion;
import org.whispersystems.textsecuregcm.configuration.OpenTelemetryConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.Constants;
@@ -86,6 +89,7 @@ public class MetricsUtil {
config.getOpenTelemetryConfiguration(), io.micrometer.core.instrument.Clock.SYSTEM);
configureMeterFilters(otlpMeterRegistry.config(), dynamicConfigurationManager);
configureHistogramFilters(otlpMeterRegistry.config(), config.getOpenTelemetryConfiguration());
Metrics.addRegistry(otlpMeterRegistry);
if (config.getOpenTelemetryConfiguration().shutdownWaitDuration().compareTo(shutdownWaitDuration) > 0) {
@@ -113,7 +117,7 @@ public class MetricsUtil {
config.meterFilter(new MeterFilter() {
@Override
public DistributionStatisticConfig configure(final Meter.Id id, final DistributionStatisticConfig config) {
return defaultDistributionStatisticConfig.merge(config);
return Optional.ofNullable(config.isPercentileHistogram()).orElse(false) ? config : defaultDistributionStatisticConfig.merge(config);
}
})
// Remove high-cardinality `command` tags from Lettuce metrics and prepend "chat." to meter names
@@ -137,6 +141,46 @@ public class MetricsUtil {
&& id.getName().startsWith(awsSdkMetricNamePrefix)));
}
@VisibleForTesting
static void configureHistogramFilters(MeterRegistry.Config config, OpenTelemetryConfiguration openTelemetryConfig) {
if (openTelemetryConfig.histogramFlavor() != HistogramFlavor.EXPLICIT_BUCKET_HISTOGRAM) {
// This workaround for Micrometer's awful defaults is only required for explicit bucket histograms.
return;
}
config.meterFilter(new MeterFilter() {
@Override
public DistributionStatisticConfig configure(final Meter.Id id, final DistributionStatisticConfig config) {
if (config.isPercentileHistogram() == null || !config.isPercentileHistogram()) {
return config;
}
final double lowerBound = config.getMinimumExpectedValueAsDouble();
final double upperBound = config.getMaximumExpectedValueAsDouble();
final int numBuckets = Optional.ofNullable(openTelemetryConfig.maxBucketsPerMeter().get(id.getName()))
.orElse(openTelemetryConfig.maxBucketCount());
// Bucket i covers values from (buckets[i-1], buckets[i]] except the first one which covers (-inf, buckets[0]].
// A final bucket will automatically be added at positive infinity, so if we want numBuckets total buckets, we
// need numBuckets - 1 explicit ones; if we want those to have equal ratios between values, and want an explicit
// bucket for (-inf, lowerBound] as well as an implicit one for (upperBound, inf], the ratio between buckets will be
// r = (upperBound/lowerBound)^(1/(numBuckets - 2)), so that we have values at lowerBound * r^i
// for i in [0, numBuckets-2] i.e. numBuckets - 1 values, plus the one at infinity
final double scale = Math.pow(upperBound / lowerBound, 1.0 / (numBuckets - 2));
final double[] buckets = IntStream.range(0, numBuckets - 1).mapToDouble(i -> lowerBound * Math.pow(scale, i)).toArray();
// yes, percentilesHistogram(false)! Otherwise, Micrometer will add its own non-configurable buckets based on an
// inferior selection algorithm that produces 69(!) buckets for a range from 1ms to 30s and still yields ±25% relative error
return DistributionStatisticConfig.builder()
.percentilesHistogram(false)
.serviceLevelObjectives(buckets)
.build()
.merge(config);
}
});
}
public static void registerSystemResourceMetrics(final Environment environment) {
new ProcessorMetrics().bindTo(Metrics.globalRegistry);
new FileDescriptorMetrics().bindTo(Metrics.globalRegistry);

View File

@@ -6,6 +6,8 @@ import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.whispersystems.textsecuregcm.util.EnumMapUtil;
@@ -20,6 +22,7 @@ public class OpenWebSocketCounter {
private final String newConnectionCounterName;
private final String durationTimerName;
private final Duration longestExpectedConnectionDuration;
private final Tags tags;
@@ -28,18 +31,21 @@ public class OpenWebSocketCounter {
public OpenWebSocketCounter(final String openWebSocketGaugeName,
final String newConnectionCounterName,
final String durationTimerName) {
final String durationTimerName,
final Duration longestExpectedConnectionDuration) {
this(openWebSocketGaugeName, newConnectionCounterName, durationTimerName, Tags.empty());
this(openWebSocketGaugeName, newConnectionCounterName, durationTimerName, longestExpectedConnectionDuration, Tags.empty());
}
public OpenWebSocketCounter(final String openWebSocketGaugeName,
final String newConnectionCounterName,
final String durationTimerName,
final Duration longestExpectedConnectionDuration,
final Tags tags) {
this.newConnectionCounterName = newConnectionCounterName;
this.durationTimerName = durationTimerName;
this.longestExpectedConnectionDuration = longestExpectedConnectionDuration;
this.tags = tags;
@@ -83,9 +89,11 @@ public class OpenWebSocketCounter {
Metrics.counter(newConnectionCounterName, tagsWithClientPlatform).increment();
context.addWebsocketClosedListener((context1, statusCode, reason) -> {
context.addWebsocketClosedListener((_, statusCode, _) -> {
sample.stop(Timer.builder(durationTimerName)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofSeconds(1))
.maximumExpectedValue(longestExpectedConnectionDuration)
.tags(tagsWithClientPlatform)
.register(Metrics.globalRegistry));

View File

@@ -37,6 +37,8 @@ public class MultiRecipientMessageProvider implements MessageBodyReader<SealedSe
private static final DistributionSummary RECIPIENT_COUNT_DISTRIBUTION = DistributionSummary
.builder(name(MultiRecipientMessageProvider.class, "recipients"))
.publishPercentileHistogram(true)
.minimumExpectedValue(1.0)
.maximumExpectedValue((double) MAX_RECIPIENT_COUNT)
.register(Metrics.globalRegistry);
@Override

View File

@@ -313,26 +313,23 @@ public class MessageSender {
final boolean isStory,
final Tag platformTag) throws MessageTooLargeException {
final boolean oversize = contentLength > MAX_MESSAGE_SIZE;
final Tags tags = Tags.of(platformTag,
Tag.of("multiRecipientMessage", String.valueOf(isMultiRecipientMessage)),
Tag.of("syncMessage", String.valueOf(isSyncMessage)),
Tag.of("story", String.valueOf(isStory)));
DistributionSummary.builder(CONTENT_SIZE_DISTRIBUTION_NAME)
.tags(Tags.of(platformTag,
Tag.of("oversize", String.valueOf(oversize)),
Tag.of("multiRecipientMessage", String.valueOf(isMultiRecipientMessage)),
Tag.of("syncMessage", String.valueOf(isSyncMessage)),
Tag.of("story", String.valueOf(isStory))))
.publishPercentileHistogram(true)
.register(Metrics.globalRegistry)
.record(contentLength);
if (oversize) {
Metrics.counter(REJECT_OVERSIZE_MESSAGE_COUNTER_NAME, Tags.of(platformTag,
Tag.of("multiRecipientMessage", String.valueOf(isMultiRecipientMessage)),
Tag.of("syncMessage", String.valueOf(isSyncMessage)),
Tag.of("story", String.valueOf(isStory))))
.increment();
if (contentLength > MAX_MESSAGE_SIZE) {
Metrics.counter(REJECT_OVERSIZE_MESSAGE_COUNTER_NAME, tags).increment();
throw new MessageTooLargeException();
} else {
DistributionSummary.builder(CONTENT_SIZE_DISTRIBUTION_NAME)
.tags(tags)
.publishPercentileHistogram(true)
.minimumExpectedValue(256.0)
.maximumExpectedValue((double) MAX_MESSAGE_SIZE)
.register(Metrics.globalRegistry)
.record(contentLength);
}
}

View File

@@ -22,6 +22,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.signal.libsignal.protocol.InvalidKeyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +75,8 @@ public class PagedSingleUseKEMPreKeyStore {
final DistributionSummary availableKeyCountDistributionSummary = DistributionSummary
.builder(name(getClass(), "availableKeyCount"))
.publishPercentileHistogram()
.publishPercentiles(new double[0])
.serviceLevelObjectives(IntStream.range(1, 102).mapToDouble(i -> i).toArray())
.register(Metrics.globalRegistry);
private final String takeKeyTimerName = name(getClass(), "takeKey");

View File

@@ -8,6 +8,8 @@ package org.whispersystems.textsecuregcm.websocket;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.micrometer.core.instrument.Tags;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,10 +88,10 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
this.experimentEnrollmentManager = experimentEnrollmentManager;
openAuthenticatedWebSocketCounter =
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Tags.of(AUTHENTICATED_TAG_NAME, "true"));
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Duration.ofHours(3), Tags.of(AUTHENTICATED_TAG_NAME, "true"));
openUnauthenticatedWebSocketCounter =
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Tags.of(AUTHENTICATED_TAG_NAME, "false"));
new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME, NEW_CONNECTION_COUNTER_NAME, CONNECTED_DURATION_TIMER_NAME, Duration.ofHours(3), Tags.of(AUTHENTICATED_TAG_NAME, "false"));
}
@Override

View File

@@ -55,7 +55,8 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
this.timeout = timeout;
this.openWebSocketCounter = new OpenWebSocketCounter(MetricsUtil.name(getClass(), "openWebsockets"),
MetricsUtil.name(getClass(), "newConnections"),
MetricsUtil.name(getClass(), "sessionDuration"));
MetricsUtil.name(getClass(), "sessionDuration"),
Duration.ofSeconds(90));
}
@Override

View File

@@ -236,6 +236,8 @@ public class WebSocketConnection implements MessageAvailabilityListener, Disconn
})
.thenRun(() -> sample.stop(Timer.builder(SEND_MESSAGE_DURATION_TIMER_NAME)
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(100))
.maximumExpectedValue(Duration.ofDays(1))
.tags(Tags.of(UserAgentTagUtil.getPlatformTag(client.getUserAgent())))
.register(Metrics.globalRegistry)));
}