Regenerate phone number identifiers when regenerating secondary table data

This commit is contained in:
Jon Chambers
2025-06-05 15:12:33 -04:00
committed by GitHub
parent 981d929f50
commit 1a7a446150
11 changed files with 111 additions and 50 deletions

View File

@@ -270,7 +270,7 @@ import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerF
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
import org.whispersystems.textsecuregcm.workers.RegenerateAccountConstraintDataCommand;
import org.whispersystems.textsecuregcm.workers.RegenerateSecondaryDynamoDbTableDataCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
@@ -337,7 +337,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
"Processes scheduled jobs to send notifications to idle devices",
new IdleDeviceNotificationSchedulerFactory()));
bootstrap.addCommand(new RegenerateAccountConstraintDataCommand());
bootstrap.addCommand(new RegenerateSecondaryDynamoDbTableDataCommand());
}
@Override

View File

@@ -1472,7 +1472,7 @@ public class Accounts {
.build();
}
public CompletableFuture<Void> regenerateConstraints(final Account account) {
CompletableFuture<Void> regenerateConstraints(final Account account) {
final List<CompletableFuture<?>> constraintFutures = new ArrayList<>();
constraintFutures.add(writeConstraint(phoneNumberConstraintTableName,

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import java.util.concurrent.CompletableFuture;
/**
* The DynamoDB recovery manager regenerates data for secondary tables in a disaster recovery scenario. In a disaster
* recovery scenario, there is no guarantee that table backups will be consistent, and so we need to derive or update
* some tables from a "core" data source to ensure consistency.
*/
public class DynamoDbRecoveryManager {
private final Accounts accounts;
private final PhoneNumberIdentifiers phoneNumberIdentifiers;
public DynamoDbRecoveryManager(final Accounts accounts, final PhoneNumberIdentifiers phoneNumberIdentifiers) {
this.accounts = accounts;
this.phoneNumberIdentifiers = phoneNumberIdentifiers;
}
/**
* Regenerates secondary data (i.e. uniqueness constraints) for a given account.
*
* @param account the account for which to regenerate secondary data
*
* @return a future that completes when secondary for the given account has been regenerated
*/
public CompletableFuture<Void> regenerateData(final Account account) {
return CompletableFuture.allOf(
accounts.regenerateConstraints(account),
phoneNumberIdentifiers.regeneratePhoneNumberIdentifierMappings(account));
}
}

View File

@@ -10,33 +10,24 @@ import static org.whispersystems.textsecuregcm.metrics.MetricsUtil.name;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.ReturnValue;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
@@ -93,7 +84,7 @@ public class PhoneNumberIdentifiers {
* UUID was not previously assigned as a PNI by {@link #getPhoneNumberIdentifier(String)}, the
* returned list will be empty.
*
* @param UUID a phone number identifier
* @param phoneNumberIdentifier a phone number identifier
* @return the list of all e164s associated with the given phone number identifier
*/
public CompletableFuture<List<String>> getPhoneNumber(final UUID phoneNumberIdentifier) {
@@ -110,12 +101,9 @@ public class PhoneNumberIdentifiers {
":pni", AttributeValues.fromUUID(phoneNumberIdentifier)
))
.build())
.thenApply(response -> {
return response.items().stream().map(item -> item.get(KEY_E164).s()).toList();
});
.thenApply(response -> response.items().stream().map(item -> item.get(KEY_E164).s()).toList());
}
@VisibleForTesting
static <T, E extends Exception> CompletableFuture<T> retry(
final int numRetries, final Class<E> exceptionToRetry, final Supplier<CompletableFuture<T>> supplier) {
@@ -256,4 +244,9 @@ public class PhoneNumberIdentifiers {
item -> AttributeValues.getUUID(item, ATTR_PHONE_NUMBER_IDENTIFIER, null))))
.whenComplete((ignored, throwable) -> sample.stop(GET_PNI_TIMER));
}
CompletableFuture<Void> regeneratePhoneNumberIdentifierMappings(final Account account) {
return setPni(account.getNumber(), Util.getAlternateForms(account.getNumber()), account.getIdentifier(IdentityType.PNI))
.thenRun(Util.NOOP);
}
}

View File

@@ -39,9 +39,9 @@ import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.metrics.MicrometerAwsSdkMetricPublisher;
import org.whispersystems.textsecuregcm.push.APNSender;
import org.whispersystems.textsecuregcm.push.FcmSender;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.push.PushNotificationManager;
import org.whispersystems.textsecuregcm.push.PushNotificationScheduler;
import org.whispersystems.textsecuregcm.push.WebSocketConnectionEventManager;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClient;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
import org.whispersystems.textsecuregcm.securestorage.SecureStorageClient;
@@ -52,6 +52,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.ClientPublicKeys;
import org.whispersystems.textsecuregcm.storage.ClientPublicKeysManager;
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
import org.whispersystems.textsecuregcm.storage.DynamoDbRecoveryManager;
import org.whispersystems.textsecuregcm.storage.IssuedReceiptsManager;
import org.whispersystems.textsecuregcm.storage.KeysManager;
import org.whispersystems.textsecuregcm.storage.MessagesCache;
@@ -77,7 +78,6 @@ import software.amazon.awssdk.services.s3.S3AsyncClient;
* Construct utilities commonly used by worker commands
*/
record CommandDependencies(
Accounts accounts,
AccountsManager accountsManager,
ProfilesManager profilesManager,
ReportMessageManager reportMessageManager,
@@ -97,7 +97,8 @@ record CommandDependencies(
IssuedReceiptsManager issuedReceiptsManager,
DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
DynamoDbAsyncClient dynamoDbAsyncClient,
PhoneNumberIdentifiers phoneNumberIdentifiers) {
PhoneNumberIdentifiers phoneNumberIdentifiers,
DynamoDbRecoveryManager dynamoDbRecoveryManager) {
static CommandDependencies build(
final String name,
@@ -294,13 +295,15 @@ record CommandDependencies(
WebSocketConnectionEventManager webSocketConnectionEventManager =
new WebSocketConnectionEventManager(accountsManager, pushNotificationManager, messagesCluster, clientEventExecutor, asyncOperationQueueingExecutor);
final DynamoDbRecoveryManager dynamoDbRecoveryManager =
new DynamoDbRecoveryManager(accounts, phoneNumberIdentifiers);
environment.lifecycle().manage(apnSender);
environment.lifecycle().manage(disconnectionRequestManager);
environment.lifecycle().manage(webSocketConnectionEventManager);
environment.lifecycle().manage(new ManagedAwsCrt());
return new CommandDependencies(
accounts,
accountsManager,
profilesManager,
reportMessageManager,
@@ -320,7 +323,8 @@ record CommandDependencies(
issuedReceiptsManager,
dynamicConfigurationManager,
dynamoDbAsyncClient,
phoneNumberIdentifiers
phoneNumberIdentifiers,
dynamoDbRecoveryManager
);
}

View File

@@ -12,12 +12,12 @@ import java.time.Duration;
import net.sourceforge.argparse4j.inf.Subparser;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.DynamoDbRecoveryManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
public class RegenerateAccountConstraintDataCommand extends AbstractSinglePassCrawlAccountsCommand {
public class RegenerateSecondaryDynamoDbTableDataCommand extends AbstractSinglePassCrawlAccountsCommand {
@VisibleForTesting
static final String DRY_RUN_ARGUMENT = "dry-run";
@@ -29,10 +29,10 @@ public class RegenerateAccountConstraintDataCommand extends AbstractSinglePassCr
static final String RETRIES_ARGUMENT = "retries";
private static final String PROCESSED_ACCOUNTS_COUNTER_NAME =
MetricsUtil.name(RegenerateAccountConstraintDataCommand.class, "processedAccounts");
MetricsUtil.name(RegenerateSecondaryDynamoDbTableDataCommand.class, "processedAccounts");
public RegenerateAccountConstraintDataCommand() {
super("regenerate-account-constraint-data", "Regenerates account constraint data from a core account table");
public RegenerateSecondaryDynamoDbTableDataCommand() {
super("regenerate-secondary-dynamodb-table-data", "Regenerates secondary DynamoDB table data from core tables");
}
@Override
@@ -65,7 +65,7 @@ public class RegenerateAccountConstraintDataCommand extends AbstractSinglePassCr
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
final Accounts accounts = getCommandDependencies().accounts();
final DynamoDbRecoveryManager dynamoDbRecoveryManager = getCommandDependencies().dynamoDbRecoveryManager();
final Counter processedAccountsCounter = Metrics.counter(PROCESSED_ACCOUNTS_COUNTER_NAME,
"dryRun", String.valueOf(dryRun));
@@ -74,7 +74,7 @@ public class RegenerateAccountConstraintDataCommand extends AbstractSinglePassCr
.doOnNext(ignored -> processedAccountsCounter.increment())
.flatMap(account -> dryRun
? Mono.empty()
: Mono.fromFuture(() -> accounts.regenerateConstraints(account))
: Mono.fromFuture(() -> dynamoDbRecoveryManager.regenerateData(account))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(4))
.onRetryExhaustedThrow((spec, rs) -> rs.failure())),
maxConcurrency)