Add a command for regenerating account constraint tables

This commit is contained in:
Jon Chambers
2025-05-12 16:55:55 -04:00
committed by Jon Chambers
parent 9ec66dac7f
commit 43a534f05b
9 changed files with 446 additions and 5 deletions

View File

@@ -269,6 +269,7 @@ import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerF
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
import org.whispersystems.textsecuregcm.workers.RegenerateAccountConstraintDataCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
@@ -335,6 +336,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-idle-device-notification-jobs",
"Processes scheduled jobs to send notifications to idle devices",
new IdleDeviceNotificationSchedulerFactory()));
bootstrap.addCommand(new RegenerateAccountConstraintDataCommand());
}
@Override

View File

@@ -41,6 +41,7 @@ import org.apache.commons.lang3.StringUtils;
import org.signal.libsignal.zkgroup.backups.BackupCredentialType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.identity.IdentityType;
import org.whispersystems.textsecuregcm.util.AsyncTimerUtil;
import org.whispersystems.textsecuregcm.util.AttributeValues;
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
@@ -59,6 +60,7 @@ import software.amazon.awssdk.services.dynamodb.model.Delete;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Put;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
import software.amazon.awssdk.services.dynamodb.model.ReturnValuesOnConditionCheckFailure;
@@ -89,7 +91,8 @@ public class Accounts {
static final List<String> ACCOUNT_FIELDS_TO_EXCLUDE_FROM_SERIALIZATION = List.of("uuid", "usernameLinkHandle");
private static final ObjectWriter ACCOUNT_DDB_JSON_WRITER = SystemMapper.jsonMapper()
@VisibleForTesting
static final ObjectWriter ACCOUNT_DDB_JSON_WRITER = SystemMapper.jsonMapper()
.writer(SystemMapper.excludingField(Account.class, ACCOUNT_FIELDS_TO_EXCLUDE_FROM_SERIALIZATION));
private static final Timer CREATE_TIMER = Metrics.timer(name(Accounts.class, "create"));
@@ -1404,8 +1407,7 @@ public class Accounts {
final String tableName,
final AttributeValue uuidAttr,
final String keyName,
final AttributeValue keyValue
) {
final AttributeValue keyValue) {
return TransactWriteItem.builder()
.put(Put.builder()
.tableName(tableName)
@@ -1470,6 +1472,68 @@ public class Accounts {
.build();
}
public CompletableFuture<Void> regenerateConstraints(final Account account) {
final List<CompletableFuture<?>> constraintFutures = new ArrayList<>();
constraintFutures.add(writeConstraint(phoneNumberConstraintTableName,
account.getIdentifier(IdentityType.ACI),
ATTR_ACCOUNT_E164,
AttributeValues.fromString(account.getNumber())));
constraintFutures.add(writeConstraint(phoneNumberIdentifierConstraintTableName,
account.getIdentifier(IdentityType.ACI),
ATTR_PNI_UUID,
AttributeValues.fromUUID(account.getPhoneNumberIdentifier())));
account.getUsernameHash().ifPresent(usernameHash ->
constraintFutures.add(writeUsernameConstraint(account.getIdentifier(IdentityType.ACI),
usernameHash,
Optional.empty())));
account.getUsernameHolds().forEach(usernameHold ->
constraintFutures.add(writeUsernameConstraint(account.getIdentifier(IdentityType.ACI),
usernameHold.usernameHash(),
Optional.of(Instant.ofEpochSecond(usernameHold.expirationSecs())))));
return CompletableFuture.allOf(constraintFutures.toArray(CompletableFuture[]::new));
}
private CompletableFuture<Void> writeConstraint(
final String tableName,
final UUID accountIdentifier,
final String keyName,
final AttributeValue keyValue) {
return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
.tableName(tableName)
.item(Map.of(
keyName, keyValue,
KEY_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier)))
.build())
.thenRun(Util.NOOP);
}
private CompletableFuture<Void> writeUsernameConstraint(
final UUID accountIdentifier,
final byte[] usernameHash,
final Optional<Instant> maybeExpiration) {
final Map<String, AttributeValue> item = new HashMap<>(Map.of(
UsernameTable.KEY_USERNAME_HASH, AttributeValues.fromByteArray(usernameHash),
UsernameTable.ATTR_ACCOUNT_UUID, AttributeValues.fromUUID(accountIdentifier),
UsernameTable.ATTR_CONFIRMED, AttributeValues.fromBool(maybeExpiration.isEmpty())
));
maybeExpiration.ifPresent(expiration ->
item.put(UsernameTable.ATTR_TTL, AttributeValues.fromLong(expiration.getEpochSecond())));
return dynamoDbAsyncClient.putItem(PutItemRequest.builder()
.tableName(usernamesConstraintTableName)
.item(item)
.build())
.thenRun(Util.NOOP);
}
@Nonnull
private static String extractCancellationReasonCodes(final TransactionCanceledException exception) {
return exception.cancellationReasons().stream()

View File

@@ -75,6 +75,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
* Construct utilities commonly used by worker commands
*/
record CommandDependencies(
Accounts accounts,
AccountsManager accountsManager,
ProfilesManager profilesManager,
ReportMessageManager reportMessageManager,
@@ -290,6 +291,7 @@ record CommandDependencies(
environment.lifecycle().manage(new ManagedAwsCrt());
return new CommandDependencies(
accounts,
accountsManager,
profilesManager,
reportMessageManager,

View File

@@ -0,0 +1,84 @@
/*
* Copyright 2025 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.workers;
import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import net.sourceforge.argparse4j.inf.Subparser;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
public class RegenerateAccountConstraintDataCommand extends AbstractSinglePassCrawlAccountsCommand {
@VisibleForTesting
static final String DRY_RUN_ARGUMENT = "dry-run";
@VisibleForTesting
static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
@VisibleForTesting
static final String RETRIES_ARGUMENT = "retries";
private static final String PROCESSED_ACCOUNTS_COUNTER_NAME =
MetricsUtil.name(RegenerateAccountConstraintDataCommand.class, "processedAccounts");
public RegenerateAccountConstraintDataCommand() {
super("regenerate-account-constraint-data", "Regenerates account constraint data from a core account table");
}
@Override
public void configure(final Subparser subparser) {
super.configure(subparser);
subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, dont actually write constraint data");
subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(16)
.help("Max concurrency for DynamoDB operations");
subparser.addArgument("--retries")
.type(Integer.class)
.dest(RETRIES_ARGUMENT)
.setDefault(8)
.help("Maximum number of DynamoDB retries permitted per account");
}
@Override
protected void crawlAccounts(final Flux<Account> accountRecords) {
final boolean dryRun = getNamespace().getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = getNamespace().getInt(MAX_CONCURRENCY_ARGUMENT);
final int maxRetries = getNamespace().getInt(RETRIES_ARGUMENT);
final Accounts accounts = getCommandDependencies().accounts();
final Counter processedAccountsCounter = Metrics.counter(PROCESSED_ACCOUNTS_COUNTER_NAME,
"dryRun", String.valueOf(dryRun));
accountRecords
.doOnNext(ignored -> processedAccountsCounter.increment())
.flatMap(account -> dryRun
? Mono.empty()
: Mono.fromFuture(() -> accounts.regenerateConstraints(account))
.retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(4))
.onRetryExhaustedThrow((spec, rs) -> rs.failure())),
maxConcurrency)
.then()
.block();
}
}