Introduce CallQualitySurveyManager

This commit is contained in:
Jon Chambers
2025-10-09 13:07:21 -04:00
committed by Jon Chambers
parent c9760f4c38
commit c68e3103c4
9 changed files with 463 additions and 0 deletions

View File

@@ -21,6 +21,7 @@ import org.whispersystems.textsecuregcm.configuration.AppleDeviceCheckConfigurat
import org.whispersystems.textsecuregcm.configuration.AwsCredentialsProviderFactory;
import org.whispersystems.textsecuregcm.configuration.BadgesConfiguration;
import org.whispersystems.textsecuregcm.configuration.BraintreeConfiguration;
import org.whispersystems.textsecuregcm.configuration.CallQualitySurveyConfiguration;
import org.whispersystems.textsecuregcm.configuration.Cdn3StorageManagerConfiguration;
import org.whispersystems.textsecuregcm.configuration.CdnConfiguration;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
@@ -353,6 +354,11 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private S3ObjectMonitorFactory asnTable;
@Valid
@NotNull
@JsonProperty
private CallQualitySurveyConfiguration callQualitySurvey;
public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() {
return tlsKeyStore;
}
@@ -591,4 +597,8 @@ public class WhisperServerConfiguration extends Configuration {
public S3ObjectMonitorFactory getAsnTableConfiguration() {
return asnTable;
}
public CallQualitySurveyConfiguration getCallQualitySurveyConfiguration() {
return callQualitySurvey;
}
}

View File

