Remove VirtualThreadPinEventMonitor

This commit is contained in:
Ravi Khadiwala
2024-01-30 13:49:12 -06:00
parent 2c1e7e5ed6
commit b924dea045
7 changed files with 1 additions and 250 deletions

View File

@@ -56,7 +56,6 @@ import org.whispersystems.textsecuregcm.configuration.SubscriptionConfiguration;
import org.whispersystems.textsecuregcm.configuration.TlsKeyStoreConfiguration;
import org.whispersystems.textsecuregcm.configuration.TurnSecretConfiguration;
import org.whispersystems.textsecuregcm.configuration.UnidentifiedDeliveryConfiguration;
import org.whispersystems.textsecuregcm.configuration.VirtualThreadConfiguration;
import org.whispersystems.textsecuregcm.configuration.ZkConfig;
import org.whispersystems.textsecuregcm.limits.RateLimiterConfig;
import org.whispersystems.websocket.configuration.WebSocketConfiguration;
@@ -317,11 +316,6 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private LinkDeviceSecretConfiguration linkDevice;
@Valid
@NotNull
@JsonProperty
private VirtualThreadConfiguration virtualThreadConfiguration = new VirtualThreadConfiguration(Duration.ofMillis(1));
public TlsKeyStoreConfiguration getTlsKeyStoreConfiguration() {
return tlsKeyStore;
}
@@ -533,8 +527,4 @@ public class WhisperServerConfiguration extends Configuration {
public LinkDeviceSecretConfiguration getLinkDeviceSecretConfiguration() {
return linkDevice;
}
public VirtualThreadConfiguration getVirtualThreadConfiguration() {
return virtualThreadConfiguration;
}
}

View File

@@ -212,7 +212,6 @@ import org.whispersystems.textsecuregcm.util.DynamoDbFromConfig;
import org.whispersystems.textsecuregcm.util.ManagedAwsCrt;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.UsernameHashZkProofVerifier;
import org.whispersystems.textsecuregcm.util.VirtualThreadPinEventMonitor;
import org.whispersystems.textsecuregcm.util.logging.LoggingUnhandledExceptionMapper;
import org.whispersystems.textsecuregcm.util.logging.UncaughtExceptionHandler;
import org.whispersystems.textsecuregcm.websocket.AuthenticatedConnectListener;
@@ -435,8 +434,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService virtualThreadEventLoggerExecutor = environment.lifecycle()
.executorService(name(getClass(), "virtualThreadEventLogger-%d")).minThreads(1).maxThreads(1).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
@@ -632,10 +629,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
CoinMarketCapClient coinMarketCapClient = new CoinMarketCapClient(currencyClient, config.getPaymentsServiceConfiguration().coinMarketCapApiKey().value(), config.getPaymentsServiceConfiguration().coinMarketCapCurrencyIds());
CurrencyConversionManager currencyManager = new CurrencyConversionManager(fixerClient, coinMarketCapClient,
cacheCluster, config.getPaymentsServiceConfiguration().paymentCurrencies(), recurringJobExecutor, Clock.systemUTC());
VirtualThreadPinEventMonitor virtualThreadPinEventMonitor = new VirtualThreadPinEventMonitor(
virtualThreadEventLoggerExecutor,
() -> dynamicConfigurationManager.getConfiguration().getVirtualThreads().allowedPinEvents(),
config.getVirtualThreadConfiguration().pinEventThreshold());
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(apnPushNotificationScheduler);
@@ -645,7 +638,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.lifecycle().manage(currencyManager);
environment.lifecycle().manage(registrationServiceClient);
environment.lifecycle().manage(clientReleaseManager);
environment.lifecycle().manage(virtualThreadPinEventMonitor);
final RegistrationCaptchaManager registrationCaptchaManager = new RegistrationCaptchaManager(captchaChecker,
rateLimiters, config.getTestDevices(), dynamicConfigurationManager);

View File

@@ -1,9 +0,0 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration;
import java.time.Duration;
public record VirtualThreadConfiguration(Duration pinEventThreshold) {}

View File

