diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index b034a2c9c..de5317998 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -737,7 +737,6 @@ public class WhisperServerService extends Application dynamicConfigurationManager.getConfiguration().getVirtualThreads().allowedPinEvents(), config.getVirtualThreadConfiguration().pinEventThreshold()); StripeManager stripeManager = new StripeManager(config.getStripe().apiKey().value(), subscriptionProcessorExecutor, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java index 89622b369..79c3b211f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java @@ -48,10 +48,6 @@ public class DynamicConfiguration { @Valid DynamicRegistrationConfiguration registrationConfiguration = new DynamicRegistrationConfiguration(false); - @JsonProperty - @Valid - DynamicVirtualThreadConfiguration virtualThreads = new DynamicVirtualThreadConfiguration(Collections.emptySet()); - @JsonProperty @Valid DynamicMetricsConfiguration metricsConfiguration = new DynamicMetricsConfiguration(false, false); @@ -102,10 +98,6 @@ public class DynamicConfiguration { return registrationConfiguration; } - public DynamicVirtualThreadConfiguration getVirtualThreads() { - return virtualThreads; - } - public DynamicMetricsConfiguration getMetricsConfiguration() { return metricsConfiguration; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java deleted file mode 100644 index 045efa5e8..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicVirtualThreadConfiguration.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.configuration.dynamic; - -import java.util.Set; - -public record DynamicVirtualThreadConfiguration(Set allowedPinEvents) {} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java b/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java index 0c3358375..5edcd6cdb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitor.java @@ -12,6 +12,7 @@ import java.time.Duration; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import jdk.jfr.consumer.RecordedEvent; @@ -35,36 +36,32 @@ public class VirtualThreadPinEventMonitor implements Managed { private static final long MAX_JFR_REPOSITORY_SIZE = 1024 * 1024 * 4L; // 4MiB private final ExecutorService executorService; - private final Supplier> allowList; private final Duration pinEventThreshold; private final RecordingStream recordingStream; - private final BiConsumer pinEventConsumer; + private final Consumer pinEventConsumer; @VisibleForTesting VirtualThreadPinEventMonitor( final ExecutorService executorService, - final Supplier> allowList, final Duration pinEventThreshold, - final BiConsumer pinEventConsumer) { + final Consumer pinEventConsumer) { this.executorService = executorService; - this.allowList = allowList; this.pinEventThreshold = pinEventThreshold; this.pinEventConsumer = pinEventConsumer; this.recordingStream = new RecordingStream(); } public VirtualThreadPinEventMonitor( final ExecutorService executorService, - final Supplier> allowList, final Duration pinEventThreshold) { - this(executorService, allowList, pinEventThreshold, VirtualThreadPinEventMonitor::processPinEvent); + this(executorService, pinEventThreshold, VirtualThreadPinEventMonitor::processPinEvent); } @Override public void start() { recordingStream.setMaxSize(MAX_JFR_REPOSITORY_SIZE); recordingStream.enable(JFR_THREAD_PINNED_EVENT_NAME).withThreshold(pinEventThreshold).withStackTrace(); - recordingStream.onEvent(JFR_THREAD_PINNED_EVENT_NAME, event -> pinEventConsumer.accept(event, allowed(event))); + recordingStream.onEvent(JFR_THREAD_PINNED_EVENT_NAME, pinEventConsumer); executorService.submit(recordingStream::start); } @@ -76,30 +73,9 @@ public class VirtualThreadPinEventMonitor implements Managed { recordingStream.close(); } - private static void processPinEvent(final RecordedEvent event, final boolean allowedPinEvent) { - if (allowedPinEvent) { - logger.info("Long allowed virtual thread pin event detected {}", prettyEventString(event)); - } else { - logger.error("Long forbidden virtual thread pin event detected {}", prettyEventString(event)); - } - Metrics.counter(PIN_COUNTER_NAME, "allowed", String.valueOf(allowedPinEvent)).increment(); - } - - private boolean allowed(final RecordedEvent event) { - final Set allowedMethodFrames = allowList.get(); - if (event.getStackTrace() == null) { - return false; - } - for (RecordedFrame st : event.getStackTrace().getFrames()) { - if (!st.isJavaFrame()) { - continue; - } - final String qualifiedName = "%s.%s".formatted(st.getMethod().getType().getName(), st.getMethod().getName()); - if (allowedMethodFrames.stream().anyMatch(qualifiedName::contains)) { - return true; - } - } - return false; + private static void processPinEvent(final RecordedEvent event) { + logger.info("Long virtual thread pin event detected {}", prettyEventString(event)); + Metrics.counter(PIN_COUNTER_NAME).increment(); } private static String prettyEventString(final RecordedEvent event) { diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java index 137d50723..97941d7f8 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/util/VirtualThreadPinEventMonitorTest.java @@ -7,97 +7,74 @@ package org.whispersystems.textsecuregcm.util; import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import jdk.jfr.consumer.RecordedEvent; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.signal.libsignal.protocol.InvalidKeyException; +import org.signal.libsignal.protocol.ecc.Curve; +import org.testcontainers.shaded.org.bouncycastle.jce.interfaces.ECPublicKey; public class VirtualThreadPinEventMonitorTest { - private void synchronizedSleep1() { - synchronized (this) { - try { - Thread.sleep(2); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - private void synchronizedSleep2() { - synchronized (this) { - try { - Thread.sleep(2); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + private static void nativeMethodCall() { + try { + Curve.decodePoint(Curve.generateKeyPair().getPublicKey().serialize(), 0); + } catch (InvalidKeyException e) { + throw new RuntimeException(e); } } @Test - @Disabled("flaky: no way to ensure the sequencing between the start of the recording stream and emitting the event") + @Disabled("flaky: no way to ensure the sequencing between the start of the recording stream and emitting the event, event detection is timing based") public void testPinEventProduced() throws InterruptedException, ExecutionException { - final BlockingQueue> bq = new LinkedBlockingQueue<>(); + final BlockingQueue bq = new LinkedBlockingQueue<>(); final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor(); VirtualThreadPinEventMonitor eventMonitor = queueingLogger(exec, Set.of(), bq); eventMonitor.start(); // give start a moment to begin the event stream thread Thread.sleep(100); - exec.submit(() -> synchronizedSleep1()).get(); - eventMonitor.stop(); - final Pair event = bq.poll(1, TimeUnit.SECONDS); - assertThat(event).isNotNull(); - assertThat(event.getRight()).isFalse(); - assertThat(bq.isEmpty()); + final List> futures = IntStream + .range(0, 100) + .mapToObj(ig -> exec.submit(() -> IntStream + .range(0, 100) + .forEach(i -> nativeMethodCall()))) + .toList(); + for (final Future f : futures) { + f.get(); + } + Thread.sleep(1000); + eventMonitor.stop(); + assertThat(bq.isEmpty()).isFalse(); exec.shutdown(); exec.awaitTermination(1, TimeUnit.MILLISECONDS); } - @ParameterizedTest - @ValueSource(strings = {"VirtualThreadPinEventMonitorTest.synchronizedSleep1", "synchronizedSleep1"}) - @Disabled("flaky: no way to ensure the sequencing between the start of the recording stream and emitting the event") - public void testPinEventFiltered(final String allowString) throws InterruptedException, ExecutionException { - final BlockingQueue> bq = new LinkedBlockingQueue<>(); - final ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor(); - final VirtualThreadPinEventMonitor eventMonitor = queueingLogger(exec, Set.of(allowString), bq); - eventMonitor.start(); - // give start a moment to begin the event stream thread - Thread.sleep(100); - exec.submit(() -> synchronizedSleep1()).get(); - exec.submit(() -> synchronizedSleep2()).get(); - eventMonitor.stop(); - - final Pair sleep1Event = bq.poll(1, TimeUnit.SECONDS); - final Pair sleep2Event = bq.poll(1, TimeUnit.SECONDS); - assertThat(sleep1Event).isNotNull(); - assertThat(sleep2Event).isNotNull(); - assertThat(sleep1Event.getRight()).isTrue(); - assertThat(sleep2Event.getRight()).isFalse(); - assertThat(bq.isEmpty()); - exec.shutdown(); - exec.awaitTermination(1, TimeUnit.MILLISECONDS); - } private static VirtualThreadPinEventMonitor queueingLogger( final ExecutorService exec, final Set allowedMethods, - final BlockingQueue> bq) { + final BlockingQueue bq) { return new VirtualThreadPinEventMonitor(exec, - () -> allowedMethods, - Duration.ofMillis(1), - (event, allowed) -> { + Duration.ofNanos(0), + event -> { try { - bq.put(Pair.of(event, allowed)); + bq.put(event); } catch (InterruptedException e) { throw new RuntimeException(e); }