Make account deletion an asynchronous operation

This commit is contained in:
Jon Chambers
2023-10-04 10:44:50 -04:00
committed by GitHub
parent 010eadcd10
commit bb7e0528c4
21 changed files with 284 additions and 140 deletions

View File

@@ -443,6 +443,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
.maxThreads(2)
.minThreads(2)
.build();
ExecutorService accountLockExecutor = environment.lifecycle()
.executorService(name(getClass(), "accountLock-%d"))
.minThreads(8)
.maxThreads(8)
.build();
ScheduledExecutorService subscriptionProcessorRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "subscriptionProcessorRetry-%d")).threads(1).build();
@@ -518,7 +523,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
accountLockManager, keys, messagesManager, profilesManager,
secureStorageClient, secureBackupClient, secureValueRecovery2Client,
clientPresenceManager,
experimentEnrollmentManager, registrationRecoveryPasswordsManager, clock);
experimentEnrollmentManager, registrationRecoveryPasswordsManager, accountLockExecutor, clock);
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
APNSender apnSender = new APNSender(apnSenderExecutor, config.getApnConfiguration());
FcmSender fcmSender = new FcmSender(fcmSenderExecutor, config.getFcmConfiguration().credentials().value());

View File

@@ -12,6 +12,7 @@ import java.util.Base64;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
@@ -505,8 +506,8 @@ public class AccountController {
@DELETE
@Path("/me")
public void deleteAccount(@Auth DisabledPermittedAuthenticatedAccount auth) throws InterruptedException {
accounts.delete(auth.getAccount(), AccountsManager.DeletionReason.USER_REQUEST);
public CompletableFuture<Void> deleteAccount(@Auth DisabledPermittedAuthenticatedAccount auth) throws InterruptedException {
return accounts.delete(auth.getAccount(), AccountsManager.DeletionReason.USER_REQUEST);
}
private void clearUsernameLink(final Account account) {

View File

@@ -12,8 +12,6 @@ import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,11 +23,9 @@ public class AccountCleaner extends AccountDatabaseCrawlerListener {
private static final Counter DELETED_ACCOUNT_COUNTER = Metrics.counter(name(AccountCleaner.class, "deletedAccounts"));
private final AccountsManager accountsManager;
private final Executor deletionExecutor;
public AccountCleaner(final AccountsManager accountsManager, final Executor deletionExecutor) {
public AccountCleaner(final AccountsManager accountsManager) {
this.accountsManager = accountsManager;
this.deletionExecutor = deletionExecutor;
}
@Override
@@ -44,13 +40,7 @@ public class AccountCleaner extends AccountDatabaseCrawlerListener {
protected void onCrawlChunk(Optional<UUID> fromUuid, List<Account> chunkAccounts) {
final List<CompletableFuture<Void>> deletionFutures = chunkAccounts.stream()
.filter(AccountCleaner::isExpired)
.map(account -> CompletableFuture.runAsync(() -> {
try {
accountsManager.delete(account, AccountsManager.DeletionReason.EXPIRED);
} catch (final InterruptedException e) {
throw new CompletionException(e);
}
}, deletionExecutor)
.map(account -> accountsManager.delete(account, AccountsManager.DeletionReason.EXPIRED)
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.warn("Failed to delete account {}", account.getUuid(), throwable);

View File

@@ -8,7 +8,12 @@ import com.amazonaws.services.dynamodbv2.ReleaseLockOptions;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.whispersystems.textsecuregcm.util.Util;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
public class AccountLockManager {
@@ -65,4 +70,44 @@ public class AccountLockManager {
.build()));
}
}
/**
* Acquires a distributed, pessimistic lock for the accounts identified by the given phone numbers. By design, the
* accounts need not actually exist in order to acquire a lock; this allows lock acquisition for operations that span
* account lifecycle changes (like deleting an account or changing a phone number). The given task runs once locks for
* all given phone numbers have been acquired, and the locks are released as soon as the task completes by any means.
*
* @param e164s the phone numbers for which to acquire a distributed, pessimistic lock
* @param taskSupplier a supplier for the task to execute once locks have been acquired
* @param executor the executor on which to acquire and release locks
*
* @return a future that completes normally when the given task has executed successfully and all locks have been
* released; the returned future may fail with an {@link InterruptedException} if interrupted while acquiring a lock
*/ public CompletableFuture<Void> withLockAsync(final List<String> e164s,
final Supplier<CompletableFuture<?>> taskSupplier,
final Executor executor) {
if (e164s.isEmpty()) {
throw new IllegalArgumentException("List of e164s to lock must not be empty");
}
final List<LockItem> lockItems = new ArrayList<>(e164s.size());
return CompletableFuture.runAsync(() -> {
for (final String e164 : e164s) {
try {
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(e164)
.withAcquireReleasedLocksConsistently(true)
.build()));
} catch (final InterruptedException e) {
throw new CompletionException(e);
}
}
}, executor)
.thenCompose(ignored -> taskSupplier.get())
.whenCompleteAsync((ignored, throwable) -> lockItems.forEach(lockItem -> lockClient.releaseLock(ReleaseLockOptions.builder(lockItem)
.withBestEffort(true)
.build())), executor)
.thenRun(Util.NOOP);
}
}

View File

@@ -40,6 +40,7 @@ import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
@@ -786,23 +787,28 @@ public class Accounts extends AbstractDynamoDbStore {
return Optional.ofNullable(response.items().get(0).get(DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s());
}
public void delete(final UUID uuid) {
DELETE_TIMER.record(() -> getByAccountIdentifier(uuid).ifPresent(account -> {
public CompletableFuture<Void> delete(final UUID uuid) {
final Timer.Sample sample = Timer.start();
final List<TransactWriteItem> transactWriteItems = new ArrayList<>(List.of(
buildDelete(phoneNumberConstraintTableName, ATTR_ACCOUNT_E164, account.getNumber()),
buildDelete(accountsTableName, KEY_ACCOUNT_UUID, uuid),
buildDelete(phoneNumberIdentifierConstraintTableName, ATTR_PNI_UUID, account.getPhoneNumberIdentifier()),
buildPutDeletedAccount(uuid, account.getNumber())
));
return getByAccountIdentifierAsync(uuid)
.thenCompose(maybeAccount -> maybeAccount.map(account -> {
final List<TransactWriteItem> transactWriteItems = new ArrayList<>(List.of(
buildDelete(phoneNumberConstraintTableName, ATTR_ACCOUNT_E164, account.getNumber()),
buildDelete(accountsTableName, KEY_ACCOUNT_UUID, uuid),
buildDelete(phoneNumberIdentifierConstraintTableName, ATTR_PNI_UUID, account.getPhoneNumberIdentifier()),
buildPutDeletedAccount(uuid, account.getNumber())
));
account.getUsernameHash().ifPresent(usernameHash -> transactWriteItems.add(
buildDelete(usernamesConstraintTableName, ATTR_USERNAME_HASH, usernameHash)));
account.getUsernameHash().ifPresent(usernameHash -> transactWriteItems.add(
buildDelete(usernamesConstraintTableName, ATTR_USERNAME_HASH, usernameHash)));
final TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
.transactItems(transactWriteItems).build();
db().transactWriteItems(request);
}));
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
.transactItems(transactWriteItems)
.build())
.thenRun(Util.NOOP);
})
.orElseGet(() -> CompletableFuture.completedFuture(null)))
.thenRun(() -> sample.stop(DELETE_TIMER));
}
ParallelFlux<Account> getAll(final int segments, final Scheduler scheduler) {

View File

@@ -35,6 +35,7 @@ import java.util.OptionalInt;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -110,6 +111,7 @@ public class AccountsManager {
private final ClientPresenceManager clientPresenceManager;
private final ExperimentEnrollmentManager experimentEnrollmentManager;
private final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager;
private final Executor accountLockExecutor;
private final Clock clock;
private static final ObjectWriter ACCOUNT_REDIS_JSON_WRITER = SystemMapper.jsonMapper()
@@ -155,6 +157,7 @@ public class AccountsManager {
final ClientPresenceManager clientPresenceManager,
final ExperimentEnrollmentManager experimentEnrollmentManager,
final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager,
final Executor accountLockExecutor,
final Clock clock) {
this.accounts = accounts;
this.phoneNumberIdentifiers = phoneNumberIdentifiers;
@@ -169,6 +172,7 @@ public class AccountsManager {
this.clientPresenceManager = clientPresenceManager;
this.experimentEnrollmentManager = experimentEnrollmentManager;
this.registrationRecoveryPasswordsManager = requireNonNull(registrationRecoveryPasswordsManager);
this.accountLockExecutor = accountLockExecutor;
this.clock = requireNonNull(clock);
}
@@ -234,7 +238,7 @@ public class AccountsManager {
keysManager.delete(account.getPhoneNumberIdentifier()));
messagesManager.clear(actualUuid).join();
profilesManager.deleteAll(actualUuid);
profilesManager.deleteAll(actualUuid).join();
deleteKeysFuture.join();
@@ -301,7 +305,7 @@ public class AccountsManager {
final Optional<UUID> maybeDisplacedUuid;
if (maybeExistingAccount.isPresent()) {
delete(maybeExistingAccount.get());
delete(maybeExistingAccount.get()).join();
maybeDisplacedUuid = maybeExistingAccount.map(Account::getUuid);
} else {
maybeDisplacedUuid = recentlyDeletedAci;
@@ -847,50 +851,39 @@ public class AccountsManager {
return accounts.getAll(segments, scheduler);
}
public void delete(final Account account, final DeletionReason deletionReason) throws InterruptedException {
try (final Timer.Context ignored = deleteTimer.time()) {
accountLockManager.withLock(List.of(account.getNumber()), () -> delete(account));
} catch (final RuntimeException | InterruptedException e) {
logger.warn("Failed to delete account", e);
throw e;
}
public CompletableFuture<Void> delete(final Account account, final DeletionReason deletionReason) {
@SuppressWarnings("resource") final Timer.Context timerContext = deleteTimer.time();
Metrics.counter(DELETE_COUNTER_NAME,
COUNTRY_CODE_TAG_NAME, Util.getCountryCode(account.getNumber()),
DELETION_REASON_TAG_NAME, deletionReason.tagValue)
.increment();
return accountLockManager.withLockAsync(List.of(account.getNumber()), () -> delete(account), accountLockExecutor)
.whenComplete((ignored, throwable) -> {
timerContext.close();
if (throwable == null) {
Metrics.counter(DELETE_COUNTER_NAME,
COUNTRY_CODE_TAG_NAME, Util.getCountryCode(account.getNumber()),
DELETION_REASON_TAG_NAME, deletionReason.tagValue)
.increment();
} else {
logger.warn("Failed to delete account", throwable);
}
});
}
private void delete(final Account account) {
final CompletableFuture<Void> deleteStorageServiceDataFuture = secureStorageClient.deleteStoredData(
account.getUuid());
final CompletableFuture<Void> deleteBackupServiceDataFuture = secureBackupClient.deleteBackups(account.getUuid());
final CompletableFuture<Void> deleteSecureValueRecoveryServiceDataFuture = secureValueRecovery2Client.deleteBackups(
account.getUuid());
final CompletableFuture<Void> deleteKeysFuture = CompletableFuture.allOf(
keysManager.delete(account.getUuid()),
keysManager.delete(account.getPhoneNumberIdentifier()));
final CompletableFuture<Void> deleteMessagesFuture = CompletableFuture.allOf(
messagesManager.clear(account.getUuid()),
messagesManager.clear(account.getPhoneNumberIdentifier()));
profilesManager.deleteAll(account.getUuid());
registrationRecoveryPasswordsManager.removeForNumber(account.getNumber());
deleteKeysFuture.join();
deleteMessagesFuture.join();
deleteStorageServiceDataFuture.join();
deleteBackupServiceDataFuture.join();
deleteSecureValueRecoveryServiceDataFuture.join();
accounts.delete(account.getUuid());
redisDelete(account);
RedisOperation.unchecked(() ->
account.getDevices().forEach(device ->
clientPresenceManager.disconnectPresence(account.getUuid(), device.getId())));
private CompletableFuture<Void> delete(final Account account) {
return CompletableFuture.allOf(
secureStorageClient.deleteStoredData(account.getUuid()),
secureBackupClient.deleteBackups(account.getUuid()),
secureValueRecovery2Client.deleteBackups(account.getUuid()),
keysManager.delete(account.getUuid()),
keysManager.delete(account.getPhoneNumberIdentifier()),
messagesManager.clear(account.getUuid()),
messagesManager.clear(account.getPhoneNumberIdentifier()),
profilesManager.deleteAll(account.getUuid()),
registrationRecoveryPasswordsManager.removeForNumber(account.getNumber()))
.thenCompose(ignored -> CompletableFuture.allOf(accounts.delete(account.getUuid()), redisDeleteAsync(account)))
.thenRun(() -> RedisOperation.unchecked(() ->
account.getDevices().forEach(device ->
clientPresenceManager.disconnectPresence(account.getUuid(), device.getId()))));
}
private String getUsernameHashAccountMapKey(byte[] usernameHash) {

View File

@@ -22,6 +22,8 @@ import org.apache.commons.lang3.StringUtils;
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -30,7 +32,6 @@ import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.paginators.QueryIterable;
public class Profiles {
@@ -77,6 +78,8 @@ public class Profiles {
private static final Timer DELETE_PROFILES_TIMER = Metrics.timer(name(Profiles.class, "delete"));
private static final String PARSE_BYTE_ARRAY_COUNTER_NAME = name(Profiles.class, "parseByteArray");
private static final int MAX_CONCURRENCY = 32;
public Profiles(final DynamoDbClient dynamoDbClient,
final DynamoDbAsyncClient dynamoDbAsyncClient,
final String tableName) {
@@ -244,27 +247,28 @@ public class Profiles {
return AttributeValues.extractByteArray(attributeValue, PARSE_BYTE_ARRAY_COUNTER_NAME);
}
public void deleteAll(final UUID uuid) {
DELETE_PROFILES_TIMER.record(() -> {
final AttributeValue uuidAttributeValue = AttributeValues.fromUUID(uuid);
public CompletableFuture<Void> deleteAll(final UUID uuid) {
final Timer.Sample sample = Timer.start();
final QueryIterable queryIterable = dynamoDbClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#uuid = :uuid")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID))
.expressionAttributeValues(Map.of(":uuid", uuidAttributeValue))
.projectionExpression(ATTR_VERSION)
.consistentRead(true)
.build());
final AttributeValue uuidAttributeValue = AttributeValues.fromUUID(uuid);
CompletableFuture.allOf(queryIterable.items().stream()
.map(item -> dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(
KEY_ACCOUNT_UUID, uuidAttributeValue,
ATTR_VERSION, item.get(ATTR_VERSION)))
.build()))
.toArray(CompletableFuture[]::new)).join();
});
return Flux.from(dynamoDbAsyncClient.queryPaginator(QueryRequest.builder()
.tableName(tableName)
.keyConditionExpression("#uuid = :uuid")
.expressionAttributeNames(Map.of("#uuid", KEY_ACCOUNT_UUID))
.expressionAttributeValues(Map.of(":uuid", uuidAttributeValue))
.projectionExpression(ATTR_VERSION)
.consistentRead(true)
.build())
.items())
.flatMap(item -> Mono.fromFuture(() -> dynamoDbAsyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(tableName)
.key(Map.of(
KEY_ACCOUNT_UUID, uuidAttributeValue,
ATTR_VERSION, item.get(ATTR_VERSION)))
.build())), MAX_CONCURRENCY)
.doOnComplete(() -> sample.stop(DELETE_PROFILES_TIMER))
.then()
.toFuture();
}
}

View File

@@ -47,9 +47,8 @@ public class ProfilesManager {
.thenCompose(ignored -> redisSetAsync(uuid, versionedProfile));
}
public void deleteAll(UUID uuid) {
redisDelete(uuid);
profiles.deleteAll(uuid);
public CompletableFuture<Void> deleteAll(UUID uuid) {
return CompletableFuture.allOf(redisDelete(uuid), profiles.deleteAll(uuid));
}
public Optional<VersionedProfile> get(UUID uuid, String version) {
@@ -132,8 +131,10 @@ public class ProfilesManager {
}
}
private void redisDelete(UUID uuid) {
cacheCluster.useCluster(connection -> connection.sync().del(getCacheKey(uuid)));
private CompletableFuture<Void> redisDelete(UUID uuid) {
return cacheCluster.withCluster(connection -> connection.async().del(getCacheKey(uuid)))
.toCompletableFuture()
.thenRun(Util.NOOP);
}
private String getCacheKey(UUID uuid) {

View File

@@ -114,6 +114,8 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
.executorService(name(getClass(), "secureValueRecoveryService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(getClass(), "storageService-%d")).maxThreads(1).minThreads(1).build();
ExecutorService accountLockExecutor = environment.lifecycle()
.executorService(name(getClass(), "accountLock-%d")).minThreads(1).maxThreads(1).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(getClass(), "secureValueRecoveryServiceRetry-%d")).threads(1).build();
ScheduledExecutorService storageServiceRetryExecutor = environment.lifecycle()
@@ -206,7 +208,7 @@ public class AssignUsernameCommand extends EnvironmentCommand<WhisperServerConfi
AccountsManager accountsManager = new AccountsManager(accounts, phoneNumberIdentifiers, cacheCluster,
accountLockManager, keys, messagesManager, profilesManager,
secureStorageClient, secureBackupClient, secureValueRecovery2Client, clientPresenceManager,
experimentEnrollmentManager, registrationRecoveryPasswordsManager, Clock.systemUTC());
experimentEnrollmentManager, registrationRecoveryPasswordsManager, accountLockExecutor, Clock.systemUTC());
final String usernameHash = namespace.getString("usernameHash");
final String encryptedUsername = namespace.getString("encryptedUsername");

View File

@@ -89,6 +89,8 @@ record CommandDependencies(
.executorService(name(name, "secureValueRecoveryService-%d")).maxThreads(8).minThreads(8).build();
ExecutorService storageServiceExecutor = environment.lifecycle()
.executorService(name(name, "storageService-%d")).maxThreads(8).minThreads(8).build();
ExecutorService accountLockExecutor = environment.lifecycle()
.executorService(name(name, "accountLock-%d")).minThreads(8).maxThreads(8).build();
ScheduledExecutorService secureValueRecoveryServiceRetryExecutor = environment.lifecycle()
.scheduledExecutorService(name(name, "secureValueRecoveryServiceRetry-%d")).threads(1).build();
@@ -185,7 +187,7 @@ record CommandDependencies(
accountLockManager, keys, messagesManager, profilesManager,
secureStorageClient, secureBackupClient, secureValueRecovery2Client,
clientPresenceManager,
experimentEnrollmentManager, registrationRecoveryPasswordsManager, clock);
experimentEnrollmentManager, registrationRecoveryPasswordsManager, accountLockExecutor, clock);
environment.lifecycle().manage(messagesCache);
environment.lifecycle().manage(clientPresenceManager);

View File

@@ -125,16 +125,13 @@ public class CrawlAccountsCommand extends EnvironmentCommand<WhisperServerConfig
);
}
case ACCOUNT_CLEANER -> {
final ExecutorService accountDeletionExecutor = environment.lifecycle()
.executorService(name(getClass(), "accountCleaner-%d")).maxThreads(workers).minThreads(workers).build();
final AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(
cacheCluster, AccountDatabaseCrawlerCache.ACCOUNT_CLEANER_PREFIX);
yield new AccountDatabaseCrawler("Account cleaner crawler",
accountsManager,
accountDatabaseCrawlerCache,
List.of(new AccountCleaner(accountsManager, accountDeletionExecutor)),
List.of(new AccountCleaner(accountsManager)),
configuration.getAccountDatabaseCrawlerConfiguration().getChunkSize()
);
}

View File

@@ -58,7 +58,7 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
Optional<Account> account = accountsManager.getByE164(user);
if (account.isPresent()) {
accountsManager.delete(account.get(), DeletionReason.ADMIN_DELETED);
accountsManager.delete(account.get(), DeletionReason.ADMIN_DELETED).join();
logger.warn("Removed " + account.get().getNumber());
} else {
logger.warn("Account not found");