Create deleted-accounts records keyed by both e164 and PNI

This commit is contained in:
Jonathan Klabunde Tomer
2024-11-25 12:42:16 -08:00
committed by GitHub
parent 49d6a5e32d
commit ffed19d198
5 changed files with 362 additions and 5 deletions

View File

@@ -262,6 +262,7 @@ import org.whispersystems.textsecuregcm.workers.CheckDynamicConfigurationCommand
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.MigrateDeletedAccountsCommand;
import org.whispersystems.textsecuregcm.workers.MigrateRegistrationRecoveryPasswordsCommand;
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
@@ -331,6 +332,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
"Processes scheduled jobs to send notifications to idle devices",
new IdleDeviceNotificationSchedulerFactory()));
bootstrap.addCommand(new MigrateDeletedAccountsCommand());
bootstrap.addCommand(new MigrateRegistrationRecoveryPasswordsCommand());
}

View File

@@ -47,6 +47,8 @@ import org.whispersystems.textsecuregcm.util.UUIDUtil;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -57,6 +59,7 @@ import software.amazon.awssdk.services.dynamodb.model.Delete;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Put;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
@@ -439,8 +442,10 @@ public class Accounts extends AbstractDynamoDbStore {
writeItems.add(buildConstraintTablePut(phoneNumberIdentifierConstraintTableName, uuidAttr, ATTR_PNI_UUID, pniAttr));
writeItems.add(buildRemoveDeletedAccount(number));
writeItems.add(buildRemoveDeletedAccount(phoneNumberIdentifier));
maybeDisplacedAccountIdentifier.ifPresent(displacedAccountIdentifier ->
writeItems.add(buildPutDeletedAccount(displacedAccountIdentifier, originalNumber)));
maybeDisplacedAccountIdentifier.ifPresent(displacedAccountIdentifier -> {
writeItems.add(buildPutDeletedAccount(displacedAccountIdentifier, originalNumber));
writeItems.add(buildPutDeletedAccount(displacedAccountIdentifier, originalPni));
});
// The `catch (TransactionCanceledException) block needs to check whether the cancellation reason is the account
// update write item
@@ -1163,7 +1168,19 @@ public class Accounts extends AbstractDynamoDbStore {
.item(Map.of(
DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164),
DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(Instant.now().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond())))
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(clock.instant().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond())))
.build())
.build();
}
private TransactWriteItem buildPutDeletedAccount(final UUID aci, final UUID pni) {
return TransactWriteItem.builder()
.put(Put.builder()
.tableName(deletedAccountsTableName)
.item(Map.of(
DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(pni.toString()),
DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci),
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(clock.instant().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond())))
.build())
.build();
}
@@ -1203,6 +1220,16 @@ public class Accounts extends AbstractDynamoDbStore {
return Optional.ofNullable(AttributeValues.getUUID(response.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null));
}
public Optional<UUID> findRecentlyDeletedAccountIdentifier(final UUID phoneNumberIdentifier) {
final GetItemResponse response = db().getItem(GetItemRequest.builder()
.tableName(deletedAccountsTableName)
.consistentRead(true)
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(phoneNumberIdentifier.toString())))
.build());
return Optional.ofNullable(AttributeValues.getUUID(response.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null));
}
public Optional<String> findRecentlyDeletedE164(final UUID uuid) {
final QueryResponse response = db().query(QueryRequest.builder()
.tableName(deletedAccountsTableName)
@@ -1232,7 +1259,8 @@ public class Accounts extends AbstractDynamoDbStore {
buildDelete(phoneNumberConstraintTableName, ATTR_ACCOUNT_E164, account.getNumber()),
buildDelete(accountsTableName, KEY_ACCOUNT_UUID, uuid),
buildDelete(phoneNumberIdentifierConstraintTableName, ATTR_PNI_UUID, account.getPhoneNumberIdentifier()),
buildPutDeletedAccount(uuid, account.getNumber())
buildPutDeletedAccount(uuid, account.getNumber()),
buildPutDeletedAccount(uuid, account.getPhoneNumberIdentifier())
));
account.getUsernameHash().ifPresent(usernameHash -> transactWriteItems.add(
@@ -1268,6 +1296,68 @@ public class Accounts extends AbstractDynamoDbStore {
.sequential();
}
public Flux<Tuple3<String, UUID, Long>> getE164KeyedDeletedAccounts(final int segments, final Scheduler scheduler) {
if (segments < 1) {
throw new IllegalArgumentException("Total number of segments must be positive");
}
return Flux.range(0, segments)
.parallel()
.runOn(scheduler)
.flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder()
.tableName(deletedAccountsTableName)
.consistentRead(true)
.segment(segment)
.totalSegments(segments)
.build())
.items())
.map(item ->
Tuples.of(
item.get(DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s(),
AttributeValues.getUUID(item, DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null),
AttributeValues.getLong(item, DELETED_ACCOUNTS_ATTR_EXPIRES, 0)))
.filter(item -> item.getT1().startsWith("+"))
.sequential();
}
public CompletableFuture<Boolean> insertPniDeletedAccount(final String e164, final UUID pni, final UUID aci, final long expiration) {
// This happens under a pessimistic lock, but that wasn't taken before we found the record we want to migrate,
// so make sure the e164 record is unchanged before updating the PNI record
return asyncClient.getItem(GetItemRequest.builder()
.tableName(deletedAccountsTableName)
.consistentRead(true)
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164.toString())))
.build())
.thenComposeAsync(getItemResponse ->
getItemResponse.hasItem()
&& AttributeValues.getString(
getItemResponse.item(), DELETED_ACCOUNTS_KEY_ACCOUNT_E164, "").equals(e164)
&& AttributeValues.getUUID(
getItemResponse.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, UUID.randomUUID()).equals(aci)
&& AttributeValues.getLong(
getItemResponse.item(), DELETED_ACCOUNTS_ATTR_EXPIRES, 0) == expiration
? asyncClient.putItem(
PutItemRequest.builder()
.tableName(deletedAccountsTableName)
.item(
Map.of(
DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(pni.toString()),
DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci),
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(expiration)))
.conditionExpression("attribute_not_exists(#key)")
.expressionAttributeNames(Map.of("#key", DELETED_ACCOUNTS_KEY_ACCOUNT_E164))
.build())
.thenApply(ignored -> true)
.exceptionally(throwable -> {
if (ExceptionUtils.unwrap(throwable) instanceof ConditionalCheckFailedException) {
// there was already a PNI record; no problem, do nothing
return false;
}
throw ExceptionUtils.wrap(throwable);
})
: CompletableFuture.completedFuture(false));
}
@Nonnull
private Optional<Account> getByIndirectLookup(
final Timer timer,

View File

@@ -90,6 +90,7 @@ import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple3;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
@@ -1216,6 +1217,19 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
return accounts.getAll(segments, scheduler);
}
public Flux<Tuple3<String, UUID, Long>> getE164KeyedDeletedAccounts(final int segments, final Scheduler scheduler) {
return accounts.getE164KeyedDeletedAccounts(segments, scheduler);
}
public CompletableFuture<Boolean> migrateDeletedAccount(final String e164, final UUID aci, final long expiration) {
return phoneNumberIdentifiers.getPhoneNumberIdentifier(e164)
.thenCompose(
pni -> accountLockManager.withLockAsync(
List.of(pni),
() -> accounts.insertPniDeletedAccount(e164, pni, aci, expiration),
accountLockExecutor));
}
public CompletableFuture<Void> delete(final Account account, final DeletionReason deletionReason) {
final Timer.Sample sample = Timer.start();

View File

@@ -0,0 +1,109 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import io.dropwizard.core.Application;
import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependencies {
private static final String RECORDS_INSPECTED_COUNTER_NAME =
MetricsUtil.name(MigrateDeletedAccountsCommand.class, "recordsInspected");
private static final String RECORDS_MIGRATED_COUNTER_NAME =
MetricsUtil.name(MigrateDeletedAccountsCommand.class, "recordsMigrated");
private static final String DRY_RUN_TAG = "dryRun";
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final String SEGMENT_COUNT_ARGUMENT = "segments";
private static final String DRY_RUN_ARGUMENT = "dry-run";
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
private static final int DEFAULT_SEGMENT_COUNT = 1;
private static final int DEFAULT_CONCURRENCY = 16;
public MigrateDeletedAccountsCommand() {
super(new Application<>() {
@Override
public void run(final WhisperServerConfiguration configuration, final Environment environment) {
}
}, "migrate-deleted-accounts", "Migrates recently-deleted account records from E164 to PNI-keyed schema");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--segments")
.type(Integer.class)
.dest(SEGMENT_COUNT_ARGUMENT)
.required(false)
.setDefault(DEFAULT_SEGMENT_COUNT)
.help("The total number of segments for a DynamoDB scan");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.required(false)
.setDefault(DEFAULT_CONCURRENCY)
.help("Max concurrency for migrations.");
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, dont actually migrate any deleted accounts records");
}
@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
final int segments = namespace.getInt(SEGMENT_COUNT_ARGUMENT);
final int concurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
final String deletedAccountsTableName = configuration.getDynamoDbTables().getDeletedAccounts().getTableName();
logger.info("Crawling deleted accounts with {} segments and {} processors",
segments,
Runtime.getRuntime().availableProcessors());
final Counter recordsInspectedCounter =
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
final Counter recordsMigratedCounter =
Metrics.counter(RECORDS_MIGRATED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
final AccountsManager accounts = commandDependencies.accountsManager();
accounts.getE164KeyedDeletedAccounts(segments, Schedulers.parallel())
.doOnNext(tuple -> recordsInspectedCounter.increment())
.flatMap(
tuple -> dryRun
? Mono.just(false)
: Mono.fromFuture(
accounts.migrateDeletedAccount(
tuple.getT1(), tuple.getT2(), tuple.getT3())),
concurrency)
.filter(migrated -> migrated)
.doOnNext(ignored -> recordsMigratedCounter.increment())
.then()
.block();
}
}