Augment gRPC metrics with UA

This commit is contained in:
Ravi Khadiwala
2025-08-20 13:00:45 -05:00
committed by ravi-signal
parent c5af8f3a9e
commit 7ca3604601
3 changed files with 303 additions and 6 deletions

View File

@@ -148,6 +148,7 @@ import org.whispersystems.textsecuregcm.grpc.ExternalServiceCredentialsAnonymous
import org.whispersystems.textsecuregcm.grpc.ExternalServiceCredentialsGrpcService;
import org.whispersystems.textsecuregcm.grpc.KeysAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.KeysGrpcService;
import org.whispersystems.textsecuregcm.grpc.MetricServerInterceptor;
import org.whispersystems.textsecuregcm.grpc.PaymentsGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService;
@@ -830,8 +831,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
final ManagedDefaultEventLoopGroup localEventLoopGroup = new ManagedDefaultEventLoopGroup();
final RemoteDeprecationFilter remoteDeprecationFilter = new RemoteDeprecationFilter(dynamicConfigurationManager);
final MetricCollectingServerInterceptor metricCollectingServerInterceptor =
new MetricCollectingServerInterceptor(Metrics.globalRegistry);
final MetricServerInterceptor metricServerInterceptor = new MetricServerInterceptor(Metrics.globalRegistry, clientReleaseManager);
final ErrorMappingInterceptor errorMappingInterceptor = new ErrorMappingInterceptor();
final RequestAttributesInterceptor requestAttributesInterceptor =
@@ -853,8 +853,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
new ExternalRequestFilter(config.getExternalRequestFilterConfiguration().permittedInternalRanges(),
config.getExternalRequestFilterConfiguration().grpcMethods()))
.intercept(validatingInterceptor)
// TODO: specialize metrics with user-agent platform
.intercept(metricCollectingServerInterceptor)
.intercept(metricServerInterceptor)
.intercept(errorMappingInterceptor)
.intercept(remoteDeprecationFilter)
.intercept(requestAttributesInterceptor)
@@ -874,9 +873,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
// depends on the user-agent context so it has to come first here!
// http://grpc.github.io/grpc-java/javadoc/io/grpc/ServerBuilder.html#intercept-io.grpc.ServerInterceptor-
serverBuilder
// TODO: specialize metrics with user-agent platform
.intercept(validatingInterceptor)
.intercept(metricCollectingServerInterceptor)
.intercept(metricServerInterceptor)
.intercept(errorMappingInterceptor)
.intercept(remoteDeprecationFilter)
.intercept(requestAttributesInterceptor)

View File

@@ -0,0 +1,134 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.grpc;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.*;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
public class MetricServerInterceptor implements ServerInterceptor {
private static final String TAG_SERVICE_NAME = "grpcService";
private static final String TAG_METHOD_NAME = "method";
private static final String TAG_METHOD_TYPE = "methodType";
private static final String TAG_STATUS_CODE = "statusCode";
@VisibleForTesting
static final String REQUEST_MESSAGE_COUNTER_NAME = MetricsUtil.name(MetricServerInterceptor.class, "requestMessage");
@VisibleForTesting
static final String RESPONSE_COUNTER_NAME = MetricsUtil.name(MetricServerInterceptor.class, "responseMessage");
@VisibleForTesting
static final String RPC_COUNTER_NAME = MetricsUtil.name(MetricServerInterceptor.class, "rpc");
@VisibleForTesting
static final String DURATION_TIMER_NAME = MetricsUtil.name(MetricServerInterceptor.class, "processingDuration");
private final MeterRegistry meterRegistry;
private final ClientReleaseManager clientReleaseManager;
public MetricServerInterceptor(final MeterRegistry meterRegistry, final ClientReleaseManager clientReleaseManager) {
this.meterRegistry = meterRegistry;
this.clientReleaseManager = clientReleaseManager;
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
final ServerCall<ReqT, RespT> call,
final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {
final Optional<String> userAgentString = RequestAttributesUtil.getUserAgent();
final List<Tag> tagList = new ArrayList<>(6);
tagList.add(Tag.of(TAG_SERVICE_NAME, call.getMethodDescriptor().getServiceName()));
tagList.add(Tag.of(TAG_METHOD_NAME, call.getMethodDescriptor().getBareMethodName()));
tagList.add(Tag.of(TAG_METHOD_TYPE, call.getMethodDescriptor().getType().name()));
RequestAttributesUtil.getUserAgent()
.map(UserAgentTagUtil::getLibsignalAndPlatformTags)
.ifPresent(tagList::addAll);
userAgentString
.flatMap(ua -> UserAgentTagUtil.getClientVersionTag(ua, clientReleaseManager))
.ifPresent(tagList::add);
final Tags tags = Tags.of(tagList);
final MetricServerCall<ReqT, RespT> monitoringServerCall = new MetricServerCall<>(call, tags);
return new MetricServerCallListener<>(next.startCall(monitoringServerCall, headers), tags);
}
/**
* A ServerCall delegator that updates metrics on response messages and the final RPC status
*/
private class MetricServerCall<ReqT, RespT> extends ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {
private final Counter responseMessageCounter;
private final Tags tags;
MetricServerCall(final ServerCall<ReqT, RespT> delegate, final Tags tags) {
super(delegate);
this.responseMessageCounter = meterRegistry.counter(RESPONSE_COUNTER_NAME, tags);
this.tags = tags;
}
@Override
public void close(final Status status, final Metadata responseHeaders) {
meterRegistry.counter(RPC_COUNTER_NAME, tags.and(TAG_STATUS_CODE, status.getCode().name())).increment();
super.close(status, responseHeaders);
}
@Override
public void sendMessage(final RespT responseMessage) {
this.responseMessageCounter.increment();
super.sendMessage(responseMessage);
}
}
/**
* A ServerCallListener delegator that updates metrics on requests and measures the RPC time on completion
*/
private class MetricServerCallListener<ReqT> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
private final Counter requestCounter;
private final Timer responseTimer;
private final Timer.Sample sample;
MetricServerCallListener(final ServerCall.Listener<ReqT> delegate, final Tags tags) {
super(delegate);
this.requestCounter = meterRegistry.counter(REQUEST_MESSAGE_COUNTER_NAME, tags);
this.responseTimer = meterRegistry.timer(DURATION_TIMER_NAME, tags);
this.sample = Timer.start(meterRegistry);
}
@Override
public void onMessage(final ReqT requestMessage) {
this.requestCounter.increment();
super.onMessage(requestMessage);
}
@Override
public void onComplete() {
this.sample.stop(responseTimer);
super.onComplete();
}
@Override
public void onCancel() {
this.sample.stop(responseTimer);
super.onCancel();
}
}
}