mirror of
https://github.com/signalapp/Signal-Server
synced 2026-04-20 23:48:06 +01:00
Add a pessimistic locking system for operations on recently-deleted account records
This commit is contained in:
committed by
Jon Chambers
parent
b757d4b334
commit
32a95f96ff
@@ -163,6 +163,11 @@ public class WhisperServerConfiguration extends Configuration {
|
||||
@JsonProperty
|
||||
private DeletedAccountsDynamoDbConfiguration deletedAccountsDynamoDb;
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private DynamoDbConfiguration deletedAccountsLockDynamoDb;
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
@@ -391,6 +396,10 @@ public class WhisperServerConfiguration extends Configuration {
|
||||
return deletedAccountsDynamoDb;
|
||||
}
|
||||
|
||||
public DynamoDbConfiguration getDeletedAccountsLockDynamoDbConfiguration() {
|
||||
return deletedAccountsLockDynamoDb;
|
||||
}
|
||||
|
||||
public DatabaseConfiguration getAbuseDatabaseConfiguration() {
|
||||
return abuseDatabase;
|
||||
}
|
||||
|
||||
@@ -6,6 +6,10 @@ package org.whispersystems.textsecuregcm;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.jdbi3.strategies.DefaultNameStrategy;
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
@@ -153,6 +157,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.ActiveUserCounter;
|
||||
import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
|
||||
import org.whispersystems.textsecuregcm.storage.DeletedAccountsDirectoryReconciler;
|
||||
import org.whispersystems.textsecuregcm.storage.DeletedAccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.DeletedAccountsTableCrawler;
|
||||
import org.whispersystems.textsecuregcm.storage.DirectoryReconciler;
|
||||
import org.whispersystems.textsecuregcm.storage.DirectoryReconciliationClient;
|
||||
@@ -347,6 +352,13 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
DynamoDbClient pendingDevicesDynamoDbClient = DynamoDbFromConfig.client(config.getPendingDevicesDynamoDbConfiguration(),
|
||||
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
|
||||
|
||||
AmazonDynamoDB deletedAccountsLockDynamoDbClient = AmazonDynamoDBClientBuilder.standard()
|
||||
.withRegion(config.getDeletedAccountsLockDynamoDbConfiguration().getRegion())
|
||||
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) config.getDeletedAccountsLockDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
|
||||
.withRequestTimeout((int) config.getDeletedAccountsLockDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||
.withCredentials(InstanceProfileCredentialsProvider.getInstance())
|
||||
.build();
|
||||
|
||||
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient, config.getDeletedAccountsDynamoDbConfiguration().getTableName(), config.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
|
||||
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(recentlyDeletedAccountsDynamoDb, config.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
|
||||
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, config.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
|
||||
@@ -428,7 +440,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
PushLatencyManager pushLatencyManager = new PushLatencyManager(metricsCluster);
|
||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, Metrics.globalRegistry);
|
||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
|
||||
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccounts, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
|
||||
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts, deletedAccountsLockDynamoDbClient, config.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
|
||||
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
|
||||
RemoteConfigsManager remoteConfigsManager = new RemoteConfigsManager(remoteConfigs);
|
||||
DeadLetterHandler deadLetterHandler = new DeadLetterHandler(accountsManager, messagesManager);
|
||||
DispatchManager dispatchManager = new DispatchManager(pubSubClientFactory, Optional.of(deadLetterHandler));
|
||||
@@ -484,7 +497,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheCluster);
|
||||
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs(), dynamicConfigurationManager);
|
||||
|
||||
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccounts, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
|
||||
DeletedAccountsTableCrawler deletedAccountsTableCrawler = new DeletedAccountsTableCrawler(deletedAccountsManager, deletedAccountsDirectoryReconcilers, cacheCluster, recurringJobExecutor);
|
||||
MigrationRetryAccountsTableCrawler migrationRetryAccountsTableCrawler = new MigrationRetryAccountsTableCrawler(migrationRetryAccounts, accountsManager, accountsDynamoDb, cacheCluster, recurringJobExecutor);
|
||||
|
||||
apnSender.setApnFallbackManager(apnFallbackManager);
|
||||
|
||||
@@ -738,7 +738,7 @@ public class AccountController {
|
||||
@Timed
|
||||
@DELETE
|
||||
@Path("/me")
|
||||
public void deleteAccount(@Auth Account account) {
|
||||
public void deleteAccount(@Auth Account account) throws InterruptedException {
|
||||
accounts.delete(account, AccountsManager.DeletionReason.USER_REQUEST);
|
||||
}
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ public class AccountsManager {
|
||||
private final Accounts accounts;
|
||||
private final AccountsDynamoDb accountsDynamoDb;
|
||||
private final FaultTolerantRedisCluster cacheCluster;
|
||||
private final DeletedAccounts deletedAccounts;
|
||||
private final DeletedAccountsManager deletedAccountsManager;
|
||||
private final DirectoryQueue directoryQueue;
|
||||
private final KeysDynamoDb keysDynamoDb;
|
||||
private final MessagesManager messagesManager;
|
||||
@@ -99,7 +99,7 @@ public class AccountsManager {
|
||||
}
|
||||
|
||||
public AccountsManager(Accounts accounts, AccountsDynamoDb accountsDynamoDb, FaultTolerantRedisCluster cacheCluster,
|
||||
final DeletedAccounts deletedAccounts,
|
||||
final DeletedAccountsManager deletedAccountsManager,
|
||||
final DirectoryQueue directoryQueue,
|
||||
final KeysDynamoDb keysDynamoDb, final MessagesManager messagesManager, final UsernamesManager usernamesManager,
|
||||
final ProfilesManager profilesManager, final SecureStorageClient secureStorageClient,
|
||||
@@ -109,7 +109,7 @@ public class AccountsManager {
|
||||
this.accounts = accounts;
|
||||
this.accountsDynamoDb = accountsDynamoDb;
|
||||
this.cacheCluster = cacheCluster;
|
||||
this.deletedAccounts = deletedAccounts;
|
||||
this.deletedAccountsManager = deletedAccountsManager;
|
||||
this.directoryQueue = directoryQueue;
|
||||
this.keysDynamoDb = keysDynamoDb;
|
||||
this.messagesManager = messagesManager;
|
||||
@@ -314,7 +314,7 @@ public class AccountsManager {
|
||||
return accountsDynamoDb.getAllFrom(uuid, length, maxPageSize);
|
||||
}
|
||||
|
||||
public void delete(final Account account, final DeletionReason deletionReason) {
|
||||
public void delete(final Account account, final DeletionReason deletionReason) throws InterruptedException {
|
||||
try (final Timer.Context ignored = deleteTimer.time()) {
|
||||
final CompletableFuture<Void> deleteStorageServiceDataFuture = secureStorageClient.deleteStoredData(account.getUuid());
|
||||
final CompletableFuture<Void> deleteBackupServiceDataFuture = secureBackupClient.deleteBackups(account.getUuid());
|
||||
@@ -340,9 +340,9 @@ public class AccountsManager {
|
||||
}
|
||||
}
|
||||
|
||||
deletedAccounts.put(account.getUuid(), account.getNumber());
|
||||
deletedAccountsManager.put(account.getUuid(), account.getNumber());
|
||||
|
||||
} catch (final Exception e) {
|
||||
} catch (final RuntimeException | InterruptedException e) {
|
||||
logger.warn("Failed to delete account", e);
|
||||
|
||||
Metrics.counter(DELETE_ERROR_COUNTER_NAME,
|
||||
|
||||
@@ -6,13 +6,24 @@ package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import org.whispersystems.textsecuregcm.util.AttributeValues;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.BatchGetItemResponse;
|
||||
import software.amazon.awssdk.services.dynamodb.model.KeysAndAttributes;
|
||||
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
|
||||
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
|
||||
@@ -27,6 +38,9 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
|
||||
|
||||
static final Duration TIME_TO_LIVE = Duration.ofDays(30);
|
||||
|
||||
// Note that this limit is imposed by DynamoDB itself; going above 100 will result in errors
|
||||
static final int GET_BATCH_SIZE = 100;
|
||||
|
||||
private final String tableName;
|
||||
private final String needsReconciliationIndexName;
|
||||
|
||||
@@ -37,7 +51,7 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
|
||||
this.needsReconciliationIndexName = needsReconciliationIndexName;
|
||||
}
|
||||
|
||||
public void put(UUID uuid, String e164) {
|
||||
void put(UUID uuid, String e164) {
|
||||
db().putItem(PutItemRequest.builder()
|
||||
.tableName(tableName)
|
||||
.item(Map.of(
|
||||
@@ -48,7 +62,7 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
|
||||
.build());
|
||||
}
|
||||
|
||||
public List<Pair<UUID, String>> listAccountsToReconcile(final int max) {
|
||||
List<Pair<UUID, String>> listAccountsToReconcile(final int max) {
|
||||
|
||||
final ScanRequest scanRequest = ScanRequest.builder()
|
||||
.tableName(tableName)
|
||||
@@ -64,7 +78,42 @@ public class DeletedAccounts extends AbstractDynamoDbStore {
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public void markReconciled(final List<String> phoneNumbersReconciled) {
|
||||
Set<String> getAccountsNeedingReconciliation(final Collection<String> e164s) {
|
||||
final Queue<Map<String, AttributeValue>> pendingKeys = e164s.stream()
|
||||
.map(e164 -> Map.of(KEY_ACCOUNT_E164, AttributeValues.fromString(e164)))
|
||||
.collect(Collectors.toCollection(() -> new ArrayDeque<>(e164s.size())));
|
||||
|
||||
final Set<String> accountsNeedingReconciliation = new HashSet<>(e164s.size());
|
||||
final List<Map<String, AttributeValue>> batchKeys = new ArrayList<>(GET_BATCH_SIZE);
|
||||
|
||||
while (!pendingKeys.isEmpty()) {
|
||||
batchKeys.clear();
|
||||
|
||||
for (int i = 0; i < GET_BATCH_SIZE && !pendingKeys.isEmpty(); i++) {
|
||||
batchKeys.add(pendingKeys.remove());
|
||||
}
|
||||
|
||||
final BatchGetItemResponse response = db().batchGetItem(BatchGetItemRequest.builder()
|
||||
.requestItems(Map.of(tableName, KeysAndAttributes.builder()
|
||||
.consistentRead(true)
|
||||
.keys(batchKeys)
|
||||
.build()))
|
||||
.build());
|
||||
|
||||
response.responses().getOrDefault(tableName, Collections.emptyList()).stream()
|
||||
.filter(attributes -> AttributeValues.getInt(attributes, ATTR_NEEDS_CDS_RECONCILIATION, 0) == 1)
|
||||
.map(attributes -> AttributeValues.getString(attributes, KEY_ACCOUNT_E164, null))
|
||||
.forEach(accountsNeedingReconciliation::add);
|
||||
|
||||
if (response.hasUnprocessedKeys() && response.unprocessedKeys().containsKey(tableName)) {
|
||||
pendingKeys.addAll(response.unprocessedKeys().get(tableName).keys());
|
||||
}
|
||||
}
|
||||
|
||||
return accountsNeedingReconciliation;
|
||||
}
|
||||
|
||||
void markReconciled(final Collection<String> phoneNumbersReconciled) {
|
||||
|
||||
phoneNumbersReconciled.forEach(number -> db().updateItem(
|
||||
UpdateItemRequest.builder()
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
/*
|
||||
* Copyright 2013-2021 Signal Messenger, LLC
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
|
||||
import com.amazonaws.services.dynamodbv2.LockItem;
|
||||
import com.amazonaws.services.dynamodbv2.ReleaseLockOptions;
|
||||
import com.amazonaws.services.dynamodbv2.model.LockCurrentlyUnavailableException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.util.Pair;
|
||||
|
||||
public class DeletedAccountsManager {
|
||||
|
||||
private final DeletedAccounts deletedAccounts;
|
||||
|
||||
private final AmazonDynamoDBLockClient lockClient;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DeletedAccountsManager.class);
|
||||
|
||||
@FunctionalInterface
|
||||
public interface DeletedAccountReconciliationConsumer {
|
||||
|
||||
/**
|
||||
* Reconcile a list of deleted account records.
|
||||
*
|
||||
* @param deletedAccounts the account records to reconcile
|
||||
* @return a list of account records that were successfully reconciled; accounts that were not successfully
|
||||
* reconciled may be retried later
|
||||
* @throws ChunkProcessingFailedException in the event of an error while processing the batch of account records
|
||||
*/
|
||||
Collection<String> reconcile(List<Pair<UUID, String>> deletedAccounts) throws ChunkProcessingFailedException;
|
||||
}
|
||||
|
||||
public DeletedAccountsManager(final DeletedAccounts deletedAccounts, final AmazonDynamoDB lockDynamoDb, final String lockTableName) {
|
||||
this.deletedAccounts = deletedAccounts;
|
||||
|
||||
lockClient = new AmazonDynamoDBLockClient(
|
||||
AmazonDynamoDBLockClientOptions.builder(lockDynamoDb, lockTableName)
|
||||
.withPartitionKeyName(DeletedAccounts.KEY_ACCOUNT_E164)
|
||||
.withLeaseDuration(15L)
|
||||
.withHeartbeatPeriod(2L)
|
||||
.withTimeUnit(TimeUnit.SECONDS)
|
||||
.withCreateHeartbeatBackgroundThread(true)
|
||||
.build());
|
||||
}
|
||||
|
||||
public void put(final UUID uuid, final String e164) throws InterruptedException {
|
||||
withLock(e164, () -> deletedAccounts.put(uuid, e164));
|
||||
}
|
||||
|
||||
private void withLock(final String e164, final Runnable task) throws InterruptedException {
|
||||
final LockItem lockItem = lockClient.acquireLock(AcquireLockOptions.builder(e164)
|
||||
.withAcquireReleasedLocksConsistently(true)
|
||||
.build());
|
||||
|
||||
try {
|
||||
task.run();
|
||||
} finally {
|
||||
lockClient.releaseLock(ReleaseLockOptions.builder(lockItem)
|
||||
.withBestEffort(true)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
public void lockAndReconcileAccounts(final int max, final DeletedAccountReconciliationConsumer consumer) throws ChunkProcessingFailedException {
|
||||
final List<LockItem> lockItems = new ArrayList<>();
|
||||
final List<Pair<UUID, String>> reconciliationCandidates = deletedAccounts.listAccountsToReconcile(max).stream()
|
||||
.filter(pair -> {
|
||||
boolean lockAcquired = false;
|
||||
|
||||
try {
|
||||
lockItems.add(lockClient.acquireLock(AcquireLockOptions.builder(pair.second())
|
||||
.withAcquireReleasedLocksConsistently(true)
|
||||
.withShouldSkipBlockingWait(true)
|
||||
.build()));
|
||||
|
||||
lockAcquired = true;
|
||||
} catch (final InterruptedException e) {
|
||||
log.warn("Interrupted while acquiring lock for reconciliation", e);
|
||||
} catch (final LockCurrentlyUnavailableException ignored) {
|
||||
}
|
||||
|
||||
return lockAcquired;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
assert lockItems.size() == reconciliationCandidates.size();
|
||||
|
||||
// A deleted account's status may have changed in the time between getting a list of candidates and acquiring a lock
|
||||
// on the candidate records. Now that we hold the lock, check which of the candidates still need to be reconciled.
|
||||
final Set<String> numbersNeedingReconciliationAfterLock =
|
||||
deletedAccounts.getAccountsNeedingReconciliation(reconciliationCandidates.stream()
|
||||
.map(Pair::second)
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
final List<Pair<UUID, String>> accountsToReconcile = reconciliationCandidates.stream()
|
||||
.filter(candidate -> numbersNeedingReconciliationAfterLock.contains(candidate.second()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
try {
|
||||
deletedAccounts.markReconciled(consumer.reconcile(accountsToReconcile));
|
||||
} finally {
|
||||
lockItems.forEach(lockItem -> lockClient.releaseLock(ReleaseLockOptions.builder(lockItem).withBestEffort(true).build()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,6 @@ package org.whispersystems.textsecuregcm.storage;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import io.micrometer.core.instrument.DistributionSummary;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
@@ -27,42 +26,41 @@ public class DeletedAccountsTableCrawler extends ManagedPeriodicWork {
|
||||
private static final int MAX_BATCH_SIZE = 5_000;
|
||||
private static final String BATCH_SIZE_DISTRIBUTION_NAME = name(DeletedAccountsTableCrawler.class, "batchSize");
|
||||
|
||||
private final DeletedAccounts deletedAccounts;
|
||||
private final DeletedAccountsManager deletedAccountsManager;
|
||||
private final List<DeletedAccountsDirectoryReconciler> reconcilers;
|
||||
|
||||
public DeletedAccountsTableCrawler(
|
||||
final DeletedAccounts deletedAccounts,
|
||||
final DeletedAccountsManager deletedAccountsManager,
|
||||
final List<DeletedAccountsDirectoryReconciler> reconcilers,
|
||||
final FaultTolerantRedisCluster cluster,
|
||||
final ScheduledExecutorService executorService) throws IOException {
|
||||
|
||||
super(new ManagedPeriodicWorkLock(ACTIVE_WORKER_KEY, cluster), WORKER_TTL, RUN_INTERVAL, executorService);
|
||||
|
||||
this.deletedAccounts = deletedAccounts;
|
||||
this.deletedAccountsManager = deletedAccountsManager;
|
||||
this.reconcilers = reconcilers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doPeriodicWork() throws Exception {
|
||||
|
||||
final List<Pair<UUID, String>> deletedAccounts = this.deletedAccounts.listAccountsToReconcile(MAX_BATCH_SIZE);
|
||||
deletedAccountsManager.lockAndReconcileAccounts(MAX_BATCH_SIZE, deletedAccounts -> {
|
||||
final List<User> deletedUsers = deletedAccounts.stream()
|
||||
.map(pair -> new User(pair.first(), pair.second()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final List<User> deletedUsers = deletedAccounts.stream()
|
||||
.map(pair -> new User(pair.first(), pair.second()))
|
||||
.collect(Collectors.toList());
|
||||
for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) {
|
||||
reconciler.onCrawlChunk(deletedUsers);
|
||||
}
|
||||
|
||||
for (DeletedAccountsDirectoryReconciler reconciler : reconcilers) {
|
||||
reconciler.onCrawlChunk(deletedUsers);
|
||||
}
|
||||
final List<String> reconciledPhoneNumbers = deletedAccounts.stream()
|
||||
.map(Pair::second)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final List<String> reconciledPhoneNumbers = deletedAccounts.stream()
|
||||
.map(Pair::second)
|
||||
.collect(Collectors.toList());
|
||||
Metrics.summary(BATCH_SIZE_DISTRIBUTION_NAME).record(reconciledPhoneNumbers.size());
|
||||
|
||||
this.deletedAccounts.markReconciled(reconciledPhoneNumbers);
|
||||
|
||||
Metrics.summary(BATCH_SIZE_DISTRIBUTION_NAME)
|
||||
.record(reconciledPhoneNumbers.size());
|
||||
return reconciledPhoneNumbers;
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -7,6 +7,10 @@ package org.whispersystems.textsecuregcm.workers;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
import com.amazonaws.ClientConfiguration;
|
||||
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
|
||||
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import io.dropwizard.Application;
|
||||
import io.dropwizard.cli.EnvironmentCommand;
|
||||
@@ -38,6 +42,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsDynamoDb;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager.DeletionReason;
|
||||
import org.whispersystems.textsecuregcm.storage.DeletedAccounts;
|
||||
import org.whispersystems.textsecuregcm.storage.DeletedAccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.DynamicConfigurationManager;
|
||||
import org.whispersystems.textsecuregcm.storage.FaultTolerantDatabase;
|
||||
import org.whispersystems.textsecuregcm.storage.KeysDynamoDb;
|
||||
@@ -133,6 +138,13 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
||||
DynamoDbClient migrationRetryAccountsDynamoDb = DynamoDbFromConfig.client(configuration.getMigrationRetryAccountsDynamoDbConfiguration(),
|
||||
software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider.create());
|
||||
|
||||
AmazonDynamoDB deletedAccountsLockDynamoDbClient = AmazonDynamoDBClientBuilder.standard()
|
||||
.withRegion(configuration.getDeletedAccountsLockDynamoDbConfiguration().getRegion())
|
||||
.withClientConfiguration(new ClientConfiguration().withClientExecutionTimeout(((int) configuration.getDeletedAccountsLockDynamoDbConfiguration().getClientExecutionTimeout().toMillis()))
|
||||
.withRequestTimeout((int) configuration.getDeletedAccountsLockDynamoDbConfiguration().getClientRequestTimeout().toMillis()))
|
||||
.withCredentials(InstanceProfileCredentialsProvider.getInstance())
|
||||
.build();
|
||||
|
||||
DeletedAccounts deletedAccounts = new DeletedAccounts(deletedAccountsDynamoDbClient, configuration.getDeletedAccountsDynamoDbConfiguration().getTableName(), configuration.getDeletedAccountsDynamoDbConfiguration().getNeedsReconciliationIndexName());
|
||||
MigrationDeletedAccounts migrationDeletedAccounts = new MigrationDeletedAccounts(migrationDeletedAccountsDynamoDb, configuration.getMigrationDeletedAccountsDynamoDbConfiguration().getTableName());
|
||||
MigrationRetryAccounts migrationRetryAccounts = new MigrationRetryAccounts(migrationRetryAccountsDynamoDb, configuration.getMigrationRetryAccountsDynamoDbConfiguration().getTableName());
|
||||
@@ -157,7 +169,8 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
|
||||
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(reportMessagesDynamoDb, configuration.getReportMessageDynamoDbConfiguration().getTableName());
|
||||
ReportMessageManager reportMessageManager = new ReportMessageManager(reportMessageDynamoDb, Metrics.globalRegistry);
|
||||
MessagesManager messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, pushLatencyManager, reportMessageManager);
|
||||
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccounts, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
|
||||
DeletedAccountsManager deletedAccountsManager = new DeletedAccountsManager(deletedAccounts, deletedAccountsLockDynamoDbClient, configuration.getDeletedAccountsLockDynamoDbConfiguration().getTableName());
|
||||
AccountsManager accountsManager = new AccountsManager(accounts, accountsDynamoDb, cacheCluster, deletedAccountsManager, directoryQueue, keysDynamoDb, messagesManager, usernamesManager, profilesManager, secureStorageClient, secureBackupClient, experimentEnrollmentManager, dynamicConfigurationManager);
|
||||
|
||||
for (String user: users) {
|
||||
Optional<Account> account = accountsManager.get(user);
|
||||
|
||||
Reference in New Issue
Block a user