Add ScheduledApnPushNotificationSenderServiceCommand

This commit is contained in:
Chris Eager
2023-05-10 15:32:23 -05:00
committed by Chris Eager
parent 0d9fd043a4
commit 6043c1a4e8
6 changed files with 201 additions and 18 deletions

View File

@@ -41,6 +41,7 @@ import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -219,6 +220,7 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
import org.whispersystems.textsecuregcm.workers.CrawlAccountsCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand;
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
import org.whispersystems.textsecuregcm.workers.SetUserDiscoverabilityCommand;
@@ -266,6 +268,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new AssignUsernameCommand());
bootstrap.addCommand(new UnlinkDeviceCommand());
bootstrap.addCommand(new CrawlAccountsCommand());
bootstrap.addCommand(new ScheduledApnPushNotificationSenderServiceCommand());
}
@Override
@@ -521,25 +524,27 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
deletedAccountsLockDynamoDbClient, config.getDynamoDbTables().getDeletedAccountsLock().getTableName());
AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster,
deletedAccountsManager, keys, messagesManager, profilesManager,
pendingAccountsManager, secureStorageClient, secureBackupClient, secureValueRecovery2Client, clientPresenceManager,
pendingAccountsManager, secureStorageClient, secureBackupClient, secureValueRecovery2Client,
clientPresenceManager,
experimentEnrollmentManager, registrationRecoveryPasswordsManager, clock);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
APNSender apnSender = new APNSender(apnSenderExecutor, config.getApnConfiguration());
FcmSender fcmSender = new FcmSender(fcmSenderExecutor, config.getFcmConfiguration().credentials().value());
ApnPushNotificationScheduler apnPushNotificationScheduler = new ApnPushNotificationScheduler(pushSchedulerCluster,
apnSender, accountsManager);
apnSender, accountsManager, Optional.empty(), dynamicConfigurationManager);
PushNotificationManager pushNotificationManager = new PushNotificationManager(accountsManager, apnSender, fcmSender,
apnPushNotificationScheduler, pushLatencyManager);
RateLimiters rateLimiters = RateLimiters.createAndValidate(config.getLimitsConfiguration(),
dynamicConfigurationManager, rateLimitersCluster);
ProvisioningManager provisioningManager = new ProvisioningManager(config.getPubsubCacheConfiguration().getUri(), redisClientResources, config.getPubsubCacheConfiguration().getTimeout(), config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
ProvisioningManager provisioningManager = new ProvisioningManager(config.getPubsubCacheConfiguration().getUri(),
redisClientResources, config.getPubsubCacheConfiguration().getTimeout(),
config.getPubsubCacheConfiguration().getCircuitBreakerConfiguration());
IssuedReceiptsManager issuedReceiptsManager = new IssuedReceiptsManager(
config.getDynamoDbTables().getIssuedReceipts().getTableName(),
config.getDynamoDbTables().getIssuedReceipts().getExpiration(),
dynamoDbAsyncClient,
config.getDynamoDbTables().getIssuedReceipts().getGenerator());
RedeemedReceiptsManager redeemedReceiptsManager = new RedeemedReceiptsManager(
clock,
RedeemedReceiptsManager redeemedReceiptsManager = new RedeemedReceiptsManager(clock,
config.getDynamoDbTables().getRedeemedReceipts().getTableName(),
dynamoDbAsyncClient,
config.getDynamoDbTables().getRedeemedReceipts().getExpiration());

View File

@@ -47,6 +47,11 @@ public class DynamicConfiguration {
@Valid
private DynamicTurnConfiguration turn = new DynamicTurnConfiguration();
@JsonProperty
@Valid
DynamicScheduledApnNotificationSendingConfiguration scheduledApnNotificationSending = new DynamicScheduledApnNotificationSendingConfiguration(
true, false);
@JsonProperty
@Valid
DynamicMessagePersisterConfiguration messagePersister = new DynamicMessagePersisterConfiguration();
@@ -95,6 +100,10 @@ public class DynamicConfiguration {
return turn;
}
public DynamicScheduledApnNotificationSendingConfiguration getScheduledApnNotificationSendingConfiguration() {
return scheduledApnNotificationSending;
}
public DynamicMessagePersisterConfiguration getMessagePersisterConfiguration() {
return messagePersister;
}

View File

@@ -0,0 +1,11 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.configuration.dynamic;
public record DynamicScheduledApnNotificationSendingConfiguration(boolean enabledForServer,
boolean enabledForDedicatedProcess) {
}

View File

@@ -30,15 +30,16 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.redis.ClusterLuaScript;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.RedisClusterUtil;
import org.whispersystems.textsecuregcm.util.Util;
@@ -73,9 +74,11 @@ public class ApnPushNotificationScheduler implements Managed {
private final ClusterLuaScript scheduleBackgroundNotificationScript;
private final Thread[] workerThreads = new Thread[WORKER_THREAD_COUNT];
private final Thread[] workerThreads;
private final boolean dedicatedProcess;
private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;
private static final int WORKER_THREAD_COUNT = 4;
private static final int DEFAULT_WORKER_THREAD_COUNT = 4;
@VisibleForTesting
static final Duration BACKGROUND_NOTIFICATION_PERIOD = Duration.ofMinutes(20);
@@ -102,6 +105,18 @@ public class ApnPushNotificationScheduler implements Managed {
}
private long processNextSlot() {
if (dedicatedProcess) {
if (!dynamicConfigurationManager.getConfiguration().getScheduledApnNotificationSendingConfiguration()
.enabledForDedicatedProcess()) {
return 0;
}
} else {
if (!dynamicConfigurationManager.getConfiguration().getScheduledApnNotificationSendingConfiguration()
.enabledForServer()) {
return 0;
}
}
final int slot = (int) (pushSchedulingCluster.withCluster(connection ->
connection.sync().incr(NEXT_SLOT_TO_PROCESS_KEY)) % SlotHash.SLOT_COUNT);
@@ -166,32 +181,44 @@ public class ApnPushNotificationScheduler implements Managed {
}
public ApnPushNotificationScheduler(FaultTolerantRedisCluster pushSchedulingCluster,
APNSender apnSender,
AccountsManager accountsManager) throws IOException {
APNSender apnSender, AccountsManager accountsManager, final Optional<Integer> dedicatedProcessWorkerThreadCount,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) throws IOException {
this(pushSchedulingCluster, apnSender, accountsManager, Clock.systemUTC());
this(pushSchedulingCluster, apnSender, accountsManager, Clock.systemUTC(), dedicatedProcessWorkerThreadCount,
dynamicConfigurationManager);
}
@VisibleForTesting
ApnPushNotificationScheduler(FaultTolerantRedisCluster pushSchedulingCluster,
APNSender apnSender,
AccountsManager accountsManager,
Clock clock) throws IOException {
Clock clock,
Optional<Integer> dedicatedProcessThreadCount,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager) throws IOException {
this.apnSender = apnSender;
this.accountsManager = accountsManager;
this.pushSchedulingCluster = pushSchedulingCluster;
this.clock = clock;
this.getPendingVoipDestinationsScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/get.lua", ScriptOutputType.MULTI);
this.insertPendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/insert.lua", ScriptOutputType.VALUE);
this.removePendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/remove.lua", ScriptOutputType.INTEGER);
this.getPendingVoipDestinationsScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/get.lua",
ScriptOutputType.MULTI);
this.insertPendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/insert.lua",
ScriptOutputType.VALUE);
this.removePendingVoipDestinationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/remove.lua",
ScriptOutputType.INTEGER);
this.scheduleBackgroundNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster, "lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE);
this.scheduleBackgroundNotificationScript = ClusterLuaScript.fromResource(pushSchedulingCluster,
"lua/apn/schedule_background_notification.lua", ScriptOutputType.VALUE);
this.workerThreads = dedicatedProcessThreadCount.map(Thread[]::new)
.orElseGet(() -> new Thread[DEFAULT_WORKER_THREAD_COUNT]);
for (int i = 0; i < this.workerThreads.length; i++) {
this.workerThreads[i] = new Thread(new NotificationWorker(), "ApnFallbackManagerWorker-" + i);
}
this.dedicatedProcess = dedicatedProcessThreadCount.isPresent();
this.dynamicConfigurationManager = dynamicConfigurationManager;
}
/**

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2023 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.setup.Environment;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.ApnPushNotificationScheduler;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisCluster;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
public class ScheduledApnPushNotificationSenderServiceCommand extends EnvironmentCommand<WhisperServerConfiguration> {
private static final String WORKER_COUNT = "workers";
public ScheduledApnPushNotificationSenderServiceCommand() {
super(new Application<>() {
@Override
public void run(WhisperServerConfiguration configuration, Environment environment) {
}
}, "scheduled-apn-push-notification-sender-service",
"Starts a persistent service to send scheduled APNs push notifications");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--workers")
.type(Integer.class)
.dest(WORKER_COUNT)
.required(true)
.help("The number of worker threads");
}
@Override
protected void run(Environment environment, Namespace namespace, WhisperServerConfiguration configuration)
throws Exception {
final CommandDependencies deps = CommandDependencies.build("scheduled-apn-sender", environment, configuration);
final FaultTolerantRedisCluster pushSchedulerCluster = new FaultTolerantRedisCluster("push_scheduler",
configuration.getPushSchedulerCluster(), deps.redisClusterClientResources());
final ExecutorService apnSenderExecutor = environment.lifecycle().executorService(name(getClass(), "apnSender-%d"))
.maxThreads(1).minThreads(1).build();
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager = new DynamicConfigurationManager<>(
configuration.getAppConfig().getApplication(),
configuration.getAppConfig().getEnvironment(),
configuration.getAppConfig().getConfigurationName(),
DynamicConfiguration.class);
final APNSender apnSender = new APNSender(apnSenderExecutor, configuration.getApnConfiguration());
final ApnPushNotificationScheduler apnPushNotificationScheduler = new ApnPushNotificationScheduler(
pushSchedulerCluster, apnSender, deps.accountsManager(), Optional.of(namespace.getInt(WORKER_COUNT)),
dynamicConfigurationManager);
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(apnPushNotificationScheduler);
}
}