@@ -55,13 +55,10 @@ public class DynamicConfiguration {
@Valid
DynamicInboundMessageByteLimitConfiguration inboundMessageByteLimit = new DynamicInboundMessageByteLimitConfiguration(true);
@JsonProperty
@Valid
DynamicRegistrationConfiguration registrationConfiguration = new DynamicRegistrationConfiguration(false);
@JsonProperty
@Valid
DynamicVirtualThreadConfiguration virtualThreads = new DynamicVirtualThreadConfiguration(Collections.emptySet());
DynamicRegistrationConfiguration registrationConfiguration = new DynamicRegistrationConfiguration(false);
public Optional<DynamicExperimentEnrollmentConfiguration> getExperimentEnrollmentConfiguration(
final String experimentName) {
@@ -108,9 +105,4 @@ public class DynamicConfiguration {
public DynamicRegistrationConfiguration getRegistrationConfiguration() {
return registrationConfiguration;
}
public DynamicVirtualThreadConfiguration getVirtualThreads() {
return virtualThreads;
}
}

View File

@@ -1,10 +0,0 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
import java.util.Set;
public record DynamicVirtualThreadConfiguration(Set<String> allowedPinEvents) {}

View File

@@ -1,97 +0,0 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.util;
import com.google.common.annotations.VisibleForTesting;
import io.dropwizard.lifecycle.Managed;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordedFrame;
import jdk.jfr.consumer.RecordingStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
/**
* Watches for JFR events indicating that a virtual thread was pinned
*/
public class VirtualThreadPinEventMonitor implements Managed {
private static final Logger logger = LoggerFactory.getLogger(VirtualThreadPinEventMonitor.class);
private static final String PIN_COUNTER_NAME = MetricsUtil.name(VirtualThreadPinEventMonitor.class, "virtualThreadPinned");
private static final String JFR_THREAD_PINNED_EVENT_NAME = "jdk.VirtualThreadPinned";
private static final long MAX_JFR_REPOSITORY_SIZE = 1024 * 1024 * 4L; // 4MiB
private final ExecutorService executorService;
private final Supplier<Set<String>> allowList;
private final Duration pinEventThreshold;
private final RecordingStream recordingStream;
private BiConsumer<RecordedEvent, Boolean> pinEventConsumer;
@VisibleForTesting
VirtualThreadPinEventMonitor(
final ExecutorService executorService,
final Supplier<Set<String>> allowList,
final Duration pinEventThreshold,
final BiConsumer<RecordedEvent, Boolean> pinEventConsumer) {
this.executorService = executorService;
this.allowList = allowList;
this.pinEventThreshold = pinEventThreshold;
this.pinEventConsumer = pinEventConsumer;
this.recordingStream = new RecordingStream();
}
public VirtualThreadPinEventMonitor(
final ExecutorService executorService,
final Supplier<Set<String>> allowList,
final Duration pinEventThreshold) {
this(executorService, allowList, pinEventThreshold, VirtualThreadPinEventMonitor::processPinEvent);
}
@Override
public void start() {
recordingStream.setMaxSize(MAX_JFR_REPOSITORY_SIZE);
recordingStream.enable(JFR_THREAD_PINNED_EVENT_NAME).withThreshold(pinEventThreshold).withStackTrace();
recordingStream.onEvent(event -> pinEventConsumer.accept(event, allowed(event)));
executorService.submit(() -> recordingStream.start());
}
@Override
public void stop() throws InterruptedException {
// flushes events and waits for callbacks to finish
recordingStream.stop();
// immediately frees all resources
recordingStream.close();
}
private static void processPinEvent(final RecordedEvent event, final boolean allowedPinEvent) {
if (allowedPinEvent) {
logger.info("Long allowed virtual thread pin event detected", event);
} else {
logger.error("Long forbidden virtual thread pin event detected", event);
}
Metrics.counter(PIN_COUNTER_NAME, "allowed", String.valueOf(allowedPinEvent)).increment();
}
private boolean allowed(final RecordedEvent event) {
final Set<String> allowedMethodFrames = allowList.get();
for (RecordedFrame st : event.getStackTrace().getFrames()) {
if (!st.isJavaFrame()) {
continue;
}
final String qualifiedName = "%s.%s".formatted(st.getMethod().getType().getName(), st.getMethod().getName());
if (allowedMethodFrames.stream().anyMatch(qualifiedName::contains)) {
return true;
}
}
return false;
}
}