Add noise tunnel connection metrics

This commit is contained in:
Ravi Khadiwala
2025-08-20 14:01:59 -05:00
committed by ravi-signal
parent 7ca3604601
commit be8b44d645
11 changed files with 96 additions and 12 deletions

View File

@@ -1,5 +1,8 @@
package org.whispersystems.textsecuregcm.grpc.net;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
@@ -15,6 +18,8 @@ import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
/**
* An "establish local connection" handler waits for a Noise handshake to complete upstream in the pipeline, buffering
@@ -22,24 +27,28 @@ import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice;
* server.
*/
public class EstablishLocalGrpcConnectionHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(EstablishLocalGrpcConnectionHandler.class);
private final GrpcClientConnectionManager grpcClientConnectionManager;
private final LocalAddress authenticatedGrpcServerAddress;
private final LocalAddress anonymousGrpcServerAddress;
private final FramingType framingType;
private final List<Object> pendingReads = new ArrayList<>();
private static final Logger log = LoggerFactory.getLogger(EstablishLocalGrpcConnectionHandler.class);
private static final String CONNECTION_ESTABLISHED_COUNTER_NAME = MetricsUtil.name(EstablishLocalGrpcConnectionHandler.class, "established");
public EstablishLocalGrpcConnectionHandler(final GrpcClientConnectionManager grpcClientConnectionManager,
final LocalAddress authenticatedGrpcServerAddress,
final LocalAddress anonymousGrpcServerAddress) {
final LocalAddress anonymousGrpcServerAddress,
final FramingType framingType) {
this.grpcClientConnectionManager = grpcClientConnectionManager;
this.authenticatedGrpcServerAddress = authenticatedGrpcServerAddress;
this.anonymousGrpcServerAddress = anonymousGrpcServerAddress;
this.framingType = framingType;
}
@Override
@@ -63,6 +72,12 @@ public class EstablishLocalGrpcConnectionHandler extends ChannelInboundHandlerAd
GrpcClientConnectionManager.handleHandshakeInitiated(
remoteChannelContext.channel(), remoteAddress, userAgent, acceptLanguage);
final List<Tag> tags = UserAgentTagUtil.getLibsignalAndPlatformTags(userAgent);
Metrics.counter(CONNECTION_ESTABLISHED_COUNTER_NAME, Tags.of(tags)
.and("authenticated", Boolean.toString(authenticatedDevice.isPresent()))
.and("framingType", framingType.name()))
.increment();
new Bootstrap()
.remoteAddress(grpcServerAddress)
.channel(LocalChannel.class)

View File

@@ -0,0 +1,6 @@
package org.whispersystems.textsecuregcm.grpc.net;
public enum FramingType {
NOISE_DIRECT,
WEBSOCKET
}

View File

@@ -18,4 +18,5 @@ public record NoiseIdentityDeterminedEvent(
Optional<AuthenticatedDevice> authenticatedDevice,
InetAddress remoteAddress,
String userAgent,
String acceptLanguage) {}
String acceptLanguage) {
}

View File

@@ -16,7 +16,7 @@ import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
* Watches for inbound close frames and closes the connection in response
*/
public class NoiseDirectInboundCloseHandler extends ChannelInboundHandlerAdapter {
private static String CLIENT_CLOSE_COUNTER_NAME = MetricsUtil.name(ChannelInboundHandlerAdapter.class, "clientClose");
private static String CLIENT_CLOSE_COUNTER_NAME = MetricsUtil.name(NoiseDirectInboundCloseHandler.class, "clientClose");
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (msg instanceof NoiseDirectFrame ndf && ndf.frameType() == NoiseDirectFrame.FrameType.CLOSE) {

View File

@@ -1,5 +1,6 @@
package org.whispersystems.textsecuregcm.grpc.net.noisedirect;
import io.micrometer.core.instrument.Metrics;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelFutureListener;
@@ -7,12 +8,14 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.whispersystems.textsecuregcm.grpc.net.OutboundCloseErrorMessage;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
/**
* Translates {@link OutboundCloseErrorMessage}s into {@link NoiseDirectFrame} error frames. After error frames are
* written, the channel is closed
*/
class NoiseDirectOutboundErrorHandler extends ChannelOutboundHandlerAdapter {
private static String SERVER_CLOSE_COUNTER_NAME = MetricsUtil.name(NoiseDirectInboundCloseHandler.class, "serverClose");
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
@@ -23,6 +26,8 @@ class NoiseDirectOutboundErrorHandler extends ChannelOutboundHandlerAdapter {
case NOISE_HANDSHAKE_ERROR -> NoiseDirectProtos.CloseReason.Code.HANDSHAKE_ERROR;
case INTERNAL_SERVER_ERROR -> NoiseDirectProtos.CloseReason.Code.INTERNAL_ERROR;
};
Metrics.counter(SERVER_CLOSE_COUNTER_NAME, "reason", code.name()).increment();
final NoiseDirectProtos.CloseReason proto = NoiseDirectProtos.CloseReason.newBuilder()
.setCode(code)
.setMessage(err.message())

View File

@@ -17,6 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.grpc.net.ErrorHandler;
import org.whispersystems.textsecuregcm.grpc.net.EstablishLocalGrpcConnectionHandler;
import org.whispersystems.textsecuregcm.grpc.net.FramingType;
import org.whispersystems.textsecuregcm.grpc.net.GrpcClientConnectionManager;
import org.whispersystems.textsecuregcm.grpc.net.HAProxyMessageHandler;
import org.whispersystems.textsecuregcm.grpc.net.NoiseHandshakeHandler;
@@ -68,7 +69,9 @@ public class NoiseDirectTunnelServer implements Managed {
// This handler will open a local connection to the appropriate gRPC server and install a ProxyHandler
// once the Noise handshake has completed
.addLast(new EstablishLocalGrpcConnectionHandler(
grpcClientConnectionManager, authenticatedGrpcServerAddress, anonymousGrpcServerAddress))
grpcClientConnectionManager,
authenticatedGrpcServerAddress, anonymousGrpcServerAddress,
FramingType.NOISE_DIRECT))
.addLast(new ErrorHandler());
}
});