@@ -179,6 +179,7 @@ import org.whispersystems.textsecuregcm.mappers.RegistrationServiceSenderExcepti
import org.whispersystems.textsecuregcm.mappers.ServerRejectedExceptionMapper;
import org.whispersystems.textsecuregcm.mappers.SubscriptionExceptionMapper;
import org.whispersystems.textsecuregcm.metrics.BackupMetrics;
import org.whispersystems.textsecuregcm.metrics.CallQualitySurveyManager;
import org.whispersystems.textsecuregcm.metrics.MessageMetrics;
import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener;
import org.whispersystems.textsecuregcm.metrics.MetricsHttpChannelListener;
@@ -582,6 +583,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
"disconnectionRequest",
config.getVirtualThreadConfiguration().maxConcurrentThreadsPerExecutor(),
environment);
ExecutorService callQualitySurveyPubSubExecutor = ManagedExecutors.newVirtualThreadPerTaskExecutor(
"callQualitySurvey",
config.getVirtualThreadConfiguration().maxConcurrentThreadsPerExecutor(),
environment);
ScheduledExecutorService cloudflareTurnRetryExecutor = ScheduledExecutorServiceBuilder.of(environment, "cloudflareTurnRetry").threads(1).build();
ScheduledExecutorService messagePollExecutor = ScheduledExecutorServiceBuilder.of(environment, "messagePollExecutor").threads(1).build();
@@ -690,6 +695,10 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
config.getDynamoDbTables().getSubscriptions().getTableName(), dynamoDbAsyncClient);
MessageDeliveryLoopMonitor messageDeliveryLoopMonitor =
config.logMessageDeliveryLoops() ? new RedisMessageDeliveryLoopMonitor(rateLimitersCluster) : new NoopMessageDeliveryLoopMonitor();
CallQualitySurveyManager callQualitySurveyManager = new CallQualitySurveyManager(asnInfoProviderSupplier,
config.getCallQualitySurveyConfiguration().pubSubPublisher().build(),
Clock.systemUTC(),
callQualitySurveyPubSubExecutor);
final RegistrationLockVerificationManager registrationLockVerificationManager = new RegistrationLockVerificationManager(
accountsManager, disconnectionRequestManager, svr2CredentialsGenerator, registrationRecoveryPasswordsManager,

View File

@@ -0,0 +1,12 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
public record CallQualitySurveyConfiguration (@Valid @NotNull PubSubPublisherFactory pubSubPublisher) {
}

View File

@@ -0,0 +1,135 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.metrics;
import com.google.cloud.pubsub.v1.PublisherInterface;
import com.google.pubsub.v1.PubsubMessage;
import io.micrometer.core.instrument.Metrics;
import java.time.Clock;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.signal.calling.survey.CallQualitySurveyResponsePubSubMessage;
import org.signal.chat.calling.quality.SubmitCallQualitySurveyRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.asn.AsnInfoProvider;
import org.whispersystems.textsecuregcm.util.GoogleApiUtil;
import org.whispersystems.textsecuregcm.util.ua.UnrecognizedUserAgentException;
import org.whispersystems.textsecuregcm.util.ua.UserAgent;
import org.whispersystems.textsecuregcm.util.ua.UserAgentUtil;
public class CallQualitySurveyManager {
private final Supplier<AsnInfoProvider> asnInfoProviderSupplier;
private final PublisherInterface pubSubPublisher;
private final Clock clock;
private final Executor pubSubCallbackExecutor;
private final String PUB_SUB_MESSAGE_COUNTER_NAME = MetricsUtil.name(CallQualitySurveyManager.class, "pubSubMessage");
private static final Logger logger = LoggerFactory.getLogger(CallQualitySurveyManager.class);
public CallQualitySurveyManager(final Supplier<AsnInfoProvider> asnInfoProviderSupplier,
final PublisherInterface pubSubPublisher,
final Clock clock,
final Executor pubSubCallbackExecutor) {
this.asnInfoProviderSupplier = asnInfoProviderSupplier;
this.pubSubPublisher = pubSubPublisher;
this.clock = clock;
this.pubSubCallbackExecutor = pubSubCallbackExecutor;
}
public void submitCallQualitySurvey(final SubmitCallQualitySurveyRequest submitCallQualitySurveyRequest,
final String remoteAddress,
final String userAgentString) {
final CallQualitySurveyResponsePubSubMessage.Builder pubSubMessageBuilder =
CallQualitySurveyResponsePubSubMessage.newBuilder()
.setResponseId(UUID.randomUUID().toString())
.setSubmissionTimestamp(clock.millis() * 1000);
try {
final UserAgent userAgent = UserAgentUtil.parseUserAgentString(userAgentString);
pubSubMessageBuilder.setClientPlatform(userAgent.platform().name().toLowerCase(Locale.ROOT));
pubSubMessageBuilder.setClientVersion(userAgent.version().toString());
if (StringUtils.isNotBlank(userAgent.additionalSpecifiers())) {
pubSubMessageBuilder.setClientUaAdditionalSpecifiers(userAgent.additionalSpecifiers());
}
} catch (final UnrecognizedUserAgentException _) {
}
asnInfoProviderSupplier.get().lookup(remoteAddress)
.ifPresent(asnInfo -> pubSubMessageBuilder.setAsnRegion(asnInfo.regionCode()));
if (submitCallQualitySurveyRequest.hasUserSatisfied()) {
pubSubMessageBuilder.setUserSatisfied(submitCallQualitySurveyRequest.getUserSatisfied());
}
pubSubMessageBuilder.addAllCallQualityIssues(submitCallQualitySurveyRequest.getCallQualityIssuesList());
if (submitCallQualitySurveyRequest.hasAdditionalIssuesDescription()) {
pubSubMessageBuilder.setAdditionalIssuesDescription(submitCallQualitySurveyRequest.getAdditionalIssuesDescription());
}
if (submitCallQualitySurveyRequest.hasDebugLogUrl()) {
pubSubMessageBuilder.setDebugLogUrl(submitCallQualitySurveyRequest.getDebugLogUrl());
}
if (submitCallQualitySurveyRequest.hasStartTimestamp()) {
pubSubMessageBuilder.setStartTimestamp(submitCallQualitySurveyRequest.getStartTimestamp());
}
if (submitCallQualitySurveyRequest.hasEndTimestamp()) {
pubSubMessageBuilder.setEndTimestamp(submitCallQualitySurveyRequest.getEndTimestamp());
}
if (submitCallQualitySurveyRequest.hasCallType()) {
pubSubMessageBuilder.setCallType(submitCallQualitySurveyRequest.getCallType());
}
if (submitCallQualitySurveyRequest.hasSuccess()) {
pubSubMessageBuilder.setSuccess(submitCallQualitySurveyRequest.getSuccess());
}
if (submitCallQualitySurveyRequest.hasCallEndReason()) {
pubSubMessageBuilder.setCallEndReason(submitCallQualitySurveyRequest.getCallEndReason());
}
if (submitCallQualitySurveyRequest.hasRttMedian()) {
pubSubMessageBuilder.setRttMedian(submitCallQualitySurveyRequest.getRttMedian());
}
if (submitCallQualitySurveyRequest.hasJitterMedian()) {
pubSubMessageBuilder.setJitterMedian(submitCallQualitySurveyRequest.getJitterMedian());
}
if (submitCallQualitySurveyRequest.hasPacketLossFraction()) {
pubSubMessageBuilder.setPacketLossFraction(submitCallQualitySurveyRequest.getPacketLossFraction());
}
if (submitCallQualitySurveyRequest.hasCallTelemetry()) {
pubSubMessageBuilder.setCallTelemetry(submitCallQualitySurveyRequest.getCallTelemetry());
}
GoogleApiUtil.toCompletableFuture(pubSubPublisher.publish(PubsubMessage.newBuilder()
.setData(pubSubMessageBuilder.build().toByteString())
.build()), pubSubCallbackExecutor)
.whenComplete((_, throwable) -> {
if (throwable != null) {
logger.warn("Failed to publish call quality survey pub/sub message", throwable);
}
Metrics.counter(PUB_SUB_MESSAGE_COUNTER_NAME, "success", String.valueOf(throwable == null))
.increment();
});
}
}