Use Micrometer rather than Dropwizard for a few metrics

This commit is contained in:
Jonathan Klabunde Tomer
2025-07-28 14:32:42 -07:00
committed by GitHub
parent 00062fdd5c
commit 4f0337021c
4 changed files with 83 additions and 70 deletions

View File

@@ -21,6 +21,7 @@ import io.dropwizard.core.server.DefaultServerFactory;
import io.dropwizard.core.setup.Bootstrap;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jetty.HttpsConnectorFactory;
import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
import io.grpc.ServerBuilder;
import io.lettuce.core.metrics.MicrometerCommandLatencyRecorder;
import io.lettuce.core.metrics.MicrometerOptions;
@@ -364,8 +365,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
UncaughtExceptionHandler.register();
ScheduledExecutorService dynamicConfigurationExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "dynamicConfiguration-%d")).threads(1).build();
ScheduledExecutorService dynamicConfigurationExecutor = ScheduledExecutorServiceBuilder.of(environment, "dynamicConfiguration")
.threads(1).build();
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
new DynamicConfigurationManager<>(
@@ -415,8 +416,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
BlockingQueue<Runnable> messageDeletionQueue = new LinkedBlockingQueue<>();
Metrics.gaugeCollectionSize(name(getClass(), "messageDeletionQueueSize"), Collections.emptyList(),
messageDeletionQueue);
ExecutorService messageDeletionAsyncExecutor = environment.lifecycle()
.executorService(name(getClass(), "messageDeletionAsyncExecutor-%d"))
ExecutorService messageDeletionAsyncExecutor = ExecutorServiceBuilder.of(environment, "messageDeletionAsyncExecutor")
.minThreads(2)
.maxThreads(2)
.allowCoreThreadTimeOut(true)
@@ -503,76 +503,66 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
Metrics.gaugeCollectionSize(MetricsUtil.name(getClass(), "messageDeliveryQueue"), Collections.emptyList(),
messageDeliveryQueue);
ScheduledExecutorService recurringJobExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build();
ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build();
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d"))
ScheduledExecutorService recurringJobExecutor = ScheduledExecutorServiceBuilder.of(environment, "recurringJob").threads(6).build();
ScheduledExecutorService websocketScheduledExecutor = ScheduledExecutorServiceBuilder.of(environment, "websocket").threads(8).build();
ExecutorService apnSenderExecutor = ExecutorServiceBuilder.of(environment, "apnSender")
.maxThreads(1).minThreads(1).build();
ExecutorService fcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d"))
ExecutorService fcmSenderExecutor = ExecutorServiceBuilder.of(environment, "fcmSender")
.maxThreads(32).minThreads(32).workQueue(fcmSenderQueue).build();
ExecutorService secureValueRecoveryServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService virtualThreadEventLoggerExecutor = environment.lifecycle()
.executorService(name(getClass(), "virtualThreadEventLogger-%d")).minThreads(1).maxThreads(1).build();
ExecutorService asyncOperationQueueingExecutor = environment.lifecycle()
.executorService(name(getClass(), "asyncOperationQueueing-%d")).minThreads(1).maxThreads(1).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "storageServiceRetry-%d")).threads(1).build();
ScheduledExecutorService remoteStorageRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "remoteStorageRetry-%d")).threads(1).build();
ScheduledExecutorService registrationIdentityTokenRefreshExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "registrationIdentityTokenRefresh-%d")).threads(1).build();
ExecutorService secureValueRecoveryServiceExecutor = ExecutorServiceBuilder.of(environment, "secureValueRecoveryService")
.maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = ExecutorServiceBuilder.of(environment, "storageService")
.maxThreads(1).minThreads(1).build();
ExecutorService virtualThreadEventLoggerExecutor = ExecutorServiceBuilder.of(environment, "virtualThreadEventLogger")
.minThreads(1).maxThreads(1).build();
ExecutorService asyncOperationQueueingExecutor = ExecutorServiceBuilder.of(environment, "asyncOperationQueueing")
.minThreads(1).maxThreads(1).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor =
ScheduledExecutorServiceBuilder.of(environment, "secureValueRecoveryServiceRetry").threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor =
ScheduledExecutorServiceBuilder.of(environment, "storageServiceRetry").threads(1).build();
ScheduledExecutorService remoteStorageRetryExecutor =
ScheduledExecutorServiceBuilder.of(environment, "remoteStorageRetry").threads(1).build();
ScheduledExecutorService registrationIdentityTokenRefreshExecutor =
ScheduledExecutorServiceBuilder.of(environment, "registrationIdentityTokenRefresh").threads(1).build();
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
environment.lifecycle().executorService(name(getClass(), "messageDelivery-%d"))
.minThreads(20)
.maxThreads(20)
.workQueue(messageDeliveryQueue)
.build(),
MetricsUtil.name(getClass(), "messageDeliveryExecutor"), MetricsUtil.PREFIX),
ExecutorServiceBuilder.of(environment, "messageDelivery")
.minThreads(20)
.maxThreads(20)
.workQueue(messageDeliveryQueue)
.build(),
"messageDelivery");
// TODO: generally speaking this is a DynamoDB I/O executor for the accounts table; we should eventually have a general executor for speaking to the accounts table, but most of the server is still synchronous so this isn't widely useful yet
ExecutorService batchIdentityCheckExecutor = environment.lifecycle().executorService(name(getClass(), "batchIdentityCheck-%d")).minThreads(32).maxThreads(32).build();
ExecutorService subscriptionProcessorExecutor = environment.lifecycle()
.executorService(name(getClass(), "subscriptionProcessor-%d"))
ExecutorService batchIdentityCheckExecutor = ExecutorServiceBuilder.of(environment, "batchIdentityCheck").minThreads(32).maxThreads(32).build();
ExecutorService subscriptionProcessorExecutor = ExecutorServiceBuilder.of(environment, "subscriptionProcessor")
.maxThreads(availableProcessors) // mostly this is IO bound so tying to number of processors is tenuous at best
.minThreads(availableProcessors) // mostly this is IO bound so tying to number of processors is tenuous at best
.allowCoreThreadTimeOut(true).
build();
ExecutorService receiptSenderExecutor = environment.lifecycle()
.executorService(name(getClass(), "receiptSender-%d"))
ExecutorService receiptSenderExecutor = ExecutorServiceBuilder.of(environment, "receiptSender")
.maxThreads(2)
.minThreads(2)
.workQueue(receiptSenderQueue)
.rejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.build();
ExecutorService registrationCallbackExecutor = environment.lifecycle()
.executorService(name(getClass(), "registration-%d"))
ExecutorService registrationCallbackExecutor = ExecutorServiceBuilder.of(environment, "registration")
.maxThreads(2)
.minThreads(2)
.build();
ExecutorService accountLockExecutor = environment.lifecycle()
.executorService(name(getClass(), "accountLock-%d"))
ExecutorService accountLockExecutor = ExecutorServiceBuilder.of(environment, "accountLock")
.minThreads(8)
.maxThreads(8)
.build();
// unbounded executor (same as cachedThreadPool)
ExecutorService remoteStorageHttpExecutor = environment.lifecycle()
.executorService(name(getClass(), "remoteStorage-%d"))
ExecutorService remoteStorageHttpExecutor = ExecutorServiceBuilder.of(environment, "remoteStorage")
.minThreads(0)
.maxThreads(Integer.MAX_VALUE)
.workQueue(new SynchronousQueue<>())
.keepAliveTime(io.dropwizard.util.Duration.seconds(60L))
.build();
ExecutorService cloudflareTurnHttpExecutor = environment.lifecycle()
.executorService(name(getClass(), "cloudflareTurn-%d"))
ExecutorService cloudflareTurnHttpExecutor = ExecutorServiceBuilder.of(environment, "cloudflareTurn")
.maxThreads(2)
.minThreads(2)
.build();
@@ -585,16 +575,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
ExecutorService disconnectionRequestListenerExecutor = environment.lifecycle()
.virtualExecutorService(name(getClass(), "disconnectionRequest-%d"));
ScheduledExecutorService appleAppStoreRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "appleAppStoreRetry-%d")).threads(1).build();
ScheduledExecutorService subscriptionProcessorRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "subscriptionProcessorRetry-%d")).threads(1).build();
ScheduledExecutorService cloudflareTurnRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "cloudflareTurnRetry-%d")).threads(1).build();
ScheduledExecutorService messagePollExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "messagePollExecutor-%d")).threads(1).build();
ScheduledExecutorService provisioningWebsocketTimeoutExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "provisioningWebsocketTimeout-%d")).threads(1).build();
ScheduledExecutorService appleAppStoreRetryExecutor = ScheduledExecutorServiceBuilder.of(environment, "appleAppStoreRetry").threads(1).build();
ScheduledExecutorService subscriptionProcessorRetryExecutor = ScheduledExecutorServiceBuilder.of(environment, "subscriptionProcessorRetry").threads(1).build();
ScheduledExecutorService cloudflareTurnRetryExecutor = ScheduledExecutorServiceBuilder.of(environment, "cloudflareTurnRetry").threads(1).build();
ScheduledExecutorService messagePollExecutor = ScheduledExecutorServiceBuilder.of(environment, "messagePollExecutor").threads(1).build();
ScheduledExecutorService provisioningWebsocketTimeoutExecutor = ScheduledExecutorServiceBuilder.of(environment, "provisioningWebsocketTimeout").threads(1).build();
final ManagedNioEventLoopGroup dnsResolutionEventLoopGroup = new ManagedNioEventLoopGroup();
final DnsNameResolver cloudflareDnsResolver = new DnsNameResolverBuilder(dnsResolutionEventLoopGroup.next())
@@ -922,8 +907,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
noiseWebSocketTlsPrivateKey = null;
}
final ExecutorService noiseWebSocketDelegatedTaskExecutor = environment.lifecycle()
.executorService(name(getClass(), "noiseWebsocketDelegatedTask-%d"))
final ExecutorService noiseWebSocketDelegatedTaskExecutor = ExecutorServiceBuilder.of(environment, "noiseWebsocketDelegatedTask")
.minThreads(8)
.maxThreads(8)
.allowCoreThreadTimeOut(false)
@@ -1227,6 +1211,42 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
});
}
private static class ExecutorServiceBuilder extends io.dropwizard.lifecycle.setup.ExecutorServiceBuilder {
private final String baseName;
public ExecutorServiceBuilder(final LifecycleEnvironment environment, final String baseName) {
super(environment, name(WhisperServerService.class, baseName) + "-%d");
this.baseName = baseName;
}
@Override
public ExecutorService build() {
return ExecutorServiceMetrics.monitor(Metrics.globalRegistry, super.build(), baseName, MetricsUtil.PREFIX);
}
public static ExecutorServiceBuilder of(final Environment environment, final String name) {
return new ExecutorServiceBuilder(environment.lifecycle(), name);
}
}
private static class ScheduledExecutorServiceBuilder extends io.dropwizard.lifecycle.setup.ScheduledExecutorServiceBuilder {
private final String baseName;
public ScheduledExecutorServiceBuilder(final LifecycleEnvironment environment, final String baseName) {
super(environment, name(WhisperServerService.class, baseName) + "-%d", false);
this.baseName = baseName;
}
@Override
public ScheduledExecutorService build() {
return ExecutorServiceMetrics.monitor(Metrics.globalRegistry, super.build(), baseName, MetricsUtil.PREFIX);
}
public static ScheduledExecutorServiceBuilder of(final Environment environment, final String name) {
return new ScheduledExecutorServiceBuilder(environment.lifecycle(), name);
}
}
public static void main(String[] args) throws Exception {
new WhisperServerService().run(args);
}

View File

@@ -6,7 +6,6 @@ package org.whispersystems.textsecuregcm.controllers;
import static com.codahale.metrics.MetricRegistry.name;
import com.codahale.metrics.annotation.Timed;
import com.google.common.net.HttpHeaders;
import io.dropwizard.auth.Auth;
import io.micrometer.core.instrument.Metrics;
@@ -469,7 +468,6 @@ public class MessageController {
}
}
@Timed
@Path("/multi_recipient")
@PUT
@Consumes(MultiRecipientMessageProvider.MEDIA_TYPE)
@@ -756,7 +754,6 @@ public class MessageController {
}
}
@Timed
@GET
@Produces(MediaType.APPLICATION_JSON)
public CompletableFuture<OutgoingMessageEntityList> getPendingMessages(@Auth AuthenticatedDevice auth,
@@ -835,7 +832,6 @@ public class MessageController {
return size;
}
@Timed
@DELETE
@Path("/uuid/{uuid}")
public CompletableFuture<Response> removePendingMessage(@Auth AuthenticatedDevice auth, @PathParam("uuid") UUID uuid) {
@@ -874,7 +870,6 @@ public class MessageController {
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
}
@Timed
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/report/{source}/{messageGuid}")

View File

@@ -14,6 +14,8 @@ import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.component.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jetty.JettyConnectionMetrics;
/**
* Uses {@link Container.Listener} to update {@link org.eclipse.jetty.server.HttpConfiguration}
@@ -25,7 +27,6 @@ public class JettyHttpConfigurationCustomizer implements Container.Listener, Lif
@Override
public void beanAdded(final Container parent, final Object child) {
if (child instanceof Connector c) {
for (ConnectionFactory cf : c.getConnectionFactories()) {
final HttpConfiguration httpConfiguration = switch (cf) {
case HTTP2ServerConnectionFactory h2cf -> h2cf.getHttpConfiguration();
@@ -39,6 +40,8 @@ public class JettyHttpConfigurationCustomizer implements Container.Listener, Lif
httpConfiguration.setNotifyRemoteAsyncErrors(false);
}
}
c.addBean(new JettyConnectionMetrics(Metrics.globalRegistry));
}
}

View File

@@ -5,8 +5,6 @@
package org.whispersystems.textsecuregcm.push;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@@ -16,7 +14,6 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@@ -33,9 +30,7 @@ public class ReceiptSender {
final ExecutorService executor) {
this.accountManager = accountManager;
this.messageSender = messageSender;
this.executor = ExecutorServiceMetrics.monitor(
Metrics.globalRegistry, executor, MetricsUtil.name(ReceiptSender.class, "executor"), MetricsUtil.PREFIX)
;
this.executor = executor;
}
public void sendReceipt(ServiceIdentifier sourceIdentifier, byte sourceDeviceId, AciServiceIdentifier destinationIdentifier, long messageId) {