View File

@@ -104,6 +104,8 @@ public class NoiseWebSocketTunnelServer implements Managed {
// request and passed it down the pipeline
.addLast(new WebSocketOpeningHandshakeHandler(AUTHENTICATED_SERVICE_PATH, ANONYMOUS_SERVICE_PATH, HEALTH_CHECK_PATH))
.addLast(new WebSocketServerProtocolHandler("/", true))
// Metrics on inbound/outbound Close frames
.addLast(new WebSocketCloseMetricHandler())
// Turn generic OutboundCloseErrorMessages into websocket close frames
.addLast(new WebSocketOutboundErrorHandler())
.addLast(new RejectUnsupportedMessagesHandler())
@@ -116,7 +118,10 @@ public class NoiseWebSocketTunnelServer implements Managed {
.addLast(new NoiseHandshakeHandler(clientPublicKeysManager, ecKeyPair))
// This handler will open a local connection to the appropriate gRPC server and install a ProxyHandler
// once the Noise handshake has completed
.addLast(new EstablishLocalGrpcConnectionHandler(grpcClientConnectionManager, authenticatedGrpcServerAddress, anonymousGrpcServerAddress))
.addLast(new EstablishLocalGrpcConnectionHandler(
grpcClientConnectionManager,
authenticatedGrpcServerAddress, anonymousGrpcServerAddress,
FramingType.WEBSOCKET))
.addLast(new ErrorHandler());
}
});

View File

@@ -0,0 +1,50 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.grpc.net.websocket;
import io.micrometer.core.instrument.Metrics;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
public class WebSocketCloseMetricHandler extends ChannelDuplexHandler {
private static String CLIENT_CLOSE_COUNTER_NAME = MetricsUtil.name(WebSocketCloseMetricHandler.class, "clientClose");
private static String SERVER_CLOSE_COUNTER_NAME = MetricsUtil.name(WebSocketCloseMetricHandler.class, "serverClose");
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (msg instanceof CloseWebSocketFrame closeFrame) {
Metrics.counter(CLIENT_CLOSE_COUNTER_NAME, "closeCode", validatedCloseCode(closeFrame.statusCode())).increment();
}
ctx.fireChannelRead(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof CloseWebSocketFrame closeFrame) {
Metrics.counter(SERVER_CLOSE_COUNTER_NAME, "closeCode", Integer.toString(closeFrame.statusCode())).increment();
}
ctx.write(msg, promise);
}
private static String validatedCloseCode(int closeCode) {
if (closeCode >= 1000 && closeCode <= 1015) {
// RFC-6455 pre-defined status codes
return Integer.toString(closeCode);
} else if (closeCode >= 4000 && closeCode <= 4100) {
// Application status codes
return Integer.toString(closeCode);
} else {
return "unknown";
}
}
}

View File

@@ -10,11 +10,13 @@ import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.grpc.net.OutboundCloseErrorMessage;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
/**
* Converts {@link OutboundCloseErrorMessage}s written to the pipeline into WebSocket close frames
*/
class WebSocketOutboundErrorHandler extends ChannelDuplexHandler {
private static String SERVER_CLOSE_COUNTER_NAME = MetricsUtil.name(WebSocketOutboundErrorHandler.class, "serverClose");
private boolean websocketHandshakeComplete = false;