mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-19 10:38:03 +01:00
Use Micrometer rather than Dropwizard for a few metrics
This commit is contained in:
committed by
GitHub
parent
c92a29db1e
commit
2bc91c1f21
@@ -21,6 +21,7 @@ import io.dropwizard.core.server.DefaultServerFactory;
|
||||
import io.dropwizard.core.setup.Bootstrap;
|
||||
import io.dropwizard.core.setup.Environment;
|
||||
import io.dropwizard.jetty.HttpsConnectorFactory;
|
||||
import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
|
||||
import io.grpc.ServerBuilder;
|
||||
import io.lettuce.core.metrics.MicrometerCommandLatencyRecorder;
|
||||
import io.lettuce.core.metrics.MicrometerOptions;
|
||||
@@ -364,8 +365,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
|
||||
UncaughtExceptionHandler.register();
|
||||
|
||||
ScheduledExecutorService dynamicConfigurationExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "dynamicConfiguration-%d")).threads(1).build();
|
||||
ScheduledExecutorService dynamicConfigurationExecutor = ScheduledExecutorServiceBuilder.of(environment, "dynamicConfiguration")
|
||||
.threads(1).build();
|
||||
|
||||
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager =
|
||||
new DynamicConfigurationManager<>(
|
||||
@@ -397,8 +398,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
|
||||
environment.lifecycle().manage(new ManagedAwsCrt());
|
||||
|
||||
final ExecutorService awsSdkMetricsExecutor = environment.lifecycle()
|
||||
.virtualExecutorService(name(getClass(), "awsSdkMetrics-%d"));
|
||||
final ExecutorService awsSdkMetricsExecutor = virtualExecutorService(environment, "awsSdkMetrics");
|
||||
|
||||
final DynamoDbAsyncClient dynamoDbAsyncClient = config.getDynamoDbClientConfiguration()
|
||||
.buildAsyncClient(awsCredentialsProvider, new MicrometerAwsSdkMetricPublisher(awsSdkMetricsExecutor, "dynamoDbAsync"));
|
||||
@@ -415,8 +415,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
BlockingQueue<Runnable> messageDeletionQueue = new LinkedBlockingQueue<>();
|
||||
Metrics.gaugeCollectionSize(name(getClass(), "messageDeletionQueueSize"), Collections.emptyList(),
|
||||
messageDeletionQueue);
|
||||
ExecutorService messageDeletionAsyncExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "messageDeletionAsyncExecutor-%d"))
|
||||
ExecutorService messageDeletionAsyncExecutor = ExecutorServiceBuilder.of(environment, "messageDeletionAsyncExecutor")
|
||||
.minThreads(2)
|
||||
.maxThreads(2)
|
||||
.allowCoreThreadTimeOut(true)
|
||||
@@ -503,98 +502,79 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
Metrics.gaugeCollectionSize(MetricsUtil.name(getClass(), "messageDeliveryQueue"), Collections.emptyList(),
|
||||
messageDeliveryQueue);
|
||||
|
||||
ScheduledExecutorService recurringJobExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "recurringJob-%d")).threads(6).build();
|
||||
ScheduledExecutorService websocketScheduledExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "websocket-%d")).threads(8).build();
|
||||
ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d"))
|
||||
ScheduledExecutorService recurringJobExecutor = ScheduledExecutorServiceBuilder.of(environment, "recurringJob").threads(6).build();
|
||||
ScheduledExecutorService websocketScheduledExecutor = ScheduledExecutorServiceBuilder.of(environment, "websocket").threads(8).build();
|
||||
ExecutorService apnSenderExecutor = ExecutorServiceBuilder.of(environment, "apnSender")
|
||||
.maxThreads(1).minThreads(1).build();
|
||||
ExecutorService fcmSenderExecutor = environment.lifecycle().executorService(name(getClass(), "fcmSender-%d"))
|
||||
ExecutorService fcmSenderExecutor = ExecutorServiceBuilder.of(environment, "fcmSender")
|
||||
.maxThreads(32).minThreads(32).workQueue(fcmSenderQueue).build();
|
||||
ExecutorService secureValueRecoveryServiceExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build();
|
||||
ExecutorService storageServiceExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
|
||||
ExecutorService virtualThreadEventLoggerExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "virtualThreadEventLogger-%d")).minThreads(1).maxThreads(1).build();
|
||||
ExecutorService asyncOperationQueueingExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "asyncOperationQueueing-%d")).minThreads(1).maxThreads(1).build();
|
||||
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build();
|
||||
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "storageServiceRetry-%d")).threads(1).build();
|
||||
ScheduledExecutorService remoteStorageRetryExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "remoteStorageRetry-%d")).threads(1).build();
|
||||
ScheduledExecutorService registrationIdentityTokenRefreshExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "registrationIdentityTokenRefresh-%d")).threads(1).build();
|
||||
ExecutorService secureValueRecoveryServiceExecutor = ExecutorServiceBuilder.of(environment, "secureValueRecoveryService")
|
||||
.maxThreads(1).minThreads(1).build();
|
||||
ExecutorService storageServiceExecutor = ExecutorServiceBuilder.of(environment, "storageService")
|
||||
.maxThreads(1).minThreads(1).build();
|
||||
ExecutorService virtualThreadEventLoggerExecutor = ExecutorServiceBuilder.of(environment, "virtualThreadEventLogger")
|
||||
.minThreads(1).maxThreads(1).build();
|
||||
ExecutorService asyncOperationQueueingExecutor = ExecutorServiceBuilder.of(environment, "asyncOperationQueueing")
|
||||
.minThreads(1).maxThreads(1).build();
|
||||
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor =
|
||||
ScheduledExecutorServiceBuilder.of(environment, "secureValueRecoveryServiceRetry").threads(1).build();
|
||||
ScheduledExecutorService storageServiceRetryExecutor =
|
||||
ScheduledExecutorServiceBuilder.of(environment, "storageServiceRetry").threads(1).build();
|
||||
ScheduledExecutorService remoteStorageRetryExecutor =
|
||||
ScheduledExecutorServiceBuilder.of(environment, "remoteStorageRetry").threads(1).build();
|
||||
ScheduledExecutorService registrationIdentityTokenRefreshExecutor =
|
||||
ScheduledExecutorServiceBuilder.of(environment, "registrationIdentityTokenRefresh").threads(1).build();
|
||||
|
||||
Scheduler messageDeliveryScheduler = Schedulers.fromExecutorService(
|
||||
ExecutorServiceMetrics.monitor(Metrics.globalRegistry,
|
||||
environment.lifecycle().executorService(name(getClass(), "messageDelivery-%d"))
|
||||
.minThreads(20)
|
||||
.maxThreads(20)
|
||||
.workQueue(messageDeliveryQueue)
|
||||
.build(),
|
||||
MetricsUtil.name(getClass(), "messageDeliveryExecutor"), MetricsUtil.PREFIX),
|
||||
ExecutorServiceBuilder.of(environment, "messageDelivery")
|
||||
.minThreads(20)
|
||||
.maxThreads(20)
|
||||
.workQueue(messageDeliveryQueue)
|
||||
.build(),
|
||||
"messageDelivery");
|
||||
|
||||
// TODO: generally speaking this is a DynamoDB I/O executor for the accounts table; we should eventually have a general executor for speaking to the accounts table, but most of the server is still synchronous so this isn't widely useful yet
|
||||
ExecutorService batchIdentityCheckExecutor = environment.lifecycle().executorService(name(getClass(), "batchIdentityCheck-%d")).minThreads(32).maxThreads(32).build();
|
||||
ExecutorService subscriptionProcessorExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "subscriptionProcessor-%d"))
|
||||
ExecutorService batchIdentityCheckExecutor = ExecutorServiceBuilder.of(environment, "batchIdentityCheck").minThreads(32).maxThreads(32).build();
|
||||
ExecutorService subscriptionProcessorExecutor = ExecutorServiceBuilder.of(environment, "subscriptionProcessor")
|
||||
.maxThreads(availableProcessors) // mostly this is IO bound so tying to number of processors is tenuous at best
|
||||
.minThreads(availableProcessors) // mostly this is IO bound so tying to number of processors is tenuous at best
|
||||
.allowCoreThreadTimeOut(true).
|
||||
build();
|
||||
ExecutorService receiptSenderExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "receiptSender-%d"))
|
||||
ExecutorService receiptSenderExecutor = ExecutorServiceBuilder.of(environment, "receiptSender")
|
||||
.maxThreads(2)
|
||||
.minThreads(2)
|
||||
.workQueue(receiptSenderQueue)
|
||||
.rejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())
|
||||
.build();
|
||||
ExecutorService registrationCallbackExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "registration-%d"))
|
||||
ExecutorService registrationCallbackExecutor = ExecutorServiceBuilder.of(environment, "registration")
|
||||
.maxThreads(2)
|
||||
.minThreads(2)
|
||||
.build();
|
||||
ExecutorService accountLockExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "accountLock-%d"))
|
||||
ExecutorService accountLockExecutor = ExecutorServiceBuilder.of(environment, "accountLock")
|
||||
.minThreads(8)
|
||||
.maxThreads(8)
|
||||
.build();
|
||||
// unbounded executor (same as cachedThreadPool)
|
||||
ExecutorService remoteStorageHttpExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "remoteStorage-%d"))
|
||||
ExecutorService remoteStorageHttpExecutor = ExecutorServiceBuilder.of(environment, "remoteStorage")
|
||||
.minThreads(0)
|
||||
.maxThreads(Integer.MAX_VALUE)
|
||||
.workQueue(new SynchronousQueue<>())
|
||||
.keepAliveTime(io.dropwizard.util.Duration.seconds(60L))
|
||||
.build();
|
||||
ExecutorService cloudflareTurnHttpExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "cloudflareTurn-%d"))
|
||||
ExecutorService cloudflareTurnHttpExecutor = ExecutorServiceBuilder.of(environment, "cloudflareTurn")
|
||||
.maxThreads(2)
|
||||
.minThreads(2)
|
||||
.build();
|
||||
ExecutorService googlePlayBillingExecutor = environment.lifecycle()
|
||||
.virtualExecutorService(name(getClass(), "googlePlayBilling-%d"));
|
||||
ExecutorService appleAppStoreExecutor = environment.lifecycle()
|
||||
.virtualExecutorService(name(getClass(), "appleAppStore-%d"));
|
||||
ExecutorService clientEventExecutor = environment.lifecycle()
|
||||
.virtualExecutorService(name(getClass(), "clientEvent-%d"));
|
||||
ExecutorService disconnectionRequestListenerExecutor = environment.lifecycle()
|
||||
.virtualExecutorService(name(getClass(), "disconnectionRequest-%d"));
|
||||
ExecutorService googlePlayBillingExecutor = virtualExecutorService(environment, "googlePlayBilling");
|
||||
ExecutorService appleAppStoreExecutor = virtualExecutorService(environment, "appleAppStore");
|
||||
ExecutorService clientEventExecutor = virtualExecutorService(environment, "clientEvent");
|
||||
ExecutorService disconnectionRequestListenerExecutor = virtualExecutorService(environment, "disconnectionRequest");
|
||||
|
||||
ScheduledExecutorService appleAppStoreRetryExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "appleAppStoreRetry-%d")).threads(1).build();
|
||||
ScheduledExecutorService subscriptionProcessorRetryExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "subscriptionProcessorRetry-%d")).threads(1).build();
|
||||
ScheduledExecutorService cloudflareTurnRetryExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "cloudflareTurnRetry-%d")).threads(1).build();
|
||||
ScheduledExecutorService messagePollExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "messagePollExecutor-%d")).threads(1).build();
|
||||
ScheduledExecutorService provisioningWebsocketTimeoutExecutor = environment.lifecycle()
|
||||
.scheduledExecutorService(name(getClass(), "provisioningWebsocketTimeout-%d")).threads(1).build();
|
||||
ScheduledExecutorService appleAppStoreRetryExecutor = ScheduledExecutorServiceBuilder.of(environment, "appleAppStoreRetry").threads(1).build();
|
||||
ScheduledExecutorService subscriptionProcessorRetryExecutor = ScheduledExecutorServiceBuilder.of(environment, "subscriptionProcessorRetry").threads(1).build();
|
||||
ScheduledExecutorService cloudflareTurnRetryExecutor = ScheduledExecutorServiceBuilder.of(environment, "cloudflareTurnRetry").threads(1).build();
|
||||
ScheduledExecutorService messagePollExecutor = ScheduledExecutorServiceBuilder.of(environment, "messagePollExecutor").threads(1).build();
|
||||
ScheduledExecutorService provisioningWebsocketTimeoutExecutor = ScheduledExecutorServiceBuilder.of(environment, "provisioningWebsocketTimeout").threads(1).build();
|
||||
|
||||
final ManagedNioEventLoopGroup dnsResolutionEventLoopGroup = new ManagedNioEventLoopGroup();
|
||||
final DnsNameResolver cloudflareDnsResolver = new DnsNameResolverBuilder(dnsResolutionEventLoopGroup.next())
|
||||
@@ -922,8 +902,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
noiseWebSocketTlsPrivateKey = null;
|
||||
}
|
||||
|
||||
final ExecutorService noiseWebSocketDelegatedTaskExecutor = environment.lifecycle()
|
||||
.executorService(name(getClass(), "noiseWebsocketDelegatedTask-%d"))
|
||||
final ExecutorService noiseWebSocketDelegatedTaskExecutor = ExecutorServiceBuilder.of(environment, "noiseWebsocketDelegatedTask")
|
||||
.minThreads(8)
|
||||
.maxThreads(8)
|
||||
.allowCoreThreadTimeOut(false)
|
||||
@@ -1227,6 +1206,47 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
});
|
||||
}
|
||||
|
||||
private static class ExecutorServiceBuilder extends io.dropwizard.lifecycle.setup.ExecutorServiceBuilder {
|
||||
private final String baseName;
|
||||
|
||||
public ExecutorServiceBuilder(final LifecycleEnvironment environment, final String baseName) {
|
||||
super(environment, name(WhisperServerService.class, baseName) + "-%d");
|
||||
this.baseName = baseName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecutorService build() {
|
||||
return ExecutorServiceMetrics.monitor(Metrics.globalRegistry, super.build(), baseName, MetricsUtil.PREFIX);
|
||||
}
|
||||
|
||||
public static ExecutorServiceBuilder of(final Environment environment, final String name) {
|
||||
return new ExecutorServiceBuilder(environment.lifecycle(), name);
|
||||
}
|
||||
}
|
||||
|
||||
private static class ScheduledExecutorServiceBuilder extends io.dropwizard.lifecycle.setup.ScheduledExecutorServiceBuilder {
|
||||
private final String baseName;
|
||||
|
||||
public ScheduledExecutorServiceBuilder(final LifecycleEnvironment environment, final String baseName) {
|
||||
super(environment, name(WhisperServerService.class, baseName) + "-%d", false);
|
||||
this.baseName = baseName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledExecutorService build() {
|
||||
return ExecutorServiceMetrics.monitor(Metrics.globalRegistry, super.build(), baseName, MetricsUtil.PREFIX);
|
||||
}
|
||||
|
||||
public static ScheduledExecutorServiceBuilder of(final Environment environment, final String name) {
|
||||
return new ScheduledExecutorServiceBuilder(environment.lifecycle(), name);
|
||||
}
|
||||
}
|
||||
|
||||
private ExecutorService virtualExecutorService(final Environment environment, final String name) {
|
||||
return ExecutorServiceMetrics.monitor(
|
||||
Metrics.globalRegistry, environment.lifecycle().virtualExecutorService(name(getClass(), name) + "-%d"), name, MetricsUtil.PREFIX);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
new WhisperServerService().run(args);
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ package org.whispersystems.textsecuregcm.controllers;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.codahale.metrics.annotation.Timed;
|
||||
import com.google.common.net.HttpHeaders;
|
||||
import io.dropwizard.auth.Auth;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
@@ -469,7 +468,6 @@ public class MessageController {
|
||||
}
|
||||
}
|
||||
|
||||
@Timed
|
||||
@Path("/multi_recipient")
|
||||
@PUT
|
||||
@Consumes(MultiRecipientMessageProvider.MEDIA_TYPE)
|
||||
@@ -756,7 +754,6 @@ public class MessageController {
|
||||
}
|
||||
}
|
||||
|
||||
@Timed
|
||||
@GET
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public CompletableFuture<OutgoingMessageEntityList> getPendingMessages(@Auth AuthenticatedDevice auth,
|
||||
@@ -835,7 +832,6 @@ public class MessageController {
|
||||
return size;
|
||||
}
|
||||
|
||||
@Timed
|
||||
@DELETE
|
||||
@Path("/uuid/{uuid}")
|
||||
public CompletableFuture<Response> removePendingMessage(@Auth AuthenticatedDevice auth, @PathParam("uuid") UUID uuid) {
|
||||
@@ -874,7 +870,6 @@ public class MessageController {
|
||||
.thenApply(Util.ASYNC_EMPTY_RESPONSE);
|
||||
}
|
||||
|
||||
@Timed
|
||||
@POST
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
@Path("/report/{source}/{messageGuid}")
|
||||
|
||||
@@ -14,6 +14,8 @@ import org.eclipse.jetty.util.component.Container;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.binder.jetty.JettyConnectionMetrics;
|
||||
|
||||
/**
|
||||
* Uses {@link Container.Listener} to update {@link org.eclipse.jetty.server.HttpConfiguration}
|
||||
@@ -25,7 +27,6 @@ public class JettyHttpConfigurationCustomizer implements Container.Listener, Lif
|
||||
@Override
|
||||
public void beanAdded(final Container parent, final Object child) {
|
||||
if (child instanceof Connector c) {
|
||||
|
||||
for (ConnectionFactory cf : c.getConnectionFactories()) {
|
||||
final HttpConfiguration httpConfiguration = switch (cf) {
|
||||
case HTTP2ServerConnectionFactory h2cf -> h2cf.getHttpConfiguration();
|
||||
@@ -39,6 +40,8 @@ public class JettyHttpConfigurationCustomizer implements Container.Listener, Lif
|
||||
httpConfiguration.setNotifyRemoteAsyncErrors(false);
|
||||
}
|
||||
}
|
||||
|
||||
c.addBean(new JettyConnectionMetrics(Metrics.globalRegistry));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,8 +5,6 @@
|
||||
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
@@ -16,7 +14,6 @@ import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||
import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.metrics.UserAgentTagUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
@@ -33,9 +30,7 @@ public class ReceiptSender {
|
||||
final ExecutorService executor) {
|
||||
this.accountManager = accountManager;
|
||||
this.messageSender = messageSender;
|
||||
this.executor = ExecutorServiceMetrics.monitor(
|
||||
Metrics.globalRegistry, executor, MetricsUtil.name(ReceiptSender.class, "executor"), MetricsUtil.PREFIX)
|
||||
;
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
public void sendReceipt(ServiceIdentifier sourceIdentifier, byte sourceDeviceId, AciServiceIdentifier destinationIdentifier, long messageId) {
|
||||
|
||||
Reference in New Issue
Block a user