Remove commands for removing accounts/devices without PQ or PNI key material

This commit is contained in:
Jon Chambers
2025-05-05 15:04:46 -04:00
committed by Jon Chambers
parent dee3723d97
commit e43487155f
13 changed files with 0 additions and 1459 deletions

View File

@@ -266,19 +266,13 @@ import org.whispersystems.textsecuregcm.workers.CertificateCommand;
import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand;
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
import org.whispersystems.textsecuregcm.workers.LockAccountsWithoutPniIdentityKeysCommand;
import org.whispersystems.textsecuregcm.workers.LockAccountsWithoutPqKeysCommand;
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
import org.whispersystems.textsecuregcm.workers.RemoveAccountsWithoutPniIdentityKeysCommand;
import org.whispersystems.textsecuregcm.workers.RemoveAccountsWithoutPqKeysCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredUsernameHoldsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveLinkedDevicesWithoutPniKeysCommand;
import org.whispersystems.textsecuregcm.workers.RemoveLinkedDevicesWithoutPqKeysCommand;
import org.whispersystems.textsecuregcm.workers.ScheduledApnPushNotificationSenderServiceCommand;
import org.whispersystems.textsecuregcm.workers.ServerVersionCommand;
import org.whispersystems.textsecuregcm.workers.SetRequestLoggingEnabledTask;
@@ -341,14 +335,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-idle-device-notification-jobs",
"Processes scheduled jobs to send notifications to idle devices",
new IdleDeviceNotificationSchedulerFactory()));
bootstrap.addCommand(new RemoveLinkedDevicesWithoutPqKeysCommand());
bootstrap.addCommand(new LockAccountsWithoutPqKeysCommand());
bootstrap.addCommand(new RemoveAccountsWithoutPqKeysCommand());
bootstrap.addCommand(new RemoveLinkedDevicesWithoutPniKeysCommand());
bootstrap.addCommand(new LockAccountsWithoutPniIdentityKeysCommand());
bootstrap.addCommand(new RemoveAccountsWithoutPniIdentityKeysCommand());
}
@Override

View File

@@ -1,105 +0,0 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
public class LockAccountsWithoutPniIdentityKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
@VisibleForTesting
static final String DRY_RUN_ARGUMENT = "dry-run";
@VisibleForTesting
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
@VisibleForTesting
static final String RETRIES_ARGUMENT = "retries";
private static final String LOCKED_ACCOUNT_COUNTER_NAME =
MetricsUtil.name(LockAccountsWithoutPniIdentityKeysCommand.class, "lockedAccount");
private static final Logger log = LoggerFactory.getLogger(LockAccountsWithoutPniIdentityKeysCommand.class);
public LockAccountsWithoutPniIdentityKeysCommand() {
super("lock-accounts-without-pni-identity-keys", "Locks accounts without PNI identity keys");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, dont actually lock accounts with expired linked devices");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(16)
.help("Max concurrency for DynamoDB operations");
subparser.addArgument("--retries")
.type(Integer.class)
.dest(RETRIES_ARGUMENT)
.setDefault(3)
.help("Maximum number of DynamoDB retries permitted per device");
}
@Override
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
final AccountsManager accountsManager = getCommandDependencies().accountsManager();
accounts
.filter(account -> !account.hasLockedCredentials())
.filter(account -> account.getIdentityKey(IdentityType.PNI) == null)
.flatMap(accountWithoutPniIdentityKey -> {
final String platform = DevicePlatformUtil.getDevicePlatform(accountWithoutPniIdentityKey.getPrimaryDevice())
.map(Enum::name)
.orElse("unknown");
return dryRun
? Mono.just(platform)
: Mono.fromFuture(() -> accountsManager.updateAsync(accountWithoutPniIdentityKey, Account::lockAuthTokenHash))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
.thenReturn(platform)
.onErrorResume(throwable -> {
log.warn("Failed to lock account without PNI identity key: {}",
accountWithoutPniIdentityKey.getIdentifier(IdentityType.ACI), throwable);
return Mono.empty();
});
}, maxConcurrency)
.doOnNext(deletedAccountPlatform -> {
Metrics.counter(LOCKED_ACCOUNT_COUNTER_NAME,
"dryRun", String.valueOf(dryRun),
"platform", deletedAccountPlatform)
.increment();
})
.then()
.block();
}
}

View File

@@ -1,103 +0,0 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
public class LockAccountsWithoutPqKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
@VisibleForTesting
static final String DRY_RUN_ARGUMENT = "dry-run";
@VisibleForTesting
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
@VisibleForTesting
static final String RETRIES_ARGUMENT = "retries";
private static final String LOCKED_ACCOUNT_COUNTER_NAME =
MetricsUtil.name(LockAccountsWithoutPqKeysCommand.class, "lockedAccount");
private static final Logger log = LoggerFactory.getLogger(LockAccountsWithoutPqKeysCommand.class);
public LockAccountsWithoutPqKeysCommand() {
super("lock-accounts-without-pq-keys", "Locks accounts with primary devices that don't have PQ keys");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, dont actually lock accounts with expired linked devices");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(16)
.help("Max concurrency for DynamoDB operations");
subparser.addArgument("--retries")
.type(Integer.class)
.dest(RETRIES_ARGUMENT)
.setDefault(3)
.help("Maximum number of DynamoDB retries permitted per device");
}
@Override
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
final AccountsManager accountsManager = getCommandDependencies().accountsManager();
final PqKeysUtil pqKeysUtil = new PqKeysUtil(getCommandDependencies().keysManager(), maxConcurrency, maxRetries);
accounts
.transform(pqKeysUtil::getAccountsWithoutPqKeys)
.flatMap(accountWithoutPqKeys -> {
final String platform = DevicePlatformUtil.getDevicePlatform(accountWithoutPqKeys.getPrimaryDevice())
.map(Enum::name)
.orElse("unknown");
return dryRun
? Mono.just(platform)
: Mono.fromFuture(() -> accountsManager.updateAsync(accountWithoutPqKeys, Account::lockAuthTokenHash))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
.thenReturn(platform)
.onErrorResume(throwable -> {
log.warn("Failed to lock account without PQ keys {}", accountWithoutPqKeys.getIdentifier(IdentityType.ACI), throwable);
return Mono.empty();
});
})
.doOnNext(deletedAccountPlatform -> {
Metrics.counter(LOCKED_ACCOUNT_COUNTER_NAME,
"dryRun", String.valueOf(dryRun),
"platform", deletedAccountPlatform)
.increment();
})
.then()
.block();
}
}

View File

@@ -1,112 +0,0 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.time.Duration;
public class RemoveAccountsWithoutPniIdentityKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
@VisibleForTesting
static final String DRY_RUN_ARGUMENT = "dry-run";
@VisibleForTesting
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
@VisibleForTesting
static final String RETRIES_ARGUMENT = "retries";
private static final String REMOVED_ACCOUNT_COUNTER_NAME =
MetricsUtil.name(RemoveAccountsWithoutPniIdentityKeysCommand.class, "removedAccount");
private static final Logger log = LoggerFactory.getLogger(RemoveAccountsWithoutPniIdentityKeysCommand.class);
public RemoveAccountsWithoutPniIdentityKeysCommand() {
super("remove-accounts-without-pni-identity-keys", "Deletes accounts without PNI identity keys");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, dont actually lock accounts with expired linked devices");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(16)
.help("Max concurrency for DynamoDB operations");
subparser.addArgument("--retries")
.type(Integer.class)
.dest(RETRIES_ARGUMENT)
.setDefault(3)
.help("Maximum number of DynamoDB retries permitted per device");
}
@Override
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
final AccountsManager accountsManager = getCommandDependencies().accountsManager();
accounts
.filter(account -> account.getIdentityKey(IdentityType.PNI) == null)
.filter(accountWithoutPniIdentityKey -> {
if (!accountWithoutPniIdentityKey.hasLockedCredentials()) {
log.warn("Account {} is not locked", accountWithoutPniIdentityKey.getIdentifier(IdentityType.ACI));
return false;
}
return true;
})
.flatMap(accountWithoutPniIdentityKey -> {
final String platform = DevicePlatformUtil.getDevicePlatform(accountWithoutPniIdentityKey.getPrimaryDevice())
.map(Enum::name)
.orElse("unknown");
return dryRun
? Mono.just(platform)
: Mono.fromFuture(() -> accountsManager.delete(accountWithoutPniIdentityKey, AccountsManager.DeletionReason.ADMIN_DELETED))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
.thenReturn(platform)
.onErrorResume(throwable -> {
log.warn("Failed to delete account without PNI identity key: {}",
accountWithoutPniIdentityKey.getIdentifier(IdentityType.ACI), throwable);
return Mono.empty();
});
}, maxConcurrency)
.doOnNext(deletedAccountPlatform -> {
Metrics.counter(REMOVED_ACCOUNT_COUNTER_NAME,
"dryRun", String.valueOf(dryRun),
"platform", deletedAccountPlatform)
.increment();
})
.then()
.block();
}
}

View File

@@ -1,121 +0,0 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
public class RemoveAccountsWithoutPqKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
@VisibleForTesting
static final String DRY_RUN_ARGUMENT = "dry-run";
@VisibleForTesting
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
@VisibleForTesting
static final String RETRIES_ARGUMENT = "retries";
@VisibleForTesting
static final String MAX_ACCOUNTS_ARGUMENT = "max-accounts";
private static final String REMOVED_ACCOUNT_COUNTER_NAME =
MetricsUtil.name(RemoveAccountsWithoutPqKeysCommand.class, "removedAccount");
private static final Logger log = LoggerFactory.getLogger(RemoveAccountsWithoutPqKeysCommand.class);
public RemoveAccountsWithoutPqKeysCommand() {
super("remove-accounts-without-pq-keys", "Removes accounts with primary devices that don't have PQ keys");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, dont actually modify accounts with expired linked devices");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(16)
.help("Max concurrency for DynamoDB operations");
subparser.addArgument("--retries")
.type(Integer.class)
.dest(RETRIES_ARGUMENT)
.setDefault(3)
.help("Maximum number of DynamoDB retries permitted per device");
subparser.addArgument("--max-accounts")
.type(Integer.class)
.required(true)
.dest(MAX_ACCOUNTS_ARGUMENT)
.help("Maximum number of accounts to remove per run");
}
@Override
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
final int maxAccounts = getNamespace().getInt(MAX_ACCOUNTS_ARGUMENT);
final AccountsManager accountsManager = getCommandDependencies().accountsManager();
final PqKeysUtil pqKeysUtil = new PqKeysUtil(getCommandDependencies().keysManager(), maxConcurrency, maxRetries);
accounts
.transform(pqKeysUtil::getAccountsWithoutPqKeys)
.take(maxAccounts)
.filter(accountWithoutPqKeys -> {
if (!accountWithoutPqKeys.hasLockedCredentials()) {
log.warn("Account {} is not locked", accountWithoutPqKeys.getIdentifier(IdentityType.ACI));
}
return accountWithoutPqKeys.hasLockedCredentials();
})
.flatMap(accountWithoutPqKeys -> {
final String platform = DevicePlatformUtil.getDevicePlatform(accountWithoutPqKeys.getPrimaryDevice())
.map(Enum::name)
.orElse("unknown");
return dryRun
? Mono.just(platform)
: Mono.fromFuture(() -> accountsManager.delete(accountWithoutPqKeys, AccountsManager.DeletionReason.ADMIN_DELETED))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
.thenReturn(platform)
.onErrorResume(throwable -> {
log.warn("Failed to remove account without PQ keys {}", accountWithoutPqKeys.getIdentifier(IdentityType.ACI), throwable);
return Mono.empty();
});
})
.doOnNext(deletedAccountPlatform -> {
Metrics.counter(REMOVED_ACCOUNT_COUNTER_NAME,
"dryRun", String.valueOf(dryRun),
"platform", deletedAccountPlatform)
.increment();
})
.then()
.block();
}
}

View File

@@ -1,123 +0,0 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.shaded.reactor.util.function.Tuples;
import java.time.Duration;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.KeysManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
public class RemoveLinkedDevicesWithoutPniKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
@VisibleForTesting
static final String DRY_RUN_ARGUMENT = "dry-run";
@VisibleForTesting
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
@VisibleForTesting
static final String RETRIES_ARGUMENT = "retries";
private static final String REMOVED_DEVICE_COUNTER_NAME =
MetricsUtil.name(RemoveLinkedDevicesWithoutPniKeysCommand.class, "removedDevice");
private static final Logger log = LoggerFactory.getLogger(RemoveLinkedDevicesWithoutPniKeysCommand.class);
public RemoveLinkedDevicesWithoutPniKeysCommand() {
super("remove-linked-devices-without-pni-keys", "Removes linked devices that do not have PNI signed pre-keys");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, dont actually modify accounts with expired linked devices");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(16)
.help("Max concurrency for DynamoDB operations");
subparser.addArgument("--retries")
.type(Integer.class)
.dest(RETRIES_ARGUMENT)
.setDefault(3)
.help("Maximum number of DynamoDB retries permitted per device");
}
@Override
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
final AccountsManager accountsManager = getCommandDependencies().accountsManager();
final KeysManager keysManager = getCommandDependencies().keysManager();
accounts
.filter(account -> !account.hasLockedCredentials())
.filter(account -> account.getDevices().size() > 1)
.flatMap(account -> Flux.fromIterable(account.getDevices())
.filter(device -> !device.isPrimary())
.map(device -> Tuples.of(account, device)))
.flatMap(accountAndDevice -> {
final Account account = accountAndDevice.getT1();
final Device device = accountAndDevice.getT2();
return Mono.fromFuture(
() -> keysManager.getEcSignedPreKey(account.getIdentifier(IdentityType.PNI), device.getId()))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
.onErrorResume(throwable -> {
log.warn("Failed to get PNI signed pre-key presence for account/device: {}:{}",
account.getIdentifier(IdentityType.ACI), device.getId());
return Mono.empty();
})
.map(maybeEcSignedPreKey -> Tuples.of(account, device, maybeEcSignedPreKey));
}, maxConcurrency)
.filter(tuple -> tuple.getT3().isEmpty())
.flatMap(accountAndDevice -> {
final Account account = accountAndDevice.getT1();
final Device device = accountAndDevice.getT2();
return dryRun
? Mono.just(device)
: Mono.fromFuture(() -> accountsManager.removeDevice(account, device.getId()))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
.onErrorResume(throwable -> {
log.warn("Failed to remove device: {}:{}", account.getIdentifier(IdentityType.ACI), device.getId());
return Mono.empty();
})
.then(Mono.just(device));
}, maxConcurrency)
.doOnNext(removedDevice -> Metrics.counter(REMOVED_DEVICE_COUNTER_NAME,
"dryRun", String.valueOf(dryRun),
"platform", DevicePlatformUtil.getDevicePlatform(removedDevice).map(Enum::name).orElse("unknown"))
.increment())
.then()
.block();
}
}

View File

@@ -1,111 +0,0 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.metrics.DevicePlatformUtil;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.KeysManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;
public class RemoveLinkedDevicesWithoutPqKeysCommand extends AbstractSinglePassCrawlAccountsCommand {
@VisibleForTesting
static final String DRY_RUN_ARGUMENT = "dry-run";
@VisibleForTesting
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
@VisibleForTesting
static final String RETRIES_ARGUMENT = "retries";
private static final String REMOVED_DEVICE_COUNTER_NAME =
MetricsUtil.name(RemoveLinkedDevicesWithoutPqKeysCommand.class, "removedDevice");
private static final Logger log = LoggerFactory.getLogger(RemoveLinkedDevicesWithoutPqKeysCommand.class);
public RemoveLinkedDevicesWithoutPqKeysCommand() {
super("remove-linked-devices-without-pq-keys", "Removes linked devices that don't have PQ keys");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, dont actually modify accounts with expired linked devices");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(16)
.help("Max concurrency for DynamoDB operations");
subparser.addArgument("--retries")
.type(Integer.class)
.dest(RETRIES_ARGUMENT)
.setDefault(3)
.help("Maximum number of DynamoDB retries permitted per device");
}
@Override
protected void crawlAccounts(final Flux<Account> accounts) {
final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
final AccountsManager accountsManager = getCommandDependencies().accountsManager();
final KeysManager keysManager = getCommandDependencies().keysManager();
accounts
.filter(account -> account.getDevices().size() > 1)
.flatMap(
account -> Mono.fromFuture(() -> keysManager.getPqEnabledDevices(account.getIdentifier(IdentityType.ACI)))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
.onErrorResume(throwable -> {
log.warn("Failed to get PQ key presence for account: {}", account.getIdentifier(IdentityType.ACI));
return Mono.empty();
})
.flatMapMany(pqEnabledDeviceIds -> Flux.fromIterable(account.getDevices())
.filter(device -> !device.isPrimary())
.filter(device -> !pqEnabledDeviceIds.contains(device.getId()))
.map(device -> Tuples.of(account, device))), maxConcurrency)
.flatMap(accountAndDevice -> dryRun
? Mono.just(accountAndDevice.getT2())
: Mono.fromFuture(() -> accountsManager.removeDevice(accountAndDevice.getT1(), accountAndDevice.getT2().getId()))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1))
.onRetryExhaustedThrow((spec, rs) -> rs.failure()))
.onErrorResume(throwable -> {
log.warn("Failed to remove linked device without PQ keys: {}:{}",
accountAndDevice.getT1().getIdentifier(IdentityType.ACI), accountAndDevice.getT2().getId());
return Mono.empty();
})
.map(ignored -> accountAndDevice.getT2()), maxConcurrency)
.doOnNext(removedDevice -> Metrics.counter(REMOVED_DEVICE_COUNTER_NAME,
"dryRun", String.valueOf(dryRun),
"platform", DevicePlatformUtil.getDevicePlatform(removedDevice).map(Enum::name).orElse("unknown"))
.increment())
.then()
.block();
}
}