mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 05:38:04 +01:00
Read deleted accounts by PNI rather than e164
This commit is contained in:
committed by
GitHub
parent
0e04cac800
commit
557a6ecd4f
@@ -264,7 +264,6 @@ import org.whispersystems.textsecuregcm.workers.DeleteE164RegistrationRecoveryPa
|
||||
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
|
||||
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.MigrateDeletedAccountsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
|
||||
@@ -333,7 +332,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
"Processes scheduled jobs to send notifications to idle devices",
|
||||
new IdleDeviceNotificationSchedulerFactory()));
|
||||
|
||||
bootstrap.addCommand(new MigrateDeletedAccountsCommand());
|
||||
bootstrap.addCommand(new DeleteE164RegistrationRecoveryPasswordsCommand());
|
||||
bootstrap.addCommand(new BackfillBeninPhoneNumberFormsCommand());
|
||||
}
|
||||
@@ -1104,8 +1102,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
new KeysController(rateLimiters, keysManager, accountsManager, zkSecretParams, Clock.systemUTC()),
|
||||
new KeyTransparencyController(keyTransparencyServiceClient),
|
||||
new MessageController(rateLimiters, messageByteLimitCardinalityEstimator, messageSender, receiptSender,
|
||||
accountsManager, messagesManager, pushNotificationManager, pushNotificationScheduler, reportMessageManager,
|
||||
multiRecipientMessageExecutor, messageDeliveryScheduler, clientReleaseManager,
|
||||
accountsManager, messagesManager, phoneNumberIdentifiers, pushNotificationManager, pushNotificationScheduler,
|
||||
reportMessageManager, multiRecipientMessageExecutor, messageDeliveryScheduler, clientReleaseManager,
|
||||
dynamicConfigurationManager, zkSecretParams, spamChecker, messageMetrics, messageDeliveryLoopMonitor,
|
||||
Clock.systemUTC()),
|
||||
new PaymentsController(currencyManager, paymentsCredentialsGenerator),
|
||||
|
||||
@@ -123,6 +123,7 @@ import org.whispersystems.textsecuregcm.storage.ClientReleaseManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.MessagesManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PhoneNumberIdentifiers;
|
||||
import org.whispersystems.textsecuregcm.storage.ReportMessageManager;
|
||||
import org.whispersystems.textsecuregcm.util.DestinationDeviceValidator;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
@@ -157,6 +158,7 @@ public class MessageController {
|
||||
private final ReceiptSender receiptSender;
|
||||
private final AccountsManager accountsManager;
|
||||
private final MessagesManager messagesManager;
|
||||
private final PhoneNumberIdentifiers phoneNumberIdentifiers;
|
||||
private final PushNotificationManager pushNotificationManager;
|
||||
private final PushNotificationScheduler pushNotificationScheduler;
|
||||
private final ReportMessageManager reportMessageManager;
|
||||
@@ -216,6 +218,7 @@ public class MessageController {
|
||||
ReceiptSender receiptSender,
|
||||
AccountsManager accountsManager,
|
||||
MessagesManager messagesManager,
|
||||
PhoneNumberIdentifiers phoneNumberIdentifiers,
|
||||
PushNotificationManager pushNotificationManager,
|
||||
PushNotificationScheduler pushNotificationScheduler,
|
||||
ReportMessageManager reportMessageManager,
|
||||
@@ -234,6 +237,7 @@ public class MessageController {
|
||||
this.receiptSender = receiptSender;
|
||||
this.accountsManager = accountsManager;
|
||||
this.messagesManager = messagesManager;
|
||||
this.phoneNumberIdentifiers = phoneNumberIdentifiers;
|
||||
this.pushNotificationManager = pushNotificationManager;
|
||||
this.pushNotificationScheduler = pushNotificationScheduler;
|
||||
this.reportMessageManager = reportMessageManager;
|
||||
@@ -853,10 +857,10 @@ public class MessageController {
|
||||
@Nullable SpamReport spamReport,
|
||||
@HeaderParam(HttpHeaders.USER_AGENT) String userAgent
|
||||
) {
|
||||
|
||||
final Optional<String> sourceNumber;
|
||||
final Optional<UUID> sourceAci;
|
||||
final Optional<UUID> sourcePni;
|
||||
|
||||
if (source.startsWith("+")) {
|
||||
sourceNumber = Optional.of(source);
|
||||
final Optional<Account> maybeAccount = accountsManager.getByE164(source);
|
||||
@@ -864,8 +868,8 @@ public class MessageController {
|
||||
sourceAci = maybeAccount.map(Account::getUuid);
|
||||
sourcePni = maybeAccount.map(Account::getPhoneNumberIdentifier);
|
||||
} else {
|
||||
sourceAci = accountsManager.findRecentlyDeletedAccountIdentifier(source);
|
||||
sourcePni = Optional.ofNullable(accountsManager.getPhoneNumberIdentifier(source));
|
||||
sourcePni = Optional.ofNullable(phoneNumberIdentifiers.getPhoneNumberIdentifier(source).join());
|
||||
sourceAci = sourcePni.flatMap(accountsManager::findRecentlyDeletedAccountIdentifier);
|
||||
}
|
||||
} else {
|
||||
sourceAci = Optional.of(UUID.fromString(source));
|
||||
@@ -874,8 +878,9 @@ public class MessageController {
|
||||
|
||||
if (sourceAccount.isEmpty()) {
|
||||
logger.warn("Could not find source: {}", sourceAci.get());
|
||||
sourceNumber = accountsManager.findRecentlyDeletedE164(sourceAci.get());
|
||||
sourcePni = sourceNumber.map(accountsManager::getPhoneNumberIdentifier);
|
||||
sourcePni = accountsManager.findRecentlyDeletedPhoneNumberIdentifier(sourceAci.get());
|
||||
sourceNumber = sourcePni.flatMap(pni ->
|
||||
Util.getCanonicalNumber(phoneNumberIdentifiers.getPhoneNumber(pni).join()));
|
||||
} else {
|
||||
sourceNumber = sourceAccount.map(Account::getNumber);
|
||||
sourcePni = sourceAccount.map(Account::getPhoneNumberIdentifier);
|
||||
|
||||
@@ -139,10 +139,13 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
// unidentified access key; byte[] or null
|
||||
static final String ATTR_UAK = "UAK";
|
||||
|
||||
static final String DELETED_ACCOUNTS_KEY_ACCOUNT_E164 = "P";
|
||||
// For historical reasons, deleted-accounts PNI is stored as a string-format UUID rather than a
|
||||
// compact byte array.
|
||||
static final String DELETED_ACCOUNTS_KEY_ACCOUNT_PNI = "P";
|
||||
|
||||
static final String DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID = "U";
|
||||
static final String DELETED_ACCOUNTS_ATTR_EXPIRES = "E";
|
||||
static final String DELETED_ACCOUNTS_UUID_TO_E164_INDEX_NAME = "u_to_p";
|
||||
static final String DELETED_ACCOUNTS_UUID_TO_PNI_INDEX_NAME = "u_to_p";
|
||||
|
||||
static final String USERNAME_LINK_TO_UUID_INDEX = "ul_to_u";
|
||||
|
||||
@@ -1166,7 +1169,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
.put(Put.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.item(Map.of(
|
||||
DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164),
|
||||
DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(e164),
|
||||
DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(uuid),
|
||||
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(clock.instant().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond())))
|
||||
.build())
|
||||
@@ -1178,7 +1181,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
.put(Put.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.item(Map.of(
|
||||
DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(pni.toString()),
|
||||
DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(pni.toString()),
|
||||
DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci),
|
||||
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(clock.instant().plus(DELETED_ACCOUNTS_TIME_TO_LIVE).getEpochSecond())))
|
||||
.build())
|
||||
@@ -1189,7 +1192,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
return TransactWriteItem.builder()
|
||||
.delete(Delete.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164)))
|
||||
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(e164)))
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
@@ -1198,7 +1201,7 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
return TransactWriteItem.builder()
|
||||
.delete(Delete.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(pni.toString())))
|
||||
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(pni.toString())))
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
@@ -1210,34 +1213,24 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
.toCompletableFuture();
|
||||
}
|
||||
|
||||
public Optional<UUID> findRecentlyDeletedAccountIdentifier(final String e164) {
|
||||
final GetItemResponse response = db().getItem(GetItemRequest.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.consistentRead(true)
|
||||
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164)))
|
||||
.build());
|
||||
|
||||
return Optional.ofNullable(AttributeValues.getUUID(response.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null));
|
||||
}
|
||||
|
||||
public Optional<UUID> findRecentlyDeletedAccountIdentifier(final UUID phoneNumberIdentifier) {
|
||||
final GetItemResponse response = db().getItem(GetItemRequest.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.consistentRead(true)
|
||||
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(phoneNumberIdentifier.toString())))
|
||||
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(phoneNumberIdentifier.toString())))
|
||||
.build());
|
||||
|
||||
return Optional.ofNullable(AttributeValues.getUUID(response.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null));
|
||||
}
|
||||
|
||||
public Optional<String> findRecentlyDeletedE164(final UUID uuid) {
|
||||
public Optional<UUID> findRecentlyDeletedPhoneNumberIdentifier(final UUID uuid) {
|
||||
final QueryResponse response = db().query(QueryRequest.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.indexName(DELETED_ACCOUNTS_UUID_TO_E164_INDEX_NAME)
|
||||
.indexName(DELETED_ACCOUNTS_UUID_TO_PNI_INDEX_NAME)
|
||||
.keyConditionExpression("#uuid = :uuid")
|
||||
.projectionExpression("#e164")
|
||||
.projectionExpression("#pni")
|
||||
.expressionAttributeNames(Map.of("#uuid", DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID,
|
||||
"#e164", DELETED_ACCOUNTS_KEY_ACCOUNT_E164))
|
||||
"#pni", DELETED_ACCOUNTS_KEY_ACCOUNT_PNI))
|
||||
.expressionAttributeValues(Map.of(":uuid", AttributeValues.fromUUID(uuid))).build());
|
||||
|
||||
if (response.count() == 0) {
|
||||
@@ -1245,9 +1238,10 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
}
|
||||
|
||||
return response.items().stream()
|
||||
.map(item -> item.get(DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s())
|
||||
.filter(e164OrPni -> e164OrPni.startsWith("+"))
|
||||
.findFirst();
|
||||
.map(item -> item.get(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI).s())
|
||||
.filter(e164OrPni -> !e164OrPni.startsWith("+"))
|
||||
.findFirst()
|
||||
.map(UUID::fromString);
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> delete(final UUID uuid, final List<TransactWriteItem> additionalWriteItems) {
|
||||
@@ -1313,51 +1307,13 @@ public class Accounts extends AbstractDynamoDbStore {
|
||||
.items())
|
||||
.map(item ->
|
||||
Tuples.of(
|
||||
item.get(DELETED_ACCOUNTS_KEY_ACCOUNT_E164).s(),
|
||||
item.get(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI).s(),
|
||||
AttributeValues.getUUID(item, DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, null),
|
||||
AttributeValues.getLong(item, DELETED_ACCOUNTS_ATTR_EXPIRES, 0)))
|
||||
.filter(item -> item.getT1().startsWith("+"))
|
||||
.sequential();
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> insertPniDeletedAccount(final String e164, final UUID pni, final UUID aci, final long expiration) {
|
||||
// This happens under a pessimistic lock, but that wasn't taken before we found the record we want to migrate,
|
||||
// so make sure the e164 record is unchanged before updating the PNI record
|
||||
return asyncClient.getItem(GetItemRequest.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.consistentRead(true)
|
||||
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(e164.toString())))
|
||||
.build())
|
||||
.thenComposeAsync(getItemResponse ->
|
||||
getItemResponse.hasItem()
|
||||
&& AttributeValues.getString(
|
||||
getItemResponse.item(), DELETED_ACCOUNTS_KEY_ACCOUNT_E164, "").equals(e164)
|
||||
&& AttributeValues.getUUID(
|
||||
getItemResponse.item(), DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, UUID.randomUUID()).equals(aci)
|
||||
&& AttributeValues.getLong(
|
||||
getItemResponse.item(), DELETED_ACCOUNTS_ATTR_EXPIRES, 0) == expiration
|
||||
? asyncClient.putItem(
|
||||
PutItemRequest.builder()
|
||||
.tableName(deletedAccountsTableName)
|
||||
.item(
|
||||
Map.of(
|
||||
DELETED_ACCOUNTS_KEY_ACCOUNT_E164, AttributeValues.fromString(pni.toString()),
|
||||
DELETED_ACCOUNTS_ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(aci),
|
||||
DELETED_ACCOUNTS_ATTR_EXPIRES, AttributeValues.fromLong(expiration)))
|
||||
.conditionExpression("attribute_not_exists(#key)")
|
||||
.expressionAttributeNames(Map.of("#key", DELETED_ACCOUNTS_KEY_ACCOUNT_E164))
|
||||
.build())
|
||||
.thenApply(ignored -> true)
|
||||
.exceptionally(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof ConditionalCheckFailedException) {
|
||||
// there was already a PNI record; no problem, do nothing
|
||||
return false;
|
||||
}
|
||||
throw ExceptionUtils.wrap(throwable);
|
||||
})
|
||||
: CompletableFuture.completedFuture(false));
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private Optional<Account> getByIndirectLookup(
|
||||
final Timer timer,
|
||||
|
||||
@@ -277,7 +277,7 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
return createTimer.record(() -> {
|
||||
accountLockManager.withLock(List.of(pni), () -> {
|
||||
final Optional<UUID> maybeRecentlyDeletedAccountIdentifier =
|
||||
accounts.findRecentlyDeletedAccountIdentifier(number);
|
||||
accounts.findRecentlyDeletedAccountIdentifier(pni);
|
||||
|
||||
// Reuse the ACI from any recently-deleted account with this number to cover cases where somebody is
|
||||
// re-registering.
|
||||
@@ -654,16 +654,16 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
|
||||
// There are three possible states for accounts associated with the target phone number:
|
||||
//
|
||||
// 1. An account exists with the target number; the caller has proved ownership of the number, so delete the
|
||||
// account with the target number. This will leave a "deleted account" record for the deleted account mapping
|
||||
// the UUID of the deleted account to the target phone number. We'll then overwrite that so it points to the
|
||||
// original number to facilitate switching back and forth between numbers.
|
||||
// 2. No account with the target number exists, but one has recently been deleted. In that case, add a "deleted
|
||||
// account" record that maps the ACI of the recently-deleted account to the now-abandoned original phone number
|
||||
// 1. An account exists with the target PNI; the caller has proved ownership of the number, so delete the
|
||||
// account with the target PNI. This will leave a "deleted account" record for the deleted account mapping
|
||||
// the UUID of the deleted account to the target PNI. We'll then overwrite that so it points to the
|
||||
// original PNI to facilitate switching back and forth between numbers.
|
||||
// 2. No account with the target PNI exists, but one has recently been deleted. In that case, add a "deleted
|
||||
// account" record that maps the ACI of the recently-deleted account to the now-abandoned original PNI
|
||||
// of the account changing its number (which facilitates ACI consistency in cases that a party is switching
|
||||
// back and forth between numbers).
|
||||
// 3. No account with the target number exists at all, in which case no additional action is needed.
|
||||
final Optional<UUID> recentlyDeletedAci = accounts.findRecentlyDeletedAccountIdentifier(targetNumber);
|
||||
// 3. No account with the target PNI exists at all, in which case no additional action is needed.
|
||||
final Optional<UUID> recentlyDeletedAci = accounts.findRecentlyDeletedAccountIdentifier(targetPhoneNumberIdentifier);
|
||||
final Optional<Account> maybeExistingAccount = getByE164(targetNumber);
|
||||
final Optional<UUID> maybeDisplacedUuid;
|
||||
|
||||
@@ -1205,31 +1205,18 @@ public class AccountsManager extends RedisPubSubAdapter<String, String> implemen
|
||||
return phoneNumberIdentifiers.getPhoneNumberIdentifier(e164).join();
|
||||
}
|
||||
|
||||
public Optional<UUID> findRecentlyDeletedAccountIdentifier(final String e164) {
|
||||
return accounts.findRecentlyDeletedAccountIdentifier(e164);
|
||||
public Optional<UUID> findRecentlyDeletedAccountIdentifier(final UUID phoneNumberIdentifier) {
|
||||
return accounts.findRecentlyDeletedAccountIdentifier(phoneNumberIdentifier);
|
||||
}
|
||||
|
||||
public Optional<String> findRecentlyDeletedE164(final UUID uuid) {
|
||||
return accounts.findRecentlyDeletedE164(uuid);
|
||||
public Optional<UUID> findRecentlyDeletedPhoneNumberIdentifier(final UUID accountIdentifier) {
|
||||
return accounts.findRecentlyDeletedPhoneNumberIdentifier(accountIdentifier);
|
||||
}
|
||||
|
||||
public Flux<Account> streamAllFromDynamo(final int segments, final Scheduler scheduler) {
|
||||
return accounts.getAll(segments, scheduler);
|
||||
}
|
||||
|
||||
public Flux<Tuple3<String, UUID, Long>> getE164KeyedDeletedAccounts(final int segments, final Scheduler scheduler) {
|
||||
return accounts.getE164KeyedDeletedAccounts(segments, scheduler);
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> migrateDeletedAccount(final String e164, final UUID aci, final long expiration) {
|
||||
return phoneNumberIdentifiers.getPhoneNumberIdentifier(e164)
|
||||
.thenCompose(
|
||||
pni -> accountLockManager.withLockAsync(
|
||||
List.of(pni),
|
||||
() -> accounts.insertPniDeletedAccount(e164, pni, aci, expiration),
|
||||
accountLockExecutor));
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> delete(final Account account, final DeletionReason deletionReason) {
|
||||
final Timer.Sample sample = Timer.start();
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import io.micrometer.core.instrument.Timer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Supplier;
|
||||
@@ -30,7 +31,10 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
|
||||
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
|
||||
@@ -84,6 +88,34 @@ public class PhoneNumberIdentifiers {
|
||||
.thenCompose(mappings -> setPniIfRequired(phoneNumber, allPhoneNumberForms, mappings)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the list of phone numbers associated with a given phone number identifier. If this
|
||||
* UUID was not previously assigned as a PNI by {@link #getPhoneNumberIdentifier(String)}, the
|
||||
* returned list will be empty.
|
||||
*
|
||||
* @param UUID a phone number identifier
|
||||
* @return the list of all e164s associated with the given phone number identifier
|
||||
*/
|
||||
public CompletableFuture<List<String>> getPhoneNumber(final UUID phoneNumberIdentifier) {
|
||||
return dynamoDbClient.query(QueryRequest.builder()
|
||||
.tableName(tableName)
|
||||
.indexName(INDEX_NAME)
|
||||
.keyConditionExpression("#pni = :pni")
|
||||
.projectionExpression("#phone_number")
|
||||
.expressionAttributeNames(Map.of(
|
||||
"#phone_number", KEY_E164,
|
||||
"#pni", ATTR_PHONE_NUMBER_IDENTIFIER
|
||||
))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":pni", AttributeValues.fromUUID(phoneNumberIdentifier)
|
||||
))
|
||||
.build())
|
||||
.thenApply(response -> {
|
||||
return response.items().stream().map(item -> item.get(KEY_E164).s()).toList();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
static <T, E extends Exception> CompletableFuture<T> retry(
|
||||
final int numRetries, final Class<E> exceptionToRetry, final Supplier<CompletableFuture<T>> supplier) {
|
||||
|
||||
@@ -14,6 +14,7 @@ import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
@@ -24,6 +25,8 @@ import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.random.RandomGenerator;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
public class Util {
|
||||
@@ -145,6 +148,44 @@ public class Util {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the preferred form of an e164 from a list of equivalents. Only use this when there is no other reason (such
|
||||
* as the form specifically provided by a user) to prefer a particular form and we want to reduce nondeterminism.
|
||||
*
|
||||
* @apiNote This method is intended to support number format transitions in cases where we do not already have
|
||||
* multiple accounts registered with different forms of the same number. As a result, this method does not cover all
|
||||
* possible cases of equivalent formats, but instead focuses on the cases where we can and choose to prevent multiple
|
||||
* accounts from using different formats of the same number.
|
||||
*
|
||||
* @param e164s a list of equivalent forms of a single phone number
|
||||
*
|
||||
* @return a single preferred canonical form for the number
|
||||
*/
|
||||
public static Optional<String> getCanonicalNumber(List<String> e164s) {
|
||||
if (e164s.size() <= 1) {
|
||||
return e164s.stream().findFirst();
|
||||
}
|
||||
try {
|
||||
final List<PhoneNumber> phoneNumbers = new ArrayList<>(e164s.size());
|
||||
for (String e164 : e164s) {
|
||||
phoneNumbers.add(PHONE_NUMBER_UTIL.parse(e164, null));
|
||||
}
|
||||
final Set<String> regions = phoneNumbers.stream().map(PHONE_NUMBER_UTIL::getRegionCodeForNumber).collect(Collectors.toSet());
|
||||
if (regions.size() != 1) {
|
||||
throw new IllegalArgumentException("Numbers from different countries cannot be equivalent alternate forms");
|
||||
}
|
||||
if (regions.contains("BJ")) {
|
||||
// Benin is changing phone number formats from +229 XXXXXXXX to +229 01XXXXXXXX starting on November 30, 2024
|
||||
// We prefer the longest form for long-term stability
|
||||
return e164s.stream().sorted(Comparator.comparingInt(String::length).reversed()).findFirst();
|
||||
}
|
||||
// No matching country; fall back to something that's at least stable
|
||||
return e164s.stream().sorted().findFirst();
|
||||
} catch (final NumberParseException e) {
|
||||
return e164s.stream().sorted().findFirst();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests whether the decimal form of the given number (without leading zeroes) begins with the decimal form of the
|
||||
* given prefix (without leading zeroes).
|
||||
|
||||
@@ -1,115 +0,0 @@
|
||||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import io.dropwizard.core.Application;
|
||||
import io.dropwizard.core.setup.Environment;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import net.sourceforge.argparse4j.inf.Subparser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.retry.Retry;
|
||||
import java.time.Duration;
|
||||
|
||||
public class MigrateDeletedAccountsCommand extends AbstractCommandWithDependencies {
|
||||
|
||||
private static final String RECORDS_INSPECTED_COUNTER_NAME =
|
||||
MetricsUtil.name(MigrateDeletedAccountsCommand.class, "recordsInspected");
|
||||
|
||||
private static final String RECORDS_MIGRATED_COUNTER_NAME =
|
||||
MetricsUtil.name(MigrateDeletedAccountsCommand.class, "recordsMigrated");
|
||||
|
||||
private static final String DRY_RUN_TAG = "dryRun";
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
private static final String SEGMENT_COUNT_ARGUMENT = "segments";
|
||||
private static final String DRY_RUN_ARGUMENT = "dry-run";
|
||||
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
|
||||
|
||||
private static final int DEFAULT_SEGMENT_COUNT = 1;
|
||||
private static final int DEFAULT_CONCURRENCY = 16;
|
||||
|
||||
public MigrateDeletedAccountsCommand() {
|
||||
super(new Application<>() {
|
||||
@Override
|
||||
public void run(final WhisperServerConfiguration configuration, final Environment environment) {
|
||||
}
|
||||
}, "migrate-deleted-accounts", "Migrates recently-deleted account records from E164 to PNI-keyed schema");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Subparser subparser) {
|
||||
super.configure(subparser);
|
||||
|
||||
subparser.addArgument("--segments")
|
||||
.type(Integer.class)
|
||||
.dest(SEGMENT_COUNT_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(DEFAULT_SEGMENT_COUNT)
|
||||
.help("The total number of segments for a DynamoDB scan");
|
||||
|
||||
subparser.addArgument("--max-concurrency")
|
||||
.type(Integer.class)
|
||||
.dest(MAX_CONCURRENCY_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(DEFAULT_CONCURRENCY)
|
||||
.help("Max concurrency for migrations.");
|
||||
|
||||
subparser.addArgument("--dry-run")
|
||||
.type(Boolean.class)
|
||||
.dest(DRY_RUN_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(true)
|
||||
.help("If true, don’t actually migrate any deleted accounts records");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run(final Environment environment, final Namespace namespace,
|
||||
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
|
||||
final int segments = namespace.getInt(SEGMENT_COUNT_ARGUMENT);
|
||||
final int concurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
|
||||
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
|
||||
|
||||
logger.info("Crawling deleted accounts with {} segments and {} processors",
|
||||
segments,
|
||||
Runtime.getRuntime().availableProcessors());
|
||||
|
||||
final Counter recordsInspectedCounter =
|
||||
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final Counter recordsMigratedCounter =
|
||||
Metrics.counter(RECORDS_MIGRATED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final AccountsManager accounts = commandDependencies.accountsManager();
|
||||
|
||||
accounts.getE164KeyedDeletedAccounts(segments, Schedulers.parallel())
|
||||
.doOnNext(tuple -> recordsInspectedCounter.increment())
|
||||
.flatMap(
|
||||
tuple -> dryRun
|
||||
? Mono.just(false)
|
||||
: Mono.fromFuture(
|
||||
accounts.migrateDeletedAccount(
|
||||
tuple.getT1(), tuple.getT2(), tuple.getT3()))
|
||||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
|
||||
.onErrorResume(throwable -> {
|
||||
logger.warn("Failed to migrate record for {}", tuple.getT1(), throwable);
|
||||
return Mono.empty();
|
||||
}),
|
||||
concurrency)
|
||||
.filter(migrated -> migrated)
|
||||
.doOnNext(ignored -> recordsMigratedCounter.increment())
|
||||
.then()
|
||||
.block();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user