mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 05:28:05 +01:00
Retire MigrateRegistrationRecoveryPasswordsCommand
This commit is contained in:
committed by
Jon Chambers
parent
96fb0ac3ae
commit
6967e4e54b
@@ -263,7 +263,6 @@ import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.IdleDeviceNotificationSchedulerFactory;
|
||||
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.MigrateDeletedAccountsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.MigrateRegistrationRecoveryPasswordsCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
|
||||
@@ -333,7 +332,6 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
new IdleDeviceNotificationSchedulerFactory()));
|
||||
|
||||
bootstrap.addCommand(new MigrateDeletedAccountsCommand());
|
||||
bootstrap.addCommand(new MigrateRegistrationRecoveryPasswordsCommand());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -16,22 +16,15 @@ import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.util.function.Tuple3;
|
||||
import reactor.util.function.Tuples;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Delete;
|
||||
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.Put;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;
|
||||
|
||||
public class RegistrationRecoveryPasswords {
|
||||
|
||||
@@ -73,19 +66,6 @@ public class RegistrationRecoveryPasswords {
|
||||
.map(RegistrationRecoveryPasswords::saltedTokenHashFromItem));
|
||||
}
|
||||
|
||||
CompletableFuture<Optional<Pair<SaltedTokenHash, Long>>> lookupWithExpiration(final String key) {
|
||||
return asyncClient.getItem(GetItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.key(Map.of(KEY_E164, AttributeValues.fromString(key)))
|
||||
.consistentRead(true)
|
||||
.build())
|
||||
.thenApply(getItemResponse -> Optional.ofNullable(getItemResponse.item())
|
||||
.filter(item -> item.containsKey(ATTR_SALT))
|
||||
.filter(item -> item.containsKey(ATTR_HASH))
|
||||
.filter(item -> item.containsKey(ATTR_EXP))
|
||||
.map(item -> new Pair<>(saltedTokenHashFromItem(item), Long.parseLong(item.get(ATTR_EXP).n()))));
|
||||
}
|
||||
|
||||
public CompletableFuture<Optional<SaltedTokenHash>> lookup(final UUID phoneNumberIdentifier) {
|
||||
return lookup(phoneNumberIdentifier.toString());
|
||||
}
|
||||
@@ -141,95 +121,6 @@ public class RegistrationRecoveryPasswords {
|
||||
return clock.instant().plus(expiration).getEpochSecond();
|
||||
}
|
||||
|
||||
public Flux<Tuple3<String, SaltedTokenHash, Long>> getE164AssociatedRegistrationRecoveryPasswords(final int segments, final Scheduler scheduler) {
|
||||
if (segments < 1) {
|
||||
throw new IllegalArgumentException("Total number of segments must be positive");
|
||||
}
|
||||
|
||||
return Flux.range(0, segments)
|
||||
.parallel()
|
||||
.runOn(scheduler)
|
||||
.flatMap(segment -> asyncClient.scanPaginator(ScanRequest.builder()
|
||||
.tableName(tableName)
|
||||
.consistentRead(true)
|
||||
.segment(segment)
|
||||
.totalSegments(segments)
|
||||
.filterExpression("begins_with(#key, :e164Prefix)")
|
||||
.expressionAttributeNames(Map.of("#key", KEY_E164))
|
||||
.expressionAttributeValues(Map.of(":e164Prefix", AttributeValue.fromS("+")))
|
||||
.build())
|
||||
.items()
|
||||
.map(item -> Tuples.of(item.get(KEY_E164).s(), saltedTokenHashFromItem(item), Long.parseLong(item.get(ATTR_EXP).n()))))
|
||||
.sequential();
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> insertPniRecord(final String phoneNumber,
|
||||
final UUID phoneNumberIdentifier,
|
||||
final SaltedTokenHash saltedTokenHash,
|
||||
final long expirationSeconds) {
|
||||
|
||||
// We try to write both the old and new record inside a transaction, but with different conditions. For the
|
||||
// E164-based record, we insist that the record be entirely unchanged. This prevents us from writing an out-of-sync
|
||||
// record if we read one thing in the `Scan` pass, but then somebody updated the record before we tried to write
|
||||
// the PNI-based record. We refresh and retry if this happens.
|
||||
//
|
||||
// For the PNI-based record, we only want to write the record if one doesn't already exist for the given PNI. If one
|
||||
// already exists, we'll just leave it alone.
|
||||
return asyncClient.transactWriteItems(TransactWriteItemsRequest.builder()
|
||||
.transactItems(
|
||||
TransactWriteItem.builder()
|
||||
.put(Put.builder()
|
||||
.tableName(tableName)
|
||||
.item(Map.of(
|
||||
KEY_E164, AttributeValues.fromString(phoneNumber),
|
||||
ATTR_EXP, AttributeValues.fromLong(expirationSeconds),
|
||||
ATTR_SALT, AttributeValues.fromString(saltedTokenHash.salt()),
|
||||
ATTR_HASH, AttributeValues.fromString(saltedTokenHash.hash())))
|
||||
.conditionExpression("#key = :key AND #expiration = :expiration AND #salt = :salt AND #hash = :hash")
|
||||
.expressionAttributeNames(Map.of(
|
||||
"#key", KEY_E164,
|
||||
"#expiration", ATTR_EXP,
|
||||
"#salt", ATTR_SALT,
|
||||
"#hash", ATTR_HASH))
|
||||
.expressionAttributeValues(Map.of(
|
||||
":key", AttributeValues.fromString(phoneNumber),
|
||||
":expiration", AttributeValues.fromLong(expirationSeconds),
|
||||
":salt", AttributeValues.fromString(saltedTokenHash.salt()),
|
||||
":hash", AttributeValues.fromString(saltedTokenHash.hash())))
|
||||
.build())
|
||||
.build(),
|
||||
|
||||
TransactWriteItem.builder()
|
||||
.put(Put.builder()
|
||||
.tableName(tableName)
|
||||
.item(Map.of(
|
||||
KEY_E164, AttributeValues.fromString(phoneNumberIdentifier.toString()),
|
||||
ATTR_EXP, AttributeValues.fromLong(expirationSeconds),
|
||||
ATTR_SALT, AttributeValues.fromString(saltedTokenHash.salt()),
|
||||
ATTR_HASH, AttributeValues.fromString(saltedTokenHash.hash())))
|
||||
.conditionExpression("attribute_not_exists(#key)")
|
||||
.expressionAttributeNames(Map.of("#key", KEY_E164))
|
||||
.build())
|
||||
.build())
|
||||
.build())
|
||||
.thenApply(ignored -> true)
|
||||
.exceptionally(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof TransactionCanceledException transactionCanceledException) {
|
||||
if ("ConditionalCheckFailed".equals(transactionCanceledException.cancellationReasons().get(1).code())) {
|
||||
// A PNI-associated record has already been stored; we can just treat this as success
|
||||
return false;
|
||||
}
|
||||
|
||||
if ("ConditionalCheckFailed".equals(transactionCanceledException.cancellationReasons().get(0).code())) {
|
||||
// No PNI-associated record is present, but the original record has changed
|
||||
throw new ContestedOptimisticLockException();
|
||||
}
|
||||
}
|
||||
|
||||
throw ExceptionUtils.wrap(throwable);
|
||||
});
|
||||
}
|
||||
|
||||
private static SaltedTokenHash saltedTokenHashFromItem(final Map<String, AttributeValue> item) {
|
||||
return new SaltedTokenHash(item.get(ATTR_HASH).s(), item.get(ATTR_SALT).s());
|
||||
}
|
||||
|
||||
@@ -10,15 +10,10 @@ import static java.util.Objects.requireNonNull;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.HexFormat;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
|
||||
import org.whispersystems.textsecuregcm.util.ExceptionUtils;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.util.function.Tuple3;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
|
||||
|
||||
public class RegistrationRecoveryPasswordsManager {
|
||||
@@ -72,42 +67,6 @@ public class RegistrationRecoveryPasswordsManager {
|
||||
}));
|
||||
}
|
||||
|
||||
public Flux<Tuple3<String, SaltedTokenHash, Long>> getE164AssociatedRegistrationRecoveryPasswords(final int segments, final Scheduler scheduler) {
|
||||
return registrationRecoveryPasswords.getE164AssociatedRegistrationRecoveryPasswords(segments, scheduler);
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> migrateE164Record(final String number, final SaltedTokenHash saltedTokenHash, final long expirationSeconds) {
|
||||
return phoneNumberIdentifiers.getPhoneNumberIdentifier(number)
|
||||
.thenCompose(phoneNumberIdentifier -> migrateE164Record(number, phoneNumberIdentifier, saltedTokenHash, expirationSeconds, 10));
|
||||
}
|
||||
|
||||
public CompletableFuture<Boolean> migrateE164Record(final String number,
|
||||
final UUID phoneNumberIdentifier,
|
||||
final SaltedTokenHash saltedTokenHash,
|
||||
final long expirationSeconds,
|
||||
final int remainingAttempts) {
|
||||
|
||||
if (remainingAttempts <= 0) {
|
||||
return CompletableFuture.failedFuture(new ContestedOptimisticLockException());
|
||||
}
|
||||
|
||||
return registrationRecoveryPasswords.insertPniRecord(number, phoneNumberIdentifier, saltedTokenHash, expirationSeconds)
|
||||
.exceptionallyCompose(throwable -> {
|
||||
if (ExceptionUtils.unwrap(throwable) instanceof ContestedOptimisticLockException) {
|
||||
// Something about the original record changed; refresh and retry
|
||||
return registrationRecoveryPasswords.lookupWithExpiration(number)
|
||||
.thenCompose(maybePair -> maybePair
|
||||
.map(pair -> migrateE164Record(number, phoneNumberIdentifier, pair.first(), pair.second(), remainingAttempts - 1))
|
||||
.orElseGet(() -> {
|
||||
// The original record was deleted, and we can declare victory
|
||||
return CompletableFuture.completedFuture(false);
|
||||
}));
|
||||
}
|
||||
|
||||
return CompletableFuture.failedFuture(throwable);
|
||||
});
|
||||
}
|
||||
|
||||
private static String bytesToString(final byte[] bytes) {
|
||||
return HexFormat.of().formatHex(bytes);
|
||||
}
|
||||
|
||||
@@ -1,134 +0,0 @@
|
||||
/*
|
||||
* Copyright 2024 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import io.dropwizard.core.Application;
|
||||
import io.dropwizard.core.setup.Environment;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
import net.sourceforge.argparse4j.inf.Subparser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||
import org.whispersystems.textsecuregcm.auth.SaltedTokenHash;
|
||||
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||
import org.whispersystems.textsecuregcm.storage.RegistrationRecoveryPasswordsManager;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.function.Tuple3;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
public class MigrateRegistrationRecoveryPasswordsCommand extends AbstractCommandWithDependencies {
|
||||
|
||||
private static final String DRY_RUN_ARGUMENT = "dry-run";
|
||||
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
|
||||
private static final String SEGMENTS_ARGUMENT = "segments";
|
||||
private static final String BUFFER_ARGUMENT = "buffer";
|
||||
|
||||
private static final String RECORDS_INSPECTED_COUNTER_NAME =
|
||||
MetricsUtil.name(MigrateRegistrationRecoveryPasswordsCommand.class, "recordsInspected");
|
||||
|
||||
private static final String RECORDS_MIGRATED_COUNTER_NAME =
|
||||
MetricsUtil.name(MigrateRegistrationRecoveryPasswordsCommand.class, "recordsMigrated");
|
||||
|
||||
private static final String DRY_RUN_TAG = "dryRun";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(MigrateRegistrationRecoveryPasswordsCommand.class);
|
||||
|
||||
public MigrateRegistrationRecoveryPasswordsCommand() {
|
||||
|
||||
super(new Application<>() {
|
||||
@Override
|
||||
public void run(final WhisperServerConfiguration configuration, final Environment environment) {
|
||||
}
|
||||
}, "migrate-registration-recovery-passwords", "Migrate e164-based registration recovery passwords to PNI-based records");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Subparser subparser) {
|
||||
super.configure(subparser);
|
||||
|
||||
subparser.addArgument("--dry-run")
|
||||
.type(Boolean.class)
|
||||
.dest(DRY_RUN_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(true)
|
||||
.help("If true, don’t actually modify accounts with expired linked devices");
|
||||
|
||||
subparser.addArgument("--max-concurrency")
|
||||
.type(Integer.class)
|
||||
.dest(MAX_CONCURRENCY_ARGUMENT)
|
||||
.setDefault(16)
|
||||
.help("Max concurrency for DynamoDB operations");
|
||||
|
||||
subparser.addArgument("--segments")
|
||||
.type(Integer.class)
|
||||
.dest(SEGMENTS_ARGUMENT)
|
||||
.required(false)
|
||||
.setDefault(1)
|
||||
.help("The total number of segments for a DynamoDB scan");
|
||||
|
||||
subparser.addArgument("--buffer")
|
||||
.type(Integer.class)
|
||||
.dest(BUFFER_ARGUMENT)
|
||||
.setDefault(16_384)
|
||||
.help("Records to buffer");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void run(final Environment environment, final Namespace namespace,
|
||||
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {
|
||||
|
||||
final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
|
||||
final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
|
||||
final int segments = namespace.getInt(SEGMENTS_ARGUMENT);
|
||||
final int bufferSize = namespace.getInt(BUFFER_ARGUMENT);
|
||||
|
||||
final Counter recordsInspectedCounter =
|
||||
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final Counter recordsMigratedCounter =
|
||||
Metrics.counter(RECORDS_MIGRATED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));
|
||||
|
||||
final RegistrationRecoveryPasswordsManager registrationRecoveryPasswordsManager =
|
||||
commandDependencies.registrationRecoveryPasswordsManager();
|
||||
|
||||
registrationRecoveryPasswordsManager.getE164AssociatedRegistrationRecoveryPasswords(segments, Schedulers.parallel())
|
||||
.buffer(bufferSize)
|
||||
.map(source -> {
|
||||
final List<Tuple3<String, SaltedTokenHash, Long>> shuffled = new ArrayList<>(source);
|
||||
Collections.shuffle(shuffled);
|
||||
return shuffled;
|
||||
})
|
||||
.limitRate(2)
|
||||
.flatMapIterable(Function.identity())
|
||||
.doOnNext(tuple -> recordsInspectedCounter.increment())
|
||||
.flatMap(tuple -> {
|
||||
final String e164 = tuple.getT1();
|
||||
final SaltedTokenHash saltedTokenHash = tuple.getT2();
|
||||
final long expiration = tuple.getT3();
|
||||
|
||||
return dryRun
|
||||
? Mono.just(false)
|
||||
: Mono.fromFuture(() -> registrationRecoveryPasswordsManager.migrateE164Record(e164, saltedTokenHash, expiration))
|
||||
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
|
||||
.onErrorResume(throwable -> {
|
||||
logger.warn("Failed to migrate record for {}", e164, throwable);
|
||||
return Mono.empty();
|
||||
});
|
||||
}, maxConcurrency)
|
||||
.filter(migrated -> migrated)
|
||||
.doOnNext(ignored -> recordsMigratedCounter.increment())
|
||||
.then()
|
||||
.block();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user