Add "additional specifiers" dimensions to open WebSocket counters and simplify meter construction

This commit is contained in:
Jon Chambers
2026-02-18 16:58:45 -05:00
committed by Jon Chambers
parent cb3363410c
commit f390aabb3a
7 changed files with 82 additions and 88 deletions

View File

@@ -1148,7 +1148,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
WebSocketEnvironment<AuthenticatedDevice> provisioningEnvironment = new WebSocketEnvironment<>(environment,
webSocketEnvironment.getRequestLog(), Duration.ofMillis(60000));
provisioningEnvironment.setConnectListener(new ProvisioningConnectListener(provisioningManager, provisioningWebsocketTimeoutExecutor, Duration.ofSeconds(90)));
provisioningEnvironment.setConnectListener(new ProvisioningConnectListener(provisioningManager, clientReleaseManager, provisioningWebsocketTimeoutExecutor, Duration.ofSeconds(90)));
provisioningEnvironment.jersey().register(new MetricsApplicationEventListener(TrafficSource.WEBSOCKET, clientReleaseManager));
provisioningEnvironment.jersey().register(new KeepAliveController(redisMessageAvailabilityManager));
provisioningEnvironment.jersey().register(new TimestampResponseFilter());

View File

@@ -3,95 +3,100 @@ package org.whispersystems.textsecuregcm.metrics;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.whispersystems.textsecuregcm.util.EnumMapUtil;
import org.whispersystems.textsecuregcm.util.ua.ClientPlatform;
import javax.annotation.Nullable;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
import org.whispersystems.websocket.session.WebSocketSessionContext;
public class OpenWebSocketCounter {
private static final String WEBSOCKET_CLOSED_COUNTER_NAME = name(OpenWebSocketCounter.class, "websocketClosed");
private final ClientReleaseManager clientReleaseManager;
private final String newConnectionCounterName;
private final String durationTimerName;
private final Tags baseTags;
private final Tags tags;
private final Map<Tags, AtomicInteger> openWebsocketsByTags;
private final AtomicInteger totalConnections;
private final Map<ClientPlatform, AtomicInteger> openWebsocketsByClientPlatform;
private final AtomicInteger openWebsocketsFromUnknownPlatforms;
private static final int MAX_COUNTERS = 4096;
public OpenWebSocketCounter(final String openWebSocketGaugeName,
final String newConnectionCounterName,
final String durationTimerName) {
private static final String OPEN_WEBSOCKET_GAUGE_NAME = name(OpenWebSocketCounter.class, "openWebsockets");
private static final String TOTAL_CONNECTIONS_GAUGE_NAME = name(OpenWebSocketCounter.class, "totalOpenWebsockets");
private static final String NEW_CONNECTION_COUNTER_NAME = name(OpenWebSocketCounter.class, "newConnections");
private static final String WEB_SOCKET_CLOSED_COUNTER_NAME = name(OpenWebSocketCounter.class, "websocketClosed");
private static final String SESSION_DURATION_TIMER_NAME = name(OpenWebSocketCounter.class, "sessionDuration");
private static final String GAUGE_COUNT_GAUGE_NAME = name(OpenWebSocketCounter.class, "gaugeCount");
this(openWebSocketGaugeName, newConnectionCounterName, durationTimerName, Tags.empty());
}
public OpenWebSocketCounter(final String webSocketType,
final ClientReleaseManager clientReleaseManager) {
public OpenWebSocketCounter(final String openWebSocketGaugeName,
final String newConnectionCounterName,
final String durationTimerName,
final Tags tags) {
this.clientReleaseManager = clientReleaseManager;
this.newConnectionCounterName = newConnectionCounterName;
this.durationTimerName = durationTimerName;
this.baseTags = Tags.of("webSocketType", webSocketType);
this.openWebsocketsByTags = Metrics.gaugeMapSize(GAUGE_COUNT_GAUGE_NAME, baseTags, new ConcurrentHashMap<>());
this.tags = tags;
openWebsocketsByClientPlatform = EnumMapUtil.toEnumMap(ClientPlatform.class,
clientPlatform -> buildGauge(openWebSocketGaugeName, clientPlatform.name().toLowerCase(), tags));
openWebsocketsFromUnknownPlatforms = buildGauge(openWebSocketGaugeName, "unknown", tags);
}
private static AtomicInteger buildGauge(final String gaugeName, final String clientPlatformName, final Tags tags) {
return Metrics.gauge(gaugeName,
tags.and(Tag.of(UserAgentTagUtil.PLATFORM_TAG, clientPlatformName)),
new AtomicInteger(0));
this.totalConnections = Metrics.gauge(TOTAL_CONNECTIONS_GAUGE_NAME, baseTags, new AtomicInteger(0));
}
public void countOpenWebSocket(final WebSocketSessionContext context) {
final Timer.Sample sample = Timer.start();
// We have to jump through some hoops here to have something "effectively final" for the close listener, but
// assignable from a `catch` block.
final AtomicInteger openWebSocketCounter;
@Nullable final UserAgent userAgent;
{
AtomicInteger calculatedOpenWebSocketCounter;
UserAgent parsedUserAgent;
try {
final ClientPlatform clientPlatform =
UserAgentUtil.parseUserAgentString(context.getClient().getUserAgent()).platform();
calculatedOpenWebSocketCounter = openWebsocketsByClientPlatform.get(clientPlatform);
parsedUserAgent = UserAgentUtil.parseUserAgentString(context.getClient().getUserAgent());
} catch (final UnrecognizedUserAgentException e) {
calculatedOpenWebSocketCounter = openWebsocketsFromUnknownPlatforms;
parsedUserAgent = null;
}
openWebSocketCounter = calculatedOpenWebSocketCounter;
userAgent = parsedUserAgent;
}
openWebSocketCounter.incrementAndGet();
final Tags tagsWithClientPlatform = baseTags.and(UserAgentTagUtil.getPlatformTag(userAgent));
final Tags tagsWithClientPlatform = tags.and(UserAgentTagUtil.getPlatformTag(context.getClient().getUserAgent()));
final Optional<AtomicInteger> maybeOpenWebSocketCounter;
{
final Tags tagsWithAdditionalSpecifiers = tagsWithClientPlatform
.and(UserAgentTagUtil.getClientVersionTag(userAgent, clientReleaseManager)
.map(Tags::of)
.orElseGet(Tags::empty))
.and(UserAgentTagUtil.getAdditionalSpecifierTags(userAgent));
Metrics.counter(newConnectionCounterName, tagsWithClientPlatform).increment();
maybeOpenWebSocketCounter = getCounter(tagsWithAdditionalSpecifiers);
}
maybeOpenWebSocketCounter.ifPresent(AtomicInteger::incrementAndGet);
totalConnections.incrementAndGet();
Metrics.counter(NEW_CONNECTION_COUNTER_NAME, tagsWithClientPlatform).increment();
context.addWebsocketClosedListener((_, statusCode, _) -> {
sample.stop(Timer.builder(durationTimerName)
sample.stop(Timer.builder(SESSION_DURATION_TIMER_NAME)
.tags(tagsWithClientPlatform)
.register(Metrics.globalRegistry));
openWebSocketCounter.decrementAndGet();
maybeOpenWebSocketCounter.ifPresent(AtomicInteger::decrementAndGet);
totalConnections.decrementAndGet();
Metrics.counter(WEBSOCKET_CLOSED_COUNTER_NAME, tagsWithClientPlatform.and("status", String.valueOf(statusCode)))
Metrics.counter(WEB_SOCKET_CLOSED_COUNTER_NAME, tagsWithClientPlatform.and("status", String.valueOf(statusCode)))
.increment();
});
}
private Optional<AtomicInteger> getCounter(final Tags tags) {
// Make a reasonable effort to avoid creating new counters if we're already full
return openWebsocketsByTags.size() >= MAX_COUNTERS
? Optional.ofNullable(openWebsocketsByTags.get(tags))
: Optional.of(openWebsocketsByTags.computeIfAbsent(tags,
t -> Metrics.gauge(OPEN_WEBSOCKET_GAUGE_NAME, t, new AtomicInteger(0))));
}
}

View File

@@ -5,12 +5,8 @@
package org.whispersystems.textsecuregcm.websocket;
import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Tags;
import java.util.Optional;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
@@ -35,13 +31,6 @@ import reactor.core.scheduler.Scheduler;
public class AuthenticatedConnectListener implements WebSocketConnectListener {
private static final String OPEN_WEBSOCKET_GAUGE_NAME = name(AuthenticatedConnectListener.class, "openWebsockets");
private static final String NEW_CONNECTION_COUNTER_NAME = name(AuthenticatedConnectListener.class, "newConnections");
private static final String CONNECTED_DURATION_TIMER_NAME =
name(AuthenticatedConnectListener.class, "connectedDuration");
private static final String AUTHENTICATED_TAG_NAME = "authenticated";
private static final Logger log = LoggerFactory.getLogger(AuthenticatedConnectListener.class);
private final AccountsManager accountsManager;
@@ -72,6 +61,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
this(accountsManager,
disconnectionRequestManager,
clientReleaseManager,
(account, device, client) -> new WebSocketConnection(receiptSender,
messagesManager,
messageMetrics,
@@ -83,26 +73,22 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
messageDeliveryScheduler,
clientReleaseManager,
messageDeliveryLoopMonitor,
experimentEnrollmentManager),
authenticated -> new OpenWebSocketCounter(OPEN_WEBSOCKET_GAUGE_NAME,
NEW_CONNECTION_COUNTER_NAME,
CONNECTED_DURATION_TIMER_NAME,
Tags.of(AUTHENTICATED_TAG_NAME, String.valueOf(authenticated)))
experimentEnrollmentManager)
);
}
@VisibleForTesting AuthenticatedConnectListener(
final AccountsManager accountsManager,
final DisconnectionRequestManager disconnectionRequestManager,
final WebSocketConnectionBuilder webSocketConnectionBuilder,
final Function<Boolean, OpenWebSocketCounter> openWebSocketCounterBuilder) {
final ClientReleaseManager clientReleaseManager,
final WebSocketConnectionBuilder webSocketConnectionBuilder) {
this.accountsManager = accountsManager;
this.disconnectionRequestManager = disconnectionRequestManager;
this.webSocketConnectionBuilder = webSocketConnectionBuilder;
openAuthenticatedWebSocketCounter = openWebSocketCounterBuilder.apply(true);
openUnauthenticatedWebSocketCounter = openWebSocketCounterBuilder.apply(false);
this.openAuthenticatedWebSocketCounter = new OpenWebSocketCounter("rpc-authenticated", clientReleaseManager);
this.openUnauthenticatedWebSocketCounter = new OpenWebSocketCounter("rpc-unauthenticated", clientReleaseManager);
}
@Override

View File

@@ -18,9 +18,9 @@ import org.whispersystems.textsecuregcm.auth.AuthenticatedDevice;
import org.whispersystems.textsecuregcm.controllers.ProvisioningController;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.ProvisioningMessage;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.OpenWebSocketCounter;
import org.whispersystems.textsecuregcm.push.ProvisioningManager;
import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
import org.whispersystems.textsecuregcm.storage.PubSubProtos;
import org.whispersystems.textsecuregcm.util.HeaderUtils;
import org.whispersystems.websocket.session.WebSocketSessionContext;
@@ -48,14 +48,13 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
private final Duration timeout;
public ProvisioningConnectListener(final ProvisioningManager provisioningManager,
final ClientReleaseManager clientReleaseManager,
final ScheduledExecutorService timeoutExecutor,
final Duration timeout) {
this.provisioningManager = provisioningManager;
this.timeoutExecutor = timeoutExecutor;
this.timeout = timeout;
this.openWebSocketCounter = new OpenWebSocketCounter(MetricsUtil.name(getClass(), "openWebsockets"),
MetricsUtil.name(getClass(), "newConnections"),
MetricsUtil.name(getClass(), "sessionDuration"));
this.openWebSocketCounter = new OpenWebSocketCounter("provisioning", clientReleaseManager);
}
@Override
@@ -67,7 +66,7 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
final String provisioningAddress = generateProvisioningAddress();
context.addWebsocketClosedListener((context1, statusCode, reason) -> {
context.addWebsocketClosedListener((_, _, _) -> {
provisioningManager.removeListener(provisioningAddress);
timeoutFuture.cancel(false);
});
@@ -78,7 +77,7 @@ public class ProvisioningConnectListener implements WebSocketConnectListener {
final Optional<byte[]> body = Optional.of(message.getContent().toByteArray());
context.getClient().sendRequest("PUT", "/v1/message", List.of(HeaderUtils.getTimestampHeader()), body)
.whenComplete((ignored, throwable) -> context.getClient().close(1000, "Closed"));
.whenComplete((_, _) -> context.getClient().close(1000, "Closed"));
});
context.getClient().sendRequest("PUT", "/v1/address", List.of(HeaderUtils.getTimestampHeader